Add metrics for tracking bp caused by local instances and remote stmgrs (#3362)
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 9a752de..047c634 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -67,13 +67,16 @@
// Num bytes lost since instances is not connected
const sp_string METRIC_BYTES_TO_INSTANCES_LOST = "__bytes_to_workers_lost";
+// Time spent in back pressure caused by instances managed by this stmgr.
+const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE =
+ "__time_spent_back_pressure_by_local_instance";
// Time spent in back pressure aggregated - back pressure initiated by us +
// others
const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_AGGR = "__server/__time_spent_back_pressure_aggr";
// Time spent in back pressure because of a component id. The comp id will be
-// appended
-// to the string below
+// appended to the string below
const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_COMPID = "__time_spent_back_pressure_by_compid/";
+
// Prefix for connection buffer's metrics
const sp_string CONNECTION_BUFFER_BY_INSTANCEID = "__connection_buffer_by_instanceid/";
// Prefix for connection buffer's length metrics. This is different
@@ -117,6 +120,9 @@
metrics_manager_client_->register_metric("__server", instance_server_metrics_);
metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR,
back_pressure_metric_aggr_);
+ back_pressure_metric_caused_by_local_instances_ = make_shared<heron::common::TimeSpentMetric>();
+ metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE,
+ back_pressure_metric_caused_by_local_instances_);
spouts_under_back_pressure_ = false;
// Update queue related metrics every 10 seconds
@@ -180,6 +186,8 @@
metrics_manager_client_->unregister_metric("__server");
metrics_manager_client_->unregister_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR);
+ metrics_manager_client_->unregister_metric(
+ METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE);
// cleanup the instance info
for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
@@ -549,6 +557,8 @@
return "";
}
+// This function is called when the buffer in the connection is full (the instance is not consuming
+// tuples fast enough).
void InstanceServer::StartBackPressureConnectionCb(Connection* _connection) {
// The connection will notify us when we can stop the back pressure
_connection->setCausedBackPressure();
@@ -568,6 +578,8 @@
if (!stmgr_->DidAnnounceBackPressure()) {
stmgr_->SendStartBackPressureToOtherStMgrs();
+ // Start backpressure from local instances metric
+ back_pressure_metric_caused_by_local_instances_->Start();
}
// Indicate which instance component had back pressure
@@ -578,6 +590,7 @@
StartBackPressureOnSpouts();
}
+// This function is called when the buffer in the connection is empty (the tuples are drained).
void InstanceServer::StopBackPressureConnectionCb(Connection* _connection) {
_connection->unsetCausedBackPressure();
@@ -605,6 +618,8 @@
if (!stmgr_->DidAnnounceBackPressure()) {
stmgr_->SendStopBackPressureToOtherStMgrs();
+ // Stop backpressure from local instances metric
+ back_pressure_metric_caused_by_local_instances_->Stop();
}
AttemptStopBackPressureFromSpouts();
}
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index b4a6038..e3a1b82 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -31,11 +31,11 @@
namespace heron {
namespace common {
+class AssignableMetric;
class MetricsMgrSt;
class MultiCountMetric;
-class TimeSpentMetric;
-class AssignableMetric;
class MultiMeanMetric;
+class TimeSpentMetric;
}
}
@@ -196,6 +196,7 @@
shared_ptr<heron::common::MetricsMgrSt> metrics_manager_client_;
shared_ptr<heron::common::MultiCountMetric> instance_server_metrics_;
shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_aggr_;
+ shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_caused_by_local_instances_;
bool spouts_under_back_pressure_;
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 12e573e..f16afca 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -47,6 +47,10 @@
const sp_string METRIC_FAIL_TUPLES_FROM_STMGRS = "__fail_tuples_from_stmgrs";
// Bytes received from other stream managers
const sp_string METRIC_BYTES_FROM_STMGRS = "__bytes_from_stmgrs";
+// Time spent in back pressure caused by remote stream managers.
+const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_REMOTE_STMGR =
+ "__time_spent_back_pressure_by_remote_stmgr";
+
StMgrServer::StMgrServer(shared_ptr<EventLoop> eventLoop, const NetworkOptions& _options,
const sp_string& _topology_name, const sp_string& _topology_id,
@@ -80,6 +84,10 @@
bytes_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS,
bytes_from_stmgrs_metrics_);
+ back_pressure_metric_caused_by_remote_stmgr_ = make_shared<heron::common::TimeSpentMetric>();
+ metrics_manager_client_->register_metric(
+ SERVER_SCOPE + METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_REMOTE_STMGR,
+ back_pressure_metric_caused_by_remote_stmgr_);
}
StMgrServer::~StMgrServer() {
@@ -88,6 +96,8 @@
metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS);
metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS);
metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS);
+ metrics_manager_client_->unregister_metric(
+ SERVER_SCOPE + METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_REMOTE_STMGR);
}
void StMgrServer::HandleNewConnection(Connection* _conn) {
@@ -180,10 +190,12 @@
void StMgrServer::StartBackPressureClientCb(const sp_string& _other_stmgr_id) {
if (!stmgr_->DidAnnounceBackPressure()) {
stmgr_->SendStartBackPressureToOtherStMgrs();
+ // Start backpressure from remote stmgr metric
+ back_pressure_metric_caused_by_remote_stmgr_->Start();
}
remote_ends_who_caused_back_pressure_.insert(_other_stmgr_id);
- LOG(INFO) << "We observe back pressure on sending data to remote stream manager "
- << _other_stmgr_id;
+ LOG(WARNING) << "We observe back pressure on sending data to remote stream manager "
+ << _other_stmgr_id;
stmgr_->StartBackPressureOnSpouts();
}
@@ -194,9 +206,11 @@
if (!stmgr_->DidAnnounceBackPressure()) {
stmgr_->SendStopBackPressureToOtherStMgrs();
+ // Stop backpressure from remote stmgr metric
+ back_pressure_metric_caused_by_remote_stmgr_->Stop();
}
- LOG(INFO) << "We don't observe back pressure now on sending data to remote "
- "stream manager "
+ LOG(WARNING) << "We don't observe back pressure now on sending data to remote "
+ "stream manager "
<< _other_stmgr_id;
if (!stmgr_->DidAnnounceBackPressure() && !stmgr_->DidOthersAnnounceBackPressure()) {
stmgr_->AttemptStopBackPressureFromSpouts();
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 325f0c1..d0baf69 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -31,8 +31,9 @@
namespace heron {
namespace common {
-class MetricsMgrSt;
class CountMetric;
+class MetricsMgrSt;
+class TimeSpentMetric;
}
}
@@ -107,6 +108,7 @@
shared_ptr<heron::common::CountMetric> ack_tuples_from_stmgrs_metrics_;
shared_ptr<heron::common::CountMetric> fail_tuples_from_stmgrs_metrics_;
shared_ptr<heron::common::CountMetric> bytes_from_stmgrs_metrics_;
+ shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_caused_by_remote_stmgr_;
};
} // namespace stmgr