Address review comments from Sanjeev
diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
index 04e4e55..fa48ad0 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
@@ -124,6 +124,7 @@
}
sp_int32 StMgrClientMgr::FindBusiestTaskOnStmgr(const sp_string& _stmgr_id) {
+ CHECK(instance_stats_.find(_stmgr_id) != instance_stats_.end());
sp_int32 task_id;
sp_int64 max = 0;
for (auto iter = instance_stats_[_stmgr_id].begin();
@@ -162,6 +163,7 @@
void StMgrClientMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) {
// Call the StMgrServers removeBackPressure method
stream_manager_->StopBackPressureOnServer(_other_stmgr_id);
+ instance_stats_.clear();
}
void StMgrClientMgr::SendStartBackPressureToOtherStMgrs(const sp_int32 _task_id) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h
index d4262de..005f1b6 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h
@@ -77,7 +77,8 @@
sp_int64 high_watermark_;
sp_int64 low_watermark_;
- // Counters for remote instance traffic, this is used for back pressure
+ // Counters for the traffic per remote instance, per stmgr, since the last back pressure;
+ // this is used by the back pressure algorithm to decide which instance to blame
std::unordered_map<sp_string, std::unordered_map<sp_int32, sp_int64>> instance_stats_;
};
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 4a72b0b..4124eae 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -753,7 +753,7 @@
void StMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) {
// Call the StMgrServers removeBackPressure method
// Note: This is not good, we probably do not want to unthrottle them all
- // at once, the upper layer only calls this once.
+ // at once, but the lower layer only calls us once.
while (!backpressure_starters_.empty()) {
server_->StopBackPressureClientCb(_other_stmgr_id, backpressure_starters_.front());
backpressure_starters_.pop();