Add support for watching packing plan to c++ state managers
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
index 45aff7c..f0edf6b 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
@@ -53,6 +53,9 @@
path += "/pplans";
FileUtils::makeDirectory(path);
path = dpath;
+ path += "/packingplans";
+ FileUtils::makeDirectory(path);
+ path = dpath;
path += "/executionstate";
FileUtils::makeDirectory(path);
path = dpath;
@@ -87,6 +90,19 @@
CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
}
+void HeronLocalFileStateMgr::SetPackingPlanWatch(const std::string& topology_name,
+ VCallback<> watcher) {
+ CHECK(watcher);
+ // We kind of cheat here. We check periodically
+ time_t packingplan_last_change = FileUtils::getModifiedTime(GetPackingPlanPath(topology_name));
+
+ auto cb = [topology_name, packingplan_last_change, watcher, this](EventLoop::Status status) {
+ this->CheckPackingPlan(topology_name, packingplan_last_change, std::move(watcher), status);
+ };
+
+ CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
+}
+
void HeronLocalFileStateMgr::GetTMasterLocation(const std::string& _topology_name,
proto::tmaster::TMasterLocation* _return,
VCallback<proto::system::StatusCode> cb) {
@@ -243,6 +259,31 @@
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
}
+void HeronLocalFileStateMgr::CreatePackingPlan(const std::string& _topology_name,
+ const proto::system::PackingPlan& _packingPlan,
+ VCallback<proto::system::StatusCode> _cb) {
+ std::string fname = GetPackingPlanPath(_topology_name);
+ std::string contents;
+ _packingPlan.SerializeToString(&contents);
+
+ WriteToFile(fname, contents);
+}
+
+void HeronLocalFileStateMgr::GetPackingPlan(const std::string& _topology_name,
+ proto::system::PackingPlan* _return,
+ VCallback<proto::system::StatusCode> cb) {
+ std::string contents;
+ proto::system::StatusCode status =
+ ReadAllFileContents(GetPackingPlanPath(_topology_name), contents);
+ if (status == proto::system::OK) {
+ if (!_return->ParseFromString(contents)) {
+ status = proto::system::STATE_CORRUPTED;
+ }
+ }
+ auto wCb = [cb, status](EventLoop::Status) { cb(status); };
+ CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
+}
+
void HeronLocalFileStateMgr::CreateExecutionState(const proto::system::ExecutionState& _st,
VCallback<proto::system::StatusCode> cb) {
std::string fname = GetExecutionStatePath(_st.topology_name());
@@ -453,5 +494,21 @@
CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
}
+void HeronLocalFileStateMgr::CheckPackingPlan(std::string topology_name, time_t last_change,
+ VCallback<> watcher, EventLoop::Status) {
+ time_t nlast_change = FileUtils::getModifiedTime(GetPackingPlanPath(topology_name));
+ if (nlast_change > last_change) {
+ watcher();
+ } else {
+ nlast_change = last_change;
+ }
+
+ auto cb = [topology_name, nlast_change, watcher, this](EventLoop::Status status) {
+ this->CheckPackingPlan(topology_name, nlast_change, std::move(watcher), status);
+ };
+
+ CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
+}
+
} // namespace common
} // namespace heron
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
index 92a4fd4..f146879 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
@@ -47,6 +47,7 @@
void SetTMasterLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
void SetMetricsCacheLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
+ void SetPackingPlanWatch(const std::string& _topology_name, VCallback<> _watcher);
// implement the functions
void GetTMasterLocation(const std::string& _topology_name,
@@ -75,6 +76,12 @@
void GetPhysicalPlan(const std::string& _topology_name, proto::system::PhysicalPlan* _return,
VCallback<proto::system::StatusCode> _cb);
+ void CreatePackingPlan(const std::string& _topology_name,
+ const proto::system::PackingPlan& _packingPlan,
+ VCallback<proto::system::StatusCode> _cb);
+ void GetPackingPlan(const std::string& _topology_name, proto::system::PackingPlan* _return,
+ VCallback<proto::system::StatusCode> _cb);
+
void CreateExecutionState(const proto::system::ExecutionState& _pplan,
VCallback<proto::system::StatusCode> _cb);
void DeleteExecutionState(const std::string& _topology_name,
@@ -121,6 +128,8 @@
EventLoop::Status);
void CheckMetricsCacheLocation(std::string _topology_name, time_t _last_change,
VCallback<> _watcher, EventLoop::Status);
+ void CheckPackingPlan(std::string _topology_name, time_t _last_change,
+ VCallback<> _watcher, EventLoop::Status);
// Hold the EventLoop for scheduling callbacks
EventLoop* eventLoop_;
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
index 140c0e0..e948e69 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
@@ -99,6 +99,8 @@
std::string HeronStateMgr::GetPhysicalPlanDir() { return topleveldir_ + "/pplans"; }
+std::string HeronStateMgr::GetPackingPlanDir() { return topleveldir_ + "/packingplans"; }
+
std::string HeronStateMgr::GetExecutionStateDir() { return topleveldir_ + "/executionstate"; }
std::string HeronStateMgr::GetStatefulCheckpointsDir() {
@@ -120,6 +122,10 @@
return GetPhysicalPlanDir() + "/" + _topname;
}
+std::string HeronStateMgr::GetPackingPlanPath(const std::string& _topname) {
+ return GetPackingPlanDir() + "/" + _topname;
+}
+
std::string HeronStateMgr::GetExecutionStatePath(const std::string& _topname) {
return GetExecutionStateDir() + "/" + _topname;
}
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
index 3f963db..0ed344d 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
@@ -76,6 +76,7 @@
virtual void SetTMasterLocationWatch(const std::string& _topology_name, VCallback<> _watcher) = 0;
virtual void SetMetricsCacheLocationWatch(
const std::string& _topology_name, VCallback<> _watcher) = 0;
+ virtual void SetPackingPlanWatch(const std::string& _topology_name, VCallback<> _watcher) = 0;
// Sets/Gets the Tmaster
virtual void GetTMasterLocation(const std::string& _topology_name,
@@ -110,6 +111,11 @@
proto::system::PhysicalPlan* _return,
VCallback<proto::system::StatusCode> _cb) = 0;
+ // Gets PackingPlan
+ virtual void GetPackingPlan(const std::string& _topology_name,
+ proto::system::PackingPlan* _return,
+ VCallback<proto::system::StatusCode> _cb) = 0;
+
// Gets/Sets ExecutionState
virtual void CreateExecutionState(const proto::system::ExecutionState& _st,
VCallback<proto::system::StatusCode> _cb) = 0;
@@ -155,6 +161,7 @@
std::string GetMetricsCacheLocationPath(const std::string& _topology_name);
std::string GetTopologyPath(const std::string& _topology_name);
std::string GetPhysicalPlanPath(const std::string& _topology_name);
+ std::string GetPackingPlanPath(const std::string& _topology_name);
std::string GetExecutionStatePath(const std::string& _topology_name);
std::string GetStatefulCheckpointsPath(const std::string& _topology_name);
@@ -162,6 +169,7 @@
std::string GetMetricsCacheLocationDir();
std::string GetTopologyDir();
std::string GetPhysicalPlanDir();
+ std::string GetPackingPlanDir();
std::string GetExecutionStateDir();
std::string GetStatefulCheckpointsDir();
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
index f5bde10..71b5a0e 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
@@ -96,6 +96,14 @@
SetMetricsCacheLocationWatchInternal();
}
+void HeronZKStateMgr::SetPackingPlanWatch(const std::string& topology_name, VCallback<> watcher) {
+ CHECK(watcher);
+ CHECK(!topology_name.empty());
+
+ packing_plan_watcher_info_ = new TMasterLocationWatchInfo(std::move(watcher), topology_name);
+ SetPackingPlanWatchInternal();
+}
+
void HeronZKStateMgr::SetTMasterLocation(const proto::tmaster::TMasterLocation& _location,
VCallback<proto::system::StatusCode> cb) {
// Just try to create an ephimeral node
@@ -221,6 +229,18 @@
zkclient_->Get(path, contents, std::move(wCb));
}
+void HeronZKStateMgr::GetPackingPlan(const std::string& _topology_name,
+ proto::system::PackingPlan* _return,
+ VCallback<proto::system::StatusCode> cb) {
+ std::string path = GetPackingPlanPath(_topology_name);
+ std::string* contents = new std::string();
+ auto wCb = [contents, _return, cb, this](sp_int32 rc) {
+ this->GetPackingPlanDone(contents, _return, std::move(cb), rc);
+ };
+
+ zkclient_->Get(path, contents, std::move(wCb));
+}
+
void HeronZKStateMgr::CreateExecutionState(const proto::system::ExecutionState& _state,
VCallback<proto::system::StatusCode> cb) {
std::string path = GetExecutionStatePath(_state.topology_name());
@@ -535,6 +555,24 @@
cb(code);
}
+void HeronZKStateMgr::GetPackingPlanDone(std::string* _contents,
+ proto::system::PackingPlan* _return,
+ VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
+ proto::system::StatusCode code = proto::system::OK;
+ if (_rc == ZOK) {
+ if (!_return->ParseFromString(*_contents)) {
+ code = proto::system::STATE_CORRUPTED;
+ }
+ } else if (_rc == ZNONODE) {
+ code = proto::system::PATH_DOES_NOT_EXIST;
+ } else {
+ LOG(ERROR) << "Getting PackingPlan failed with error " << _rc << std::endl;
+ code = proto::system::STATE_READ_ERROR;
+ }
+ delete _contents;
+ cb(code);
+}
+
void HeronZKStateMgr::CreateExecutionStateDone(VCallback<proto::system::StatusCode> cb,
sp_int32 _rc) {
proto::system::StatusCode code = proto::system::OK;
@@ -682,6 +720,12 @@
!metricscache_location_watcher_info_->topology_name.empty());
}
+bool HeronZKStateMgr::IsPackingPlanWatchDefined() {
+ return (packing_plan_watcher_info_ != NULL &&
+ packing_plan_watcher_info_->watcher_cb &&
+ !packing_plan_watcher_info_->topology_name.empty());
+}
+
// 2 seconds
const int HeronZKStateMgr::SET_WATCH_RETRY_INTERVAL_S = 2;
@@ -734,6 +778,25 @@
}
}
+void HeronZKStateMgr::SetPackingPlanWatchCompletionHandler(sp_int32 rc) {
+ if (rc == ZOK || rc == ZNONODE) {
+ // NoNode is when there is no packingplan up yet, but the watch is set.
+ LOG(INFO) << "Setting watch on packing plan succeeded: " << zerror(rc) << std::endl;
+ } else {
+ // Any other return code should be treated as warning, since ideally
+ // we shouldn't be in this state.
+ LOG(WARNING) << "Setting watch on packing plan returned: " << zerror(rc) << std::endl;
+
+ if (ShouldRetrySetWatch(rc)) {
+ LOG(INFO) << "Retrying after " << SET_WATCH_RETRY_INTERVAL_S << " seconds" << std::endl;
+
+ auto cb = [this](EventLoop::Status status) { this->CallSetPackingPlanWatch(status);};
+
+ eventLoop_->registerTimer(std::move(cb), false, SET_WATCH_RETRY_INTERVAL_S * 1000 * 1000);
+ }
+ }
+}
+
void HeronZKStateMgr::CallSetTMasterLocationWatch(EventLoop::Status) {
SetTMasterLocationWatchInternal();
}
@@ -742,6 +805,10 @@
SetMetricsCacheLocationWatchInternal();
}
+void HeronZKStateMgr::CallSetPackingPlanWatch(EventLoop::Status) {
+ SetPackingPlanWatchInternal();
+}
+
void HeronZKStateMgr::SetTMasterLocationWatchInternal() {
CHECK(IsTmasterWatchDefined());
@@ -763,6 +830,16 @@
[this](sp_int32 rc) { this->SetMetricsCacheWatchCompletionHandler(rc); });
}
+void HeronZKStateMgr::SetPackingPlanWatchInternal() {
+ CHECK(IsPackingPlanWatchDefined());
+
+ LOG(INFO) << "Setting watch on packing plan " << std::endl;
+ std::string path = GetPackingPlanPath(packing_plan_watcher_info_->topology_name);
+
+ zkclient_->Exists(path, [this]() { this->PackingPlanWatch(); },
+ [this](sp_int32 rc) { this->SetPackingPlanWatchCompletionHandler(rc); });
+}
+
void HeronZKStateMgr::TMasterLocationWatch() {
// First setup watch again
SetTMasterLocationWatchInternal();
@@ -776,5 +853,12 @@
// Then run the watcher
metricscache_location_watcher_info_->watcher_cb();
}
+
+void HeronZKStateMgr::PackingPlanWatch() {
+ // First setup watch again
+ SetPackingPlanWatchInternal();
+ // Then run the watcher
+ packing_plan_watcher_info_->watcher_cb();
+}
} // namespace common
} // namespace heron
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
index b60afd1..35bf4ea 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
@@ -66,6 +66,7 @@
// Sets up a watch on tmaster location change
void SetTMasterLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
void SetMetricsCacheLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
+ void SetPackingPlanWatch(const std::string& _topology_name, VCallback<> _watcher);
// Sets the Tmaster
void SetTMasterLocation(const proto::tmaster::TMasterLocation& _location,
@@ -96,6 +97,9 @@
void GetPhysicalPlan(const std::string& _topology_name, proto::system::PhysicalPlan* _return,
VCallback<proto::system::StatusCode> _cb);
+ void GetPackingPlan(const std::string& _topology_name, proto::system::PackingPlan* _return,
+ VCallback<proto::system::StatusCode> _cb);
+
// Gets/Sets execution state
void CreateExecutionState(const proto::system::ExecutionState& _state,
VCallback<proto::system::StatusCode> _cb);
@@ -154,6 +158,8 @@
void SetPhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void GetPhysicalPlanDone(std::string* _contents, proto::system::PhysicalPlan* _return,
VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
+ void GetPackingPlanDone(std::string* _contents, proto::system::PackingPlan* _return,
+ VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void CreateExecutionStateDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void DeleteExecutionStateDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
@@ -176,6 +182,7 @@
// clients about the change.
void TMasterLocationWatch();
void MetricsCacheLocationWatch();
+ void PackingPlanWatch();
// Handles global events from ZKClient. For now, it handles the session
// expired event, by deleting the current client, creating a new one,
// setting the tmaster location watch, and notifying the client of a
@@ -184,20 +191,24 @@
// Sets a tmaster location watch through the ZKClient Exists method.
void SetTMasterLocationWatchInternal();
void SetMetricsCacheLocationWatchInternal();
+ void SetPackingPlanWatchInternal();
// A wrapper to be passed to select server registerTimer call.
// Ignores the status and call SetTMasterLocationWatchInternal
void CallSetTMasterLocationWatch(EventLoop::Status status);
void CallSetMetricsCacheLocationWatch(EventLoop::Status status);
+ void CallSetPackingPlanWatch(EventLoop::Status status);
// A handler callback that gets called by ZkClient upon completion of
// setting Tmaster watch. If the return code indicates failure, we
// retry after SET_WATCH_RETRY_INTERVAL_S seconds.
void SetTMasterWatchCompletionHandler(sp_int32 rc);
void SetMetricsCacheWatchCompletionHandler(sp_int32 rc);
+ void SetPackingPlanWatchCompletionHandler(sp_int32 rc);
// Essentially tells you whether SetTmasterLocationWatch has been
// called by the client or not. It gets this info through
// tmaster_location_watcher_info_
bool IsTmasterWatchDefined();
bool IsMetricsCacheWatchDefined();
+ bool IsPackingPlanWatchDefined();
// Common functionality for c`tors. Should be called only once from c`tor
void Init();
@@ -229,6 +240,7 @@
const TMasterLocationWatchInfo* tmaster_location_watcher_info_;
const TMasterLocationWatchInfo* metricscache_location_watcher_info_;
+ const TMasterLocationWatchInfo* packing_plan_watcher_info_;
// If true, we exit on zookeeper session expired event
const bool exitOnSessionExpiry_;
// Retry interval if setting a watch on zk node fails.
diff --git a/heron/statemgrs/tests/cpp/statetest.cpp b/heron/statemgrs/tests/cpp/statetest.cpp
index 39b838a..4bab892 100644
--- a/heron/statemgrs/tests/cpp/statetest.cpp
+++ b/heron/statemgrs/tests/cpp/statetest.cpp
@@ -53,6 +53,7 @@
HeronStateMgr* state_mgr = HeronStateMgr::MakeStateMgr(host_port, top_level_dir, &ss);
state_mgr->SetTMasterLocationWatch(topology_name, []() { TMasterLocationWatchHandler(); });
+ state_mgr->SetPackingPlanWatch(topology_name, []() { PackingPlanWatchHandler(); });
ss.loop();
return 0;
}
diff --git a/heron/statemgrs/tests/cpp/zkstatemgr_unittest.cpp b/heron/statemgrs/tests/cpp/zkstatemgr_unittest.cpp
index a9a9ea4..6f5042d 100644
--- a/heron/statemgrs/tests/cpp/zkstatemgr_unittest.cpp
+++ b/heron/statemgrs/tests/cpp/zkstatemgr_unittest.cpp
@@ -80,6 +80,9 @@
static void CallTMasterLocationWatch(HeronZKStateMgr* heron_zkstatemgr) {
heron_zkstatemgr->TMasterLocationWatch();
}
+ static void CallPackingPlanWatch(HeronZKStateMgr* heron_zkstatemgr) {
+ heron_zkstatemgr->PackingPlanWatch();
+ }
// a proxy for the call since the tests cannot call directly
// (friendship inheritance is not supported)
@@ -89,6 +92,7 @@
}
static void TmasterLocationWatchHandler() { tmaster_watch_handler_count++; }
+ static void PackingPlanWatchHandler() { packing_plan_watch_handler_count++; }
MockZKClient* mock_zkclient;
MockZKClientFactory* mock_zkclient_factory;
@@ -97,10 +101,12 @@
std::string topleveldir;
// used to verify the number of calls to TmasterLocationWatchHandler
static int tmaster_watch_handler_count;
+ static int packing_plan_watch_handler_count;
};
// static member needs to be defined outside class... sigh :(
int HeronZKStateMgrTest::tmaster_watch_handler_count = 0;
+int HeronZKStateMgrTest::packing_plan_watch_handler_count = 0;
// Ensure that ZKClient is created and deleted appropriately.
TEST_F(HeronZKStateMgrTest, testCreateDelete) {
@@ -162,6 +168,29 @@
delete heron_zkstatemgr;
}
+TEST_F(HeronZKStateMgrTest, testPackingPlanWatch) {
+ const std::string topology_name = "dummy_topology";
+ const std::string expected_path = topleveldir + "/packingplans/" + topology_name;
+
+ HeronZKStateMgr* heron_zkstatemgr =
+ new HeronZKStateMgrWithMock(hostportlist, topleveldir, &ss, mock_zkclient_factory);
+
+ heron_zkstatemgr->SetPackingPlanWatch(topology_name, []() { PackingPlanWatchHandler(); });
+
+ // ensure TmasterLocationWatch resets the watch
+ EXPECT_CALL(*mock_zkclient, Exists(expected_path, _, _)).Times(1);
+
+ packing_plan_watch_handler_count = 0;
+ CallPackingPlanWatch(heron_zkstatemgr);
+ // ensure watch handler is called.
+ ASSERT_EQ(packing_plan_watch_handler_count, 1);
+
+ EXPECT_CALL(*mock_zkclient, Die()).Times(1);
+ EXPECT_CALL(*mock_zkclient_factory, Die()).Times(1);
+
+ delete heron_zkstatemgr;
+}
+
TEST_F(HeronZKStateMgrTest, testGlobalWatchEventHandler) {
const std::string topology_name = "dummy_topology";
const std::string expected_path = topleveldir + "/tmasters/" + topology_name;