Add support for watching the packing plan in the C++ state managers
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
index 45aff7c..f0edf6b 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
@@ -53,6 +53,9 @@
   path += "/pplans";
   FileUtils::makeDirectory(path);
   path = dpath;
+  path += "/packingplans";
+  FileUtils::makeDirectory(path);
+  path = dpath;
   path += "/executionstate";
   FileUtils::makeDirectory(path);
   path = dpath;
@@ -87,6 +90,22 @@
   CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
 }
 
+// Registers `watcher` to be invoked whenever the packing plan file of
+// `topology_name` is modified. Changes are detected by polling the file's
+// modification time once a second (see CheckPackingPlan).
+void HeronLocalFileStateMgr::SetPackingPlanWatch(const std::string& topology_name,
+                                                 VCallback<> watcher) {
+  CHECK(watcher);
+  // Only modifications made after this point trigger the watcher.
+  time_t packingplan_last_change = FileUtils::getModifiedTime(GetPackingPlanPath(topology_name));
+
+  auto cb = [topology_name, packingplan_last_change, watcher, this](EventLoop::Status status) {
+    this->CheckPackingPlan(topology_name, packingplan_last_change, std::move(watcher), status);
+  };
+
+  CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
+}
+
 void HeronLocalFileStateMgr::GetTMasterLocation(const std::string& _topology_name,
                                                 proto::tmaster::TMasterLocation* _return,
                                                 VCallback<proto::system::StatusCode> cb) {
@@ -243,6 +262,41 @@
   CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
 }
 
+// Serializes `_packingPlan` and writes it to the packing plan file of
+// `_topology_name`. `_cb` is invoked from the event loop with the status
+// returned by WriteToFile.
+void HeronLocalFileStateMgr::CreatePackingPlan(const std::string& _topology_name,
+                                               const proto::system::PackingPlan& _packingPlan,
+                                               VCallback<proto::system::StatusCode> _cb) {
+  std::string fname = GetPackingPlanPath(_topology_name);
+  std::string contents;
+  _packingPlan.SerializeToString(&contents);
+
+  // Always report the write status; deliver it through the event loop, as
+  // GetPackingPlan does, so _cb is never invoked re-entrantly.
+  proto::system::StatusCode status = WriteToFile(fname, contents);
+  auto wCb = [_cb, status](EventLoop::Status) { _cb(status); };
+  CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
+}
+
+// Reads the packing plan of `_topology_name` into `_return`. `_cb` is invoked
+// from the event loop with OK, STATE_CORRUPTED if the file contents cannot be
+// parsed, or the failure status reported by ReadAllFileContents.
+void HeronLocalFileStateMgr::GetPackingPlan(const std::string& _topology_name,
+                                            proto::system::PackingPlan* _return,
+                                            VCallback<proto::system::StatusCode> cb) {
+  std::string contents;
+  proto::system::StatusCode status =
+      ReadAllFileContents(GetPackingPlanPath(_topology_name), contents);
+  if (status == proto::system::OK) {
+    if (!_return->ParseFromString(contents)) {
+      status = proto::system::STATE_CORRUPTED;
+    }
+  }
+  auto wCb = [cb, status](EventLoop::Status) { cb(status); };
+  CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
+}
+
 void HeronLocalFileStateMgr::CreateExecutionState(const proto::system::ExecutionState& _st,
                                                   VCallback<proto::system::StatusCode> cb) {
   std::string fname = GetExecutionStatePath(_st.topology_name());
@@ -453,5 +507,25 @@
   CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
 }
 
+// Timer callback that polls the packing plan file of `topology_name`. If the
+// file's modification time advanced past `last_change`, `watcher` is invoked.
+// Either way the check re-schedules itself one second later, carrying forward
+// the newest modification time seen so far.
+void HeronLocalFileStateMgr::CheckPackingPlan(std::string topology_name, time_t last_change,
+                                              VCallback<> watcher, EventLoop::Status) {
+  time_t nlast_change = FileUtils::getModifiedTime(GetPackingPlanPath(topology_name));
+  if (nlast_change > last_change) {
+    watcher();
+  } else {
+    nlast_change = last_change;
+  }
+
+  auto cb = [topology_name, nlast_change, watcher, this](EventLoop::Status status) {
+    this->CheckPackingPlan(topology_name, nlast_change, std::move(watcher), status);
+  };
+
+  CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, 1000000), 0);
+}
+
 }  // namespace common
 }  // namespace heron
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
index 92a4fd4..f146879 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
@@ -47,6 +47,8 @@
 
   void SetTMasterLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
   void SetMetricsCacheLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
+  // Polls the packing plan file and runs _watcher whenever it is modified.
+  void SetPackingPlanWatch(const std::string& _topology_name, VCallback<> _watcher);
 
   // implement the functions
   void GetTMasterLocation(const std::string& _topology_name,
@@ -75,6 +77,14 @@
   void GetPhysicalPlan(const std::string& _topology_name, proto::system::PhysicalPlan* _return,
                        VCallback<proto::system::StatusCode> _cb);
 
+  // Creates/Gets the PackingPlan of a topology
+  void CreatePackingPlan(const std::string& _topology_name,
+                         const proto::system::PackingPlan& _packingPlan,
+                         VCallback<proto::system::StatusCode> _cb);
+  void GetPackingPlan(const std::string& _topology_name,
+                      proto::system::PackingPlan* _return,
+                      VCallback<proto::system::StatusCode> _cb);
+
   void CreateExecutionState(const proto::system::ExecutionState& _pplan,
                             VCallback<proto::system::StatusCode> _cb);
   void DeleteExecutionState(const std::string& _topology_name,
@@ -121,6 +131,9 @@
                             EventLoop::Status);
   void CheckMetricsCacheLocation(std::string _topology_name, time_t _last_change,
                                  VCallback<> _watcher, EventLoop::Status);
+  // Timer callback behind SetPackingPlanWatch; re-schedules itself on every run.
+  void CheckPackingPlan(std::string _topology_name, time_t _last_change, VCallback<> _watcher,
+                        EventLoop::Status);
 
   // Hold the EventLoop for scheduling callbacks
   EventLoop* eventLoop_;
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
index 140c0e0..e948e69 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
@@ -99,6 +99,11 @@
 
 std::string HeronStateMgr::GetPhysicalPlanDir() { return topleveldir_ + "/pplans"; }
 
+// Directory holding one packing plan entry per topology.
+std::string HeronStateMgr::GetPackingPlanDir() {
+  return topleveldir_ + "/packingplans";
+}
+
 std::string HeronStateMgr::GetExecutionStateDir() { return topleveldir_ + "/executionstate"; }
 
 std::string HeronStateMgr::GetStatefulCheckpointsDir() {
@@ -120,6 +125,12 @@
   return GetPhysicalPlanDir() + "/" + _topname;
 }
 
+// Location of the packing plan entry for topology `_topname`.
+std::string HeronStateMgr::GetPackingPlanPath(const std::string& _topname) {
+  const std::string dir = GetPackingPlanDir();
+  return dir + "/" + _topname;
+}
+
 std::string HeronStateMgr::GetExecutionStatePath(const std::string& _topname) {
   return GetExecutionStateDir() + "/" + _topname;
 }
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
index 3f963db..0ed344d 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
@@ -76,6 +76,10 @@
   virtual void SetTMasterLocationWatch(const std::string& _topology_name, VCallback<> _watcher) = 0;
   virtual void SetMetricsCacheLocationWatch(
                const std::string& _topology_name, VCallback<> _watcher) = 0;
+  // Sets up a watch that runs _watcher whenever the packing plan of
+  // _topology_name changes.
+  virtual void SetPackingPlanWatch(
+               const std::string& _topology_name, VCallback<> _watcher) = 0;
 
   // Sets/Gets the Tmaster
   virtual void GetTMasterLocation(const std::string& _topology_name,
@@ -110,6 +114,11 @@
                                proto::system::PhysicalPlan* _return,
                                VCallback<proto::system::StatusCode> _cb) = 0;
 
+  // Gets the PackingPlan of a topology; the outcome is reported through _cb
+  virtual void GetPackingPlan(const std::string& _topology_name,
+                              proto::system::PackingPlan* _return,
+                              VCallback<proto::system::StatusCode> _cb) = 0;
+
   // Gets/Sets ExecutionState
   virtual void CreateExecutionState(const proto::system::ExecutionState& _st,
                                     VCallback<proto::system::StatusCode> _cb) = 0;
@@ -155,6 +164,7 @@
   std::string GetMetricsCacheLocationPath(const std::string& _topology_name);
   std::string GetTopologyPath(const std::string& _topology_name);
   std::string GetPhysicalPlanPath(const std::string& _topology_name);
+  std::string GetPackingPlanPath(const std::string& _topology_name);
   std::string GetExecutionStatePath(const std::string& _topology_name);
   std::string GetStatefulCheckpointsPath(const std::string& _topology_name);
 
@@ -162,6 +172,7 @@
   std::string GetMetricsCacheLocationDir();
   std::string GetTopologyDir();
   std::string GetPhysicalPlanDir();
+  std::string GetPackingPlanDir();
   std::string GetExecutionStateDir();
   std::string GetStatefulCheckpointsDir();
 
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
index f5bde10..71b5a0e 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
@@ -96,6 +96,16 @@
   SetMetricsCacheLocationWatchInternal();
 }
 
+// Remembers `watcher` for `topology_name` and installs the first ZooKeeper
+// watch on that topology's packing plan node.
+void HeronZKStateMgr::SetPackingPlanWatch(const std::string& topology_name, VCallback<> watcher) {
+  CHECK(watcher);
+  CHECK(!topology_name.empty());
+
+  packing_plan_watcher_info_ = new TMasterLocationWatchInfo(std::move(watcher), topology_name);
+  SetPackingPlanWatchInternal();
+}
+
 void HeronZKStateMgr::SetTMasterLocation(const proto::tmaster::TMasterLocation& _location,
                                          VCallback<proto::system::StatusCode> cb) {
   // Just try to create an ephimeral node
@@ -221,6 +231,19 @@
   zkclient_->Get(path, contents, std::move(wCb));
 }
 
+// Asynchronously reads the packing plan node of `_topology_name` into `_return`.
+// The read buffer allocated here is deleted by GetPackingPlanDone.
+void HeronZKStateMgr::GetPackingPlan(const std::string& _topology_name,
+                                     proto::system::PackingPlan* _return,
+                                     VCallback<proto::system::StatusCode> cb) {
+  std::string* buffer = new std::string();
+  auto on_read = [buffer, _return, cb, this](sp_int32 rc) {
+    this->GetPackingPlanDone(buffer, _return, std::move(cb), rc);
+  };
+  std::string node = GetPackingPlanPath(_topology_name);
+  zkclient_->Get(node, buffer, std::move(on_read));
+}
+
 void HeronZKStateMgr::CreateExecutionState(const proto::system::ExecutionState& _state,
                                            VCallback<proto::system::StatusCode> cb) {
   std::string path = GetExecutionStatePath(_state.topology_name());
@@ -535,6 +558,28 @@
   cb(code);
 }
 
+// Completion handler for GetPackingPlan: maps the ZooKeeper return code to a
+// StatusCode, frees the read buffer and then notifies the caller.
+void HeronZKStateMgr::GetPackingPlanDone(std::string* _contents,
+                                         proto::system::PackingPlan* _return,
+                                         VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
+  proto::system::StatusCode code = proto::system::OK;
+  switch (_rc) {
+    case ZOK:
+      if (!_return->ParseFromString(*_contents)) code = proto::system::STATE_CORRUPTED;
+      break;
+    case ZNONODE:
+      code = proto::system::PATH_DOES_NOT_EXIST;
+      break;
+    default:
+      LOG(ERROR) << "Getting PackingPlan failed with error " << _rc << std::endl;
+      code = proto::system::STATE_READ_ERROR;
+      break;
+  }
+  delete _contents;
+  cb(code);
+}
+
 void HeronZKStateMgr::CreateExecutionStateDone(VCallback<proto::system::StatusCode> cb,
                                                sp_int32 _rc) {
   proto::system::StatusCode code = proto::system::OK;
@@ -682,6 +727,13 @@
           !metricscache_location_watcher_info_->topology_name.empty());
 }
 
+// True once SetPackingPlanWatch has stored a watcher callback and a topology name.
+bool HeronZKStateMgr::IsPackingPlanWatchDefined() {
+  if (packing_plan_watcher_info_ == NULL) return false;
+  if (!packing_plan_watcher_info_->watcher_cb) return false;
+  return !packing_plan_watcher_info_->topology_name.empty();
+}
+
 // 2 seconds
 const int HeronZKStateMgr::SET_WATCH_RETRY_INTERVAL_S = 2;
 
@@ -734,6 +786,25 @@
   }
 }
 
+// Completion handler for the Exists call that installs the packing plan watch.
+// ZOK and ZNONODE both mean the watch is in place (ZNONODE only means no packing
+// plan has been written yet). Any other code is logged as a warning and, when
+// ShouldRetrySetWatch allows it, the watch is retried after
+// SET_WATCH_RETRY_INTERVAL_S seconds.
+void HeronZKStateMgr::SetPackingPlanWatchCompletionHandler(sp_int32 rc) {
+  if (rc == ZOK || rc == ZNONODE) {
+    LOG(INFO) << "Setting watch on packing plan succeeded: " << zerror(rc) << std::endl;
+    return;
+  }
+
+  LOG(WARNING) << "Setting watch on packing plan returned: " << zerror(rc) << std::endl;
+  if (!ShouldRetrySetWatch(rc)) return;
+
+  LOG(INFO) << "Retrying after " << SET_WATCH_RETRY_INTERVAL_S << " seconds" << std::endl;
+  auto retry = [this](EventLoop::Status status) { this->CallSetPackingPlanWatch(status); };
+  eventLoop_->registerTimer(std::move(retry), false, SET_WATCH_RETRY_INTERVAL_S * 1000 * 1000);
+}
+
 void HeronZKStateMgr::CallSetTMasterLocationWatch(EventLoop::Status) {
   SetTMasterLocationWatchInternal();
 }
@@ -742,6 +813,11 @@
   SetMetricsCacheLocationWatchInternal();
 }
 
+// Timer callback used to retry installing the packing plan watch.
+void HeronZKStateMgr::CallSetPackingPlanWatch(EventLoop::Status) {
+  this->SetPackingPlanWatchInternal();
+}
+
 void HeronZKStateMgr::SetTMasterLocationWatchInternal() {
   CHECK(IsTmasterWatchDefined());
 
@@ -763,6 +839,19 @@
                     [this](sp_int32 rc) { this->SetMetricsCacheWatchCompletionHandler(rc); });
 }
 
+// Issues an Exists call on the packing plan node: it installs the watch
+// (PackingPlanWatch runs on change) and reports the outcome to
+// SetPackingPlanWatchCompletionHandler.
+void HeronZKStateMgr::SetPackingPlanWatchInternal() {
+  CHECK(IsPackingPlanWatchDefined());
+  LOG(INFO) << "Setting watch on packing plan " << std::endl;
+
+  std::string node = GetPackingPlanPath(packing_plan_watcher_info_->topology_name);
+  auto on_change = [this]() { this->PackingPlanWatch(); };
+  auto on_complete = [this](sp_int32 rc) { this->SetPackingPlanWatchCompletionHandler(rc); };
+  zkclient_->Exists(node, std::move(on_change), std::move(on_complete));
+}
+
 void HeronZKStateMgr::TMasterLocationWatch() {
   // First setup watch again
   SetTMasterLocationWatchInternal();
@@ -776,5 +865,12 @@
   // Then run the watcher
   metricscache_location_watcher_info_->watcher_cb();
 }
+
+// Runs when the packing plan node changes. ZooKeeper watches fire only once,
+// so the watch is re-installed before the registered watcher is invoked.
+void HeronZKStateMgr::PackingPlanWatch() {
+  this->SetPackingPlanWatchInternal();
+  packing_plan_watcher_info_->watcher_cb();
+}
 }  // namespace common
 }  // namespace heron
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
index b60afd1..35bf4ea 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
@@ -66,6 +66,8 @@
   // Sets up a watch on tmaster location change
   void SetTMasterLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
   void SetMetricsCacheLocationWatch(const std::string& _topology_name, VCallback<> _watcher);
+  // Sets up a watch on packing plan change
+  void SetPackingPlanWatch(const std::string& _topology_name, VCallback<> _watcher);
 
   // Sets the Tmaster
   void SetTMasterLocation(const proto::tmaster::TMasterLocation& _location,
@@ -96,6 +98,10 @@
   void GetPhysicalPlan(const std::string& _topology_name, proto::system::PhysicalPlan* _return,
                        VCallback<proto::system::StatusCode> _cb);
 
+  // Gets packing plan
+  void GetPackingPlan(const std::string& _topology_name, proto::system::PackingPlan* _return,
+                      VCallback<proto::system::StatusCode> _cb);
+
   // Gets/Sets execution state
   void CreateExecutionState(const proto::system::ExecutionState& _state,
                             VCallback<proto::system::StatusCode> _cb);
@@ -154,6 +160,8 @@
   void SetPhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
   void GetPhysicalPlanDone(std::string* _contents, proto::system::PhysicalPlan* _return,
                            VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
+  void GetPackingPlanDone(std::string* _contents, proto::system::PackingPlan* _return,
+                          VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
 
   void CreateExecutionStateDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
   void DeleteExecutionStateDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
@@ -176,6 +184,7 @@
   // clients about the change.
   void TMasterLocationWatch();
   void MetricsCacheLocationWatch();
+  void PackingPlanWatch();
   // Handles global events from ZKClient. For now, it handles the session
   // expired event, by deleting the current client, creating a new one,
   // setting the tmaster location watch, and notifying the client of a
@@ -184,20 +193,24 @@
   // Sets a tmaster location watch through the ZKClient Exists method.
   void SetTMasterLocationWatchInternal();
   void SetMetricsCacheLocationWatchInternal();
+  void SetPackingPlanWatchInternal();
   // A wrapper to be passed to select server registerTimer call.
   // Ignores the status and call SetTMasterLocationWatchInternal
   void CallSetTMasterLocationWatch(EventLoop::Status status);
   void CallSetMetricsCacheLocationWatch(EventLoop::Status status);
+  void CallSetPackingPlanWatch(EventLoop::Status status);
   // A handler callback that gets called by ZkClient upon completion of
   // setting Tmaster watch. If the return code indicates failure, we
   // retry after SET_WATCH_RETRY_INTERVAL_S seconds.
   void SetTMasterWatchCompletionHandler(sp_int32 rc);
   void SetMetricsCacheWatchCompletionHandler(sp_int32 rc);
+  void SetPackingPlanWatchCompletionHandler(sp_int32 rc);
   // Essentially tells you whether SetTmasterLocationWatch has been
   // called by the client or not. It gets this info through
   // tmaster_location_watcher_info_
   bool IsTmasterWatchDefined();
   bool IsMetricsCacheWatchDefined();
+  bool IsPackingPlanWatchDefined();
   // Common functionality for c`tors. Should be called only once from c`tor
   void Init();
 
@@ -229,6 +242,9 @@
 
   const TMasterLocationWatchInfo* tmaster_location_watcher_info_;
   const TMasterLocationWatchInfo* metricscache_location_watcher_info_;
+  // Packing plan watch registered by SetPackingPlanWatch. NOTE(review): make sure the
+  // c'tor initializes this to NULL, since IsPackingPlanWatchDefined tests it.
+  const TMasterLocationWatchInfo* packing_plan_watcher_info_;
   // If true, we exit on zookeeper session expired event
   const bool exitOnSessionExpiry_;
   // Retry interval if setting a watch on zk node fails.
diff --git a/heron/statemgrs/tests/cpp/statetest.cpp b/heron/statemgrs/tests/cpp/statetest.cpp
index 39b838a..4bab892 100644
--- a/heron/statemgrs/tests/cpp/statetest.cpp
+++ b/heron/statemgrs/tests/cpp/statetest.cpp
@@ -53,6 +53,10 @@
 
   HeronStateMgr* state_mgr = HeronStateMgr::MakeStateMgr(host_port, top_level_dir, &ss);
   state_mgr->SetTMasterLocationWatch(topology_name, []() { TMasterLocationWatchHandler(); });
+  // Print every packing plan change so the watch can be observed manually.
+  state_mgr->SetPackingPlanWatch(topology_name, []() {
+    std::cout << "PackingPlanWatchHandler triggered" << std::endl;
+  });
   ss.loop();
   return 0;
 }
diff --git a/heron/statemgrs/tests/cpp/zkstatemgr_unittest.cpp b/heron/statemgrs/tests/cpp/zkstatemgr_unittest.cpp
index a9a9ea4..6f5042d 100644
--- a/heron/statemgrs/tests/cpp/zkstatemgr_unittest.cpp
+++ b/heron/statemgrs/tests/cpp/zkstatemgr_unittest.cpp
@@ -80,6 +80,9 @@
   static void CallTMasterLocationWatch(HeronZKStateMgr* heron_zkstatemgr) {
     heron_zkstatemgr->TMasterLocationWatch();
   }
+  static void CallPackingPlanWatch(HeronZKStateMgr* heron_zkstatemgr) {
+    heron_zkstatemgr->PackingPlanWatch();
+  }
 
   // a proxy for the call since the tests cannot call directly
   // (friendship inheritance is not supported)
@@ -89,6 +92,7 @@
   }
 
   static void TmasterLocationWatchHandler() { tmaster_watch_handler_count++; }
+  static void PackingPlanWatchHandler() { packing_plan_watch_handler_count++; }
 
   MockZKClient* mock_zkclient;
   MockZKClientFactory* mock_zkclient_factory;
@@ -97,10 +101,13 @@
   std::string topleveldir;
   // used to verify the number of calls to TmasterLocationWatchHandler
   static int tmaster_watch_handler_count;
+  // used to verify the number of calls to PackingPlanWatchHandler
+  static int packing_plan_watch_handler_count;
 };
 
 // static member needs to be defined outside class... sigh :(
 int HeronZKStateMgrTest::tmaster_watch_handler_count = 0;
+int HeronZKStateMgrTest::packing_plan_watch_handler_count = 0;
 
 // Ensure that ZKClient is created and deleted appropriately.
 TEST_F(HeronZKStateMgrTest, testCreateDelete) {
@@ -162,6 +169,31 @@
   delete heron_zkstatemgr;
 }
 
+// Ensure that PackingPlanWatch re-installs the watch on the packing plan node
+// and then runs the registered watcher exactly once.
+TEST_F(HeronZKStateMgrTest, testPackingPlanWatch) {
+  const std::string topology_name = "dummy_topology";
+  const std::string expected_path = topleveldir + "/packingplans/" + topology_name;
+
+  HeronZKStateMgr* heron_zkstatemgr =
+      new HeronZKStateMgrWithMock(hostportlist, topleveldir, &ss, mock_zkclient_factory);
+
+  heron_zkstatemgr->SetPackingPlanWatch(topology_name, []() { PackingPlanWatchHandler(); });
+
+  // ensure PackingPlanWatch resets the watch
+  EXPECT_CALL(*mock_zkclient, Exists(expected_path, _, _)).Times(1);
+
+  packing_plan_watch_handler_count = 0;
+  CallPackingPlanWatch(heron_zkstatemgr);
+  // ensure watch handler is called.
+  ASSERT_EQ(packing_plan_watch_handler_count, 1);
+
+  EXPECT_CALL(*mock_zkclient, Die()).Times(1);
+  EXPECT_CALL(*mock_zkclient_factory, Die()).Times(1);
+
+  delete heron_zkstatemgr;
+}
+
 TEST_F(HeronZKStateMgrTest, testGlobalWatchEventHandler) {
   const std::string topology_name = "dummy_topology";
   const std::string expected_path = topleveldir + "/tmasters/" + topology_name;