STREAMCOMP-2724: TMaster migrated to mostly use smart pointers instead of manual memory management (#3248)

diff --git a/heron/common/src/cpp/basics/basics.h b/heron/common/src/cpp/basics/basics.h
index bc00b00..766a64f 100644
--- a/heron/common/src/cpp/basics/basics.h
+++ b/heron/common/src/cpp/basics/basics.h
@@ -46,4 +46,13 @@
 #include "basics/sptest.h"
 #include "basics/mempool.h"
 
+// The standard std::make_unique(...) was introduced starting from C++ 14. Heron uses C++ 11.
+// Unfortunatelly we can not bump a compiler version used in Heron since we are tight coupled with
+// CentOS 7 whih comes with very old version of GCC (4.8.5). That's why we introduced
+// make_unique(...) manually here.
+template<typename T, typename... Args>
+std::unique_ptr<T> make_unique(Args&&... args) {
+    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
+}
+
 #endif  // HERON_BASICS_H_
diff --git a/heron/common/src/cpp/metrics/metrics-mgr-st.cpp b/heron/common/src/cpp/metrics/metrics-mgr-st.cpp
index e79c21f..5da37f1 100644
--- a/heron/common/src/cpp/metrics/metrics-mgr-st.cpp
+++ b/heron/common/src/cpp/metrics/metrics-mgr-st.cpp
@@ -36,6 +36,8 @@
 namespace heron {
 namespace common {
 
+using std::shared_ptr;
+
 MetricsMgrSt::MetricsMgrSt(sp_int32 _metricsmgr_port, sp_int32 _interval, EventLoop* eventLoop) {
   options_.set_host("127.0.0.1");
   options_.set_port(_metricsmgr_port);
@@ -52,9 +54,6 @@
 MetricsMgrSt::~MetricsMgrSt() {
   CHECK_EQ(client_->getEventLoop()->unRegisterTimer(timerid_), 0);
   delete client_;
-  for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
-    delete iter->second;
-  }
 }
 
 void MetricsMgrSt::Start(const sp_string& _my_hostname, sp_int32 _my_port,
@@ -74,7 +73,7 @@
   client_->SendMetricsCacheLocation(location);
 }
 
-void MetricsMgrSt::register_metric(const sp_string& _metric_name, IMetric* _metric) {
+void MetricsMgrSt::register_metric(const sp_string& _metric_name, shared_ptr<IMetric> _metric) {
   metrics_[_metric_name] = _metric;
 }
 
diff --git a/heron/common/src/cpp/metrics/metrics-mgr-st.h b/heron/common/src/cpp/metrics/metrics-mgr-st.h
index 4cec4b7..ef15710 100644
--- a/heron/common/src/cpp/metrics/metrics-mgr-st.h
+++ b/heron/common/src/cpp/metrics/metrics-mgr-st.h
@@ -44,6 +44,8 @@
 namespace heron {
 namespace common {
 
+using std::shared_ptr;
+
 class MetricsMgrClient;
 class IMetric;
 
@@ -52,7 +54,7 @@
   MetricsMgrSt(sp_int32 _metricsmgr_port, sp_int32 _interval, EventLoop* eventLoop);
   virtual ~MetricsMgrSt();
 
-  void register_metric(const sp_string& _metric_name, IMetric* _metric);
+  void register_metric(const sp_string& _metric_name, shared_ptr<IMetric> _metric);
   void unregister_metric(const sp_string& _metric_name);
   void RefreshTMasterLocation(const proto::tmaster::TMasterLocation& location);
   void RefreshMetricsCacheLocation(const proto::tmaster::MetricsCacheLocation& location);
@@ -72,7 +74,7 @@
   void gather_metrics(EventLoop::Status);
 
   VCallback<EventLoop::Status> timer_cb_;
-  std::map<sp_string, IMetric*> metrics_;
+  std::map<sp_string, shared_ptr<IMetric>> metrics_;
   MetricsMgrClient* client_;
   NetworkOptions options_;
   sp_int64 timerid_;
diff --git a/heron/common/src/cpp/metrics/metricsmgr-client.cpp b/heron/common/src/cpp/metrics/metricsmgr-client.cpp
index 59e06b2..998c607 100644
--- a/heron/common/src/cpp/metrics/metricsmgr-client.cpp
+++ b/heron/common/src/cpp/metrics/metricsmgr-client.cpp
@@ -43,7 +43,7 @@
       tmaster_location_(NULL),
       metricscache_location_(NULL),
       registered_(false) {
-  InstallResponseHandler(new proto::system::MetricPublisherRegisterRequest(),
+  InstallResponseHandler(make_unique<proto::system::MetricPublisherRegisterRequest>(),
                          &MetricsMgrClient::HandleRegisterResponse);
   Start();
 }
@@ -69,8 +69,7 @@
 void MetricsMgrClient::ReConnect() { Start(); }
 
 void MetricsMgrClient::SendRegisterRequest() {
-  proto::system::MetricPublisherRegisterRequest* request;
-  request = new proto::system::MetricPublisherRegisterRequest();
+  auto request = make_unique<proto::system::MetricPublisherRegisterRequest>();
 
   proto::system::MetricPublisher* publisher = request->mutable_publisher();
   publisher->set_hostname(hostname_);
@@ -79,7 +78,7 @@
   publisher->set_instance_id(instance_id_);
   publisher->set_instance_index(instance_index_);
 
-  SendRequest(request, NULL);
+  SendRequest(std::move(request), NULL);
 }
 
 void MetricsMgrClient::HandleRegisterResponse(
diff --git a/heron/common/src/cpp/network/client.cpp b/heron/common/src/cpp/network/client.cpp
index 1c075f8..c9f88aa 100644
--- a/heron/common/src/cpp/network/client.cpp
+++ b/heron/common/src/cpp/network/client.cpp
@@ -37,12 +37,13 @@
 
 void Client::Stop() { Stop_Base(); }
 
-void Client::SendRequest(google::protobuf::Message* _request, void* _ctx) {
-  SendRequest(_request, _ctx, -1);
+void Client::SendRequest(std::unique_ptr<google::protobuf::Message> _request, void* _ctx) {
+  SendRequest(std::move(_request), _ctx, -1);
 }
 
-void Client::SendRequest(google::protobuf::Message* _request, void* _ctx, sp_int64 _msecs) {
-  InternalSendRequest(_request, _ctx, _msecs);
+void Client::SendRequest(std::unique_ptr<google::protobuf::Message> _request, void* _ctx,
+        sp_int64 _msecs) {
+  InternalSendRequest(std::move(_request), _ctx, _msecs);
 }
 
 void Client::SendResponse(REQID _id, const google::protobuf::Message& _response) {
@@ -93,12 +94,12 @@
 
 void Client::Init() { message_rid_gen_ = new REQID_Generator(); }
 
-void Client::InternalSendRequest(google::protobuf::Message* _request, void* _ctx, sp_int64 _msecs) {
+void Client::InternalSendRequest(std::unique_ptr<google::protobuf::Message> _request, void* _ctx,
+        sp_int64 _msecs) {
   auto iter = requestResponseMap_.find(_request->GetTypeName());
   CHECK(iter != requestResponseMap_.end());
   const sp_string& _expected_response_type = iter->second;
   if (state_ != CONNECTED) {
-    delete _request;
     responseHandlers[_expected_response_type](NULL, WRITE_ERROR);
     return;
   }
@@ -118,9 +119,6 @@
   CHECK_EQ(opkt->PackREQID(rid), 0);
   CHECK_EQ(opkt->PackProtocolBuffer(*_request, byte_size), 0);
 
-  // delete the request
-  delete _request;
-
   Connection* conn = static_cast<Connection*>(conn_);
   if (conn->sendPacket(opkt) != 0) {
     context_map_.erase(rid);
diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h
index 34f4583..593751b 100644
--- a/heron/common/src/cpp/network/client.h
+++ b/heron/common/src/cpp/network/client.h
@@ -104,10 +104,10 @@
   // a user owned piece of context that is not interpreted by the
   // client which is passed on to the HandleResponse
   // A negative value of the msecs means no timeout.
-  void SendRequest(google::protobuf::Message* _request, void* ctx, sp_int64 msecs);
+  void SendRequest(std::unique_ptr<google::protobuf::Message> _request, void* ctx, sp_int64 msecs);
 
   // Convinience method of the above function with no timeout
-  void SendRequest(google::protobuf::Message* _request, void* ctx);
+  void SendRequest(std::unique_ptr<google::protobuf::Message> _request, void* ctx);
 
   // This interface is used if you want to communicate with the other end
   // on a non-request-response based communication.
@@ -129,15 +129,13 @@
 
   // Register a handler for a particular response type
   template <typename S, typename T, typename M>
-  void InstallResponseHandler(S* _request,
+  void InstallResponseHandler(std::unique_ptr<S> _request,
                               void (T::*method)(void* _ctx, M*, NetworkErrorCode status)) {
-    google::protobuf::Message* m = new M();
+    auto m = make_unique<M>();
     T* t = static_cast<T*>(this);
     responseHandlers[m->GetTypeName()] = std::bind(&Client::dispatchResponse<T, M>, this, t, method,
                                                    std::placeholders::_1, std::placeholders::_2);
     requestResponseMap_[_request->GetTypeName()] = m->GetTypeName();
-    delete m;
-    delete _request;
   }
 
   // Register a handler for a particular message type
@@ -197,7 +195,8 @@
   //! Handle most of the init stuff
   void Init();
 
-  void InternalSendRequest(google::protobuf::Message* _request, void* _ctx, sp_int64 _msecs);
+  void InternalSendRequest(std::unique_ptr<google::protobuf::Message> _request, void* _ctx,
+          sp_int64 _msecs);
   void InternalSendMessage(const google::protobuf::Message& _message);
   void InternalSendResponse(OutgoingPacket* _packet);
 
diff --git a/heron/common/src/cpp/network/httpserver.cpp b/heron/common/src/cpp/network/httpserver.cpp
index 7816b38..5f8793b 100644
--- a/heron/common/src/cpp/network/httpserver.cpp
+++ b/heron/common/src/cpp/network/httpserver.cpp
@@ -89,10 +89,9 @@
 }
 
 void HTTPServer::SendReply(IncomingHTTPRequest* _request, sp_int32 _code,
-                           OutgoingHTTPResponse* _response) {
+                           unique_ptr<OutgoingHTTPResponse> _response) {
   CHECK(_request->underlying_request() == _response->underlying_response());
   evhttp_send_reply(_request->underlying_request(), _code, "", NULL);
-  delete _response;
 }
 
 void HTTPServer::SendErrorReply(IncomingHTTPRequest* _request, sp_int32 _code) {
diff --git a/heron/common/src/cpp/network/httpserver.h b/heron/common/src/cpp/network/httpserver.h
index f310236..5ea4b54 100644
--- a/heron/common/src/cpp/network/httpserver.h
+++ b/heron/common/src/cpp/network/httpserver.h
@@ -37,6 +37,8 @@
 class IncomingHTTPRequest;
 class OutgoingHTTPResponse;
 
+using std::unique_ptr;
+
 class HTTPServer {
  public:
   // Constructor
@@ -56,7 +58,7 @@
   void InstallGenericCallBack(VCallback<IncomingHTTPRequest*> cb);
 
   void SendReply(IncomingHTTPRequest* _request, sp_int32 status_code,
-                 OutgoingHTTPResponse* _response);
+                 unique_ptr<OutgoingHTTPResponse> _response);
   void SendErrorReply(IncomingHTTPRequest* _request, sp_int32 status_code);
   void SendErrorReply(IncomingHTTPRequest* _request, sp_int32 status_code,
                       const sp_string& _reason);
diff --git a/heron/common/tests/cpp/network/http_server_unittest.cpp b/heron/common/tests/cpp/network/http_server_unittest.cpp
index df68a61..cd07712 100644
--- a/heron/common/tests/cpp/network/http_server_unittest.cpp
+++ b/heron/common/tests/cpp/network/http_server_unittest.cpp
@@ -65,9 +65,9 @@
     EXPECT_EQ(value.str(), keyvalues[i].second);
   }
 
-  OutgoingHTTPResponse* response = new OutgoingHTTPResponse(_request);
+  auto response = make_unique<OutgoingHTTPResponse>(_request);
   response->AddResponse("This is response for meta object\r\n");
-  server_->SendReply(_request, 200, response);
+  server_->SendReply(_request, 200, std::move(response));
 }
 
 void TestHttpServer::HandleGenericRequest(IncomingHTTPRequest* _request) {
diff --git a/heron/instance/src/cpp/gateway/stmgr-client.cpp b/heron/instance/src/cpp/gateway/stmgr-client.cpp
index 9397ea3..9650ebc 100644
--- a/heron/instance/src/cpp/gateway/stmgr-client.cpp
+++ b/heron/instance/src/cpp/gateway/stmgr-client.cpp
@@ -51,7 +51,7 @@
   max_reconnect_times_ = config::HeronInternalsConfigReader::Instance()
                                ->GetHeronInstanceReconnectStreammgrTimes();
   reconnect_attempts_ = 0;
-  InstallResponseHandler(new proto::stmgr::RegisterInstanceRequest(),
+  InstallResponseHandler(make_unique<proto::stmgr::RegisterInstanceRequest>(),
                          &StMgrClient::HandleRegisterResponse);
   InstallMessageHandler(&StMgrClient::HandlePhysicalPlan);
   InstallMessageHandler(&StMgrClient::HandleTupleMessage);
@@ -128,11 +128,11 @@
 void StMgrClient::OnReconnectTimer() { Start(); }
 
 void StMgrClient::SendRegisterRequest() {
-  auto request = new proto::stmgr::RegisterInstanceRequest();
+  auto request = make_unique<proto::stmgr::RegisterInstanceRequest>();
   request->set_topology_name(topologyName_);
   request->set_topology_id(topologyId_);
   request->mutable_instance()->CopyFrom(instanceProto_);
-  SendRequest(request, NULL);
+  SendRequest(std::move(request), NULL);
   return;
 }
 
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
index 61853c1..906ff59 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
@@ -200,12 +200,12 @@
 }
 
 void HeronLocalFileStateMgr::GetTopology(const std::string& _topology_name,
-                                         proto::api::Topology* _return,
+                                         proto::api::Topology& _return,
                                          VCallback<proto::system::StatusCode> cb) {
   std::string contents;
   proto::system::StatusCode status = ReadAllFileContents(GetTopologyPath(_topology_name), contents);
   if (status == proto::system::OK) {
-    if (!_return->ParseFromString(contents)) {
+    if (!_return.ParseFromString(contents)) {
       status = proto::system::STATE_CORRUPTED;
     }
   }
@@ -248,7 +248,7 @@
 }
 
 void HeronLocalFileStateMgr::GetPhysicalPlan(const std::string& _topology_name,
-                                             proto::system::PhysicalPlan* _return,
+                                             shared_ptr<proto::system::PhysicalPlan> _return,
                                              VCallback<proto::system::StatusCode> cb) {
   std::string contents;
   proto::system::StatusCode status =
@@ -273,7 +273,7 @@
 }
 
 void HeronLocalFileStateMgr::GetPackingPlan(const std::string& _topology_name,
-                                             proto::system::PackingPlan* _return,
+                                             shared_ptr<proto::system::PackingPlan> _return,
                                              VCallback<proto::system::StatusCode> cb) {
   std::string contents;
   proto::system::StatusCode status =
@@ -337,7 +337,7 @@
 }
 
 void HeronLocalFileStateMgr::CreateStatefulCheckpoints(const std::string& _topology_name,
-                                const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+                                shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
                                 VCallback<proto::system::StatusCode> cb) {
   std::string fname = GetStatefulCheckpointsPath(_topology_name);
   // First check to see if location exists.
@@ -348,7 +348,7 @@
   }
 
   std::string contents;
-  _ckpt.SerializeToString(&contents);
+  _ckpt->SerializeToString(&contents);
   proto::system::StatusCode status = WriteToFile(fname, contents);
   auto wCb = [cb, status](EventLoop::Status) { cb(status); };
   CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
@@ -362,10 +362,10 @@
 }
 
 void HeronLocalFileStateMgr::SetStatefulCheckpoints(const std::string& _topology_name,
-                                const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+                                shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
                                 VCallback<proto::system::StatusCode> cb) {
   std::string contents;
-  _ckpt.SerializeToString(&contents);
+  _ckpt->SerializeToString(&contents);
   proto::system::StatusCode status =
       WriteToFile(GetStatefulCheckpointsPath(_topology_name), contents);
   auto wCb = [cb, status](EventLoop::Status) { cb(status); };
@@ -373,7 +373,7 @@
 }
 
 void HeronLocalFileStateMgr::GetStatefulCheckpoints(const std::string& _topology_name,
-                                 proto::ckptmgr::StatefulConsistentCheckpoints* _return,
+                                 shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
                                  VCallback<proto::system::StatusCode> cb) {
   std::string contents;
   proto::system::StatusCode status =
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
index 0b8a090..f6bd655 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
@@ -67,7 +67,7 @@
   void CreateTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
   void DeleteTopology(const std::string& _topology_name, VCallback<proto::system::StatusCode> _cb);
   void SetTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
-  void GetTopology(const std::string& _topology_name, proto::api::Topology* _return,
+  void GetTopology(const std::string& _topology_name, proto::api::Topology& _return,
                    VCallback<proto::system::StatusCode> _cb);
 
   void CreatePhysicalPlan(const proto::system::PhysicalPlan& _pplan,
@@ -76,13 +76,15 @@
                           VCallback<proto::system::StatusCode> _cb);
   void SetPhysicalPlan(const proto::system::PhysicalPlan& _pplan,
                        VCallback<proto::system::StatusCode> _cb);
-  void GetPhysicalPlan(const std::string& _topology_name, proto::system::PhysicalPlan* _return,
+  void GetPhysicalPlan(const std::string& _topology_name,
+                       shared_ptr<proto::system::PhysicalPlan> _return,
                        VCallback<proto::system::StatusCode> _cb);
 
   void CreatePackingPlan(const std::string& _topology_name,
                          const proto::system::PackingPlan& _packingPlan,
                          VCallback<proto::system::StatusCode> _cb);
-  void GetPackingPlan(const std::string& _topology_name, proto::system::PackingPlan* _return,
+  void GetPackingPlan(const std::string& _topology_name,
+                      shared_ptr<proto::system::PackingPlan> _return,
                       VCallback<proto::system::StatusCode> _cb);
 
   void CreateExecutionState(const proto::system::ExecutionState& _pplan,
@@ -95,15 +97,15 @@
                          VCallback<proto::system::StatusCode> _cb);
 
   void CreateStatefulCheckpoints(const std::string& _topology_name,
-                      const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
-                      VCallback<proto::system::StatusCode> _cb);
+                                 shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+                                 VCallback<proto::system::StatusCode> _cb);
   void DeleteStatefulCheckpoints(const std::string& _topology_name,
                             VCallback<proto::system::StatusCode> _cb);
   void GetStatefulCheckpoints(const std::string& _topology_name,
-                      proto::ckptmgr::StatefulConsistentCheckpoints* _return,
-                      VCallback<proto::system::StatusCode> _cb);
+                              shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
+                              VCallback<proto::system::StatusCode> _cb);
   void SetStatefulCheckpoints(const std::string& _topology_name,
-                      const proto::ckptmgr::StatefulConsistentCheckpoints& _state,
+                      shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _state,
                       VCallback<proto::system::StatusCode> _cb);
 
   void ListTopologies(std::vector<sp_string>* _return, VCallback<proto::system::StatusCode> _cb);
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
index 016e87e..5201f6c 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
@@ -32,13 +32,14 @@
 namespace heron {
 namespace common {
 
-HeronStateMgr* HeronStateMgr::MakeStateMgr(const std::string& _zk_hostport,
+shared_ptr<HeronStateMgr> HeronStateMgr::MakeStateMgr(const std::string& _zk_hostport,
                                            const std::string& _topleveldir, EventLoop* eventLoop,
                                            bool exitOnSessionExpiry) {
   if (_zk_hostport.empty()) {
-    return new HeronLocalFileStateMgr(_topleveldir, eventLoop);
+    return std::make_shared<HeronLocalFileStateMgr>(_topleveldir, eventLoop);
   } else {
-    return new HeronZKStateMgr(_zk_hostport, _topleveldir, eventLoop, exitOnSessionExpiry);
+    return std::make_shared<HeronZKStateMgr>(_zk_hostport, _topleveldir, eventLoop,
+            exitOnSessionExpiry);
   }
 }
 
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
index ee05f32..6cad372 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
@@ -55,15 +55,17 @@
 namespace heron {
 namespace common {
 
+using std::shared_ptr;
+
 class HeronStateMgr {
  public:
   explicit HeronStateMgr(const std::string& _topleveldir);
   virtual ~HeronStateMgr();
 
   // Factory method to create
-  static HeronStateMgr* MakeStateMgr(const std::string& _zk_hostport,
-                                     const std::string& _topleveldir, EventLoop* eventLoop,
-                                     bool exitOnSessionExpiry = true);
+  static shared_ptr<HeronStateMgr> MakeStateMgr(const std::string& _zk_hostport,
+                                             const std::string& _topleveldir, EventLoop* eventLoop,
+                                             bool exitOnSessionExpiry = true);
 
   //
   // Interface methods
@@ -100,7 +102,7 @@
                               VCallback<proto::system::StatusCode> _cb) = 0;
   virtual void SetTopology(const proto::api::Topology& _top,
                            VCallback<proto::system::StatusCode> _cb) = 0;
-  virtual void GetTopology(const std::string& _topology_name, proto::api::Topology* _return,
+  virtual void GetTopology(const std::string& _topology_name, proto::api::Topology& _return,
                            VCallback<proto::system::StatusCode> _cb) = 0;
 
   // Gets/Sets PhysicalPlan
@@ -111,12 +113,12 @@
   virtual void SetPhysicalPlan(const proto::system::PhysicalPlan& _plan,
                                VCallback<proto::system::StatusCode> _cb) = 0;
   virtual void GetPhysicalPlan(const std::string& _topology_name,
-                               proto::system::PhysicalPlan* _return,
+                               shared_ptr<proto::system::PhysicalPlan> _return,
                                VCallback<proto::system::StatusCode> _cb) = 0;
 
   // Gets PackingPlan
   virtual void GetPackingPlan(const std::string& _topology_name,
-                              proto::system::PackingPlan* _return,
+                              shared_ptr<proto::system::PackingPlan> _return,
                               VCallback<proto::system::StatusCode> _cb) = 0;
 
   // Gets/Sets ExecutionState
@@ -135,15 +137,15 @@
 
   // Gets/Sets Stateful Checkpoint
   virtual void CreateStatefulCheckpoints(const std::string& _topology_name,
-                           const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+                           shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
                            VCallback<proto::system::StatusCode> _cb) = 0;
   virtual void DeleteStatefulCheckpoints(const std::string& _topology_name,
                                   VCallback<proto::system::StatusCode> _cb) = 0;
   virtual void SetStatefulCheckpoints(const std::string& _topology_name,
-                           const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+                            shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
                             VCallback<proto::system::StatusCode> _cb) = 0;
   virtual void GetStatefulCheckpoints(const std::string& _topology_name,
-                               proto::ckptmgr::StatefulConsistentCheckpoints* _return,
+                               shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
                                VCallback<proto::system::StatusCode> _cb) = 0;
 
   // Calls to list the topologies and physical plans
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
index 7c6c967..f4f9497 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
@@ -180,12 +180,12 @@
   zkclient_->Set(path, value, std::move(wCb));
 }
 
-void HeronZKStateMgr::GetTopology(const std::string& _topology_name, proto::api::Topology* _return,
+void HeronZKStateMgr::GetTopology(const std::string& _topology_name, proto::api::Topology& _return,
                                   VCallback<proto::system::StatusCode> cb) {
   std::string path = GetTopologyPath(_topology_name);
   std::string* contents = new std::string();
 
-  auto wCb = [contents, _return, cb, this](sp_int32 rc) {
+  auto wCb = [contents, &_return, cb, this](sp_int32 rc) {
     this->GetTopologyDone(contents, _return, std::move(cb), rc);
   };
 
@@ -221,7 +221,7 @@
 }
 
 void HeronZKStateMgr::GetPhysicalPlan(const std::string& _topology_name,
-                                      proto::system::PhysicalPlan* _return,
+                                      shared_ptr<proto::system::PhysicalPlan> _return,
                                       VCallback<proto::system::StatusCode> cb) {
   std::string path = GetPhysicalPlanPath(_topology_name);
   std::string* contents = new std::string();
@@ -233,7 +233,7 @@
 }
 
 void HeronZKStateMgr::GetPackingPlan(const std::string& _topology_name,
-                                      proto::system::PackingPlan* _return,
+                                      shared_ptr<proto::system::PackingPlan> _return,
                                       VCallback<proto::system::StatusCode> cb) {
   std::string path = GetPackingPlanPath(_topology_name);
   std::string* contents = new std::string();
@@ -285,11 +285,11 @@
 }
 
 void HeronZKStateMgr::CreateStatefulCheckpoints(const std::string& _topology_name,
-                                  const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+                                  shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
                                   VCallback<proto::system::StatusCode> cb) {
   std::string path = GetStatefulCheckpointsPath(_topology_name);
   std::string contents;
-  _ckpt.SerializeToString(&contents);
+  _ckpt->SerializeToString(&contents);
   auto wCb = [cb, this](sp_int32 rc) { this->CreateStatefulCheckpointsDone(std::move(cb), rc); };
 
   zkclient_->CreateNode(path, contents, false, std::move(wCb));
@@ -304,19 +304,19 @@
 }
 
 void HeronZKStateMgr::SetStatefulCheckpoints(const std::string& _topology_name,
-                                  const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+                                  shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
                                   VCallback<proto::system::StatusCode> cb) {
   std::string path = GetStatefulCheckpointsPath(_topology_name);
   std::string contents;
-  _ckpt.SerializeToString(&contents);
+  _ckpt->SerializeToString(&contents);
   auto wCb = [cb, this](sp_int32 rc) { this->SetStatefulCheckpointsDone(std::move(cb), rc); };
 
   zkclient_->Set(path, contents, std::move(wCb));
 }
 
 void HeronZKStateMgr::GetStatefulCheckpoints(const std::string& _topology_name,
-                                   proto::ckptmgr::StatefulConsistentCheckpoints* _return,
-                                   VCallback<proto::system::StatusCode> cb) {
+                                 shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
+                                 VCallback<proto::system::StatusCode> cb) {
   std::string path = GetStatefulCheckpointsPath(_topology_name);
   std::string* contents = new std::string();
   auto wCb = [contents, _return, cb, this](sp_int32 rc) {
@@ -483,11 +483,11 @@
   cb(code);
 }
 
-void HeronZKStateMgr::GetTopologyDone(std::string* _contents, proto::api::Topology* _return,
+void HeronZKStateMgr::GetTopologyDone(std::string* _contents, proto::api::Topology& _return,
                                       VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
   proto::system::StatusCode code = proto::system::OK;
   if (_rc == ZOK) {
-    if (!_return->ParseFromString(*_contents)) {
+    if (!_return.ParseFromString(*_contents)) {
       LOG(ERROR) << "topology parsing failed; zk corruption?" << std::endl;
       code = proto::system::STATE_CORRUPTED;
     }
@@ -541,7 +541,7 @@
 }
 
 void HeronZKStateMgr::GetPhysicalPlanDone(std::string* _contents,
-                                          proto::system::PhysicalPlan* _return,
+                                          shared_ptr<proto::system::PhysicalPlan> _return,
                                           VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
   proto::system::StatusCode code = proto::system::OK;
   if (_rc == ZOK) {
@@ -559,7 +559,7 @@
 }
 
 void HeronZKStateMgr::GetPackingPlanDone(std::string* _contents,
-                                          proto::system::PackingPlan* _return,
+                                          shared_ptr<proto::system::PackingPlan> _return,
                                           VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
   proto::system::StatusCode code = proto::system::OK;
   if (_rc == ZOK) {
@@ -678,8 +678,8 @@
 }
 
 void HeronZKStateMgr::GetStatefulCheckpointsDone(std::string* _contents,
-                                     proto::ckptmgr::StatefulConsistentCheckpoints* _return,
-                                     VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
+                                 shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
+                                 VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
   proto::system::StatusCode code = proto::system::OK;
   if (_rc == ZOK) {
     if (!_return->ParseFromString(*_contents)) {
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
index 4c9b9b7..202e746 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
@@ -87,7 +87,7 @@
   void CreateTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
   void DeleteTopology(const std::string& _topology_name, VCallback<proto::system::StatusCode> _cb);
   void SetTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
-  void GetTopology(const std::string& _topology_name, proto::api::Topology* _return,
+  void GetTopology(const std::string& _topology_name, proto::api::Topology& _return,
                    VCallback<proto::system::StatusCode> _cb);
 
   // Gets/Sets physical plan
@@ -97,10 +97,12 @@
                           VCallback<proto::system::StatusCode> _cb);
   void SetPhysicalPlan(const proto::system::PhysicalPlan& _pplan,
                        VCallback<proto::system::StatusCode> _cb);
-  void GetPhysicalPlan(const std::string& _topology_name, proto::system::PhysicalPlan* _return,
+  void GetPhysicalPlan(const std::string& _topology_name,
+                       shared_ptr<proto::system::PhysicalPlan> _return,
                        VCallback<proto::system::StatusCode> _cb);
 
-  void GetPackingPlan(const std::string& _topology_name, proto::system::PackingPlan* _return,
+  void GetPackingPlan(const std::string& _topology_name,
+                      shared_ptr<proto::system::PackingPlan> _return,
                       VCallback<proto::system::StatusCode> _cb);
 
   // Gets/Sets execution state
@@ -115,16 +117,16 @@
 
   // Gets/Sets the Stateful Checkpoints
   void CreateStatefulCheckpoints(const std::string& _topology_name,
-               const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
-               VCallback<proto::system::StatusCode> _cb);
+          shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+          VCallback<proto::system::StatusCode> _cb);
   void DeleteStatefulCheckpoints(const std::string& _topology_name,
                VCallback<proto::system::StatusCode> _cb);
   void SetStatefulCheckpoints(const std::string& _topology_name,
-               const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+               shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
                VCallback<proto::system::StatusCode> _cb);
   void GetStatefulCheckpoints(const std::string& _topology_name,
-               proto::ckptmgr::StatefulConsistentCheckpoints* _return,
-               VCallback<proto::system::StatusCode> _cb);
+                              shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
+                              VCallback<proto::system::StatusCode> _cb);
 
   void ListTopologies(std::vector<sp_string>* _return, VCallback<proto::system::StatusCode> _cb);
   void ListExecutionStateTopologies(std::vector<sp_string>* _return,
@@ -153,15 +155,15 @@
   void CreateTopologyDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
   void DeleteTopologyDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
   void SetTopologyDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
-  void GetTopologyDone(std::string* _contents, proto::api::Topology* _return,
+  void GetTopologyDone(std::string* _contents, proto::api::Topology& _return,
                        VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
 
   void CreatePhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
   void DeletePhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
   void SetPhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
-  void GetPhysicalPlanDone(std::string* _contents, proto::system::PhysicalPlan* _return,
+  void GetPhysicalPlanDone(std::string* _contents, shared_ptr<proto::system::PhysicalPlan> _return,
                            VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
-  void GetPackingPlanDone(std::string* _contents, proto::system::PackingPlan* _return,
+  void GetPackingPlanDone(std::string* _contents, shared_ptr<proto::system::PackingPlan> _return,
                           VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
 
   void CreateExecutionStateDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
@@ -174,7 +176,7 @@
   void DeleteStatefulCheckpointsDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
   void SetStatefulCheckpointsDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
   void GetStatefulCheckpointsDone(std::string* _contents,
-                           proto::ckptmgr::StatefulConsistentCheckpoints* _return,
+                           shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
                            VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
 
   void ListTopologiesDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
diff --git a/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp b/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
index 251725c..7dcfe6a 100644
--- a/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
+++ b/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
@@ -45,7 +45,7 @@
     neighbour_calculator_(_neighbour_calculator),
     metrics_manager_client_(_metrics_manager_client), tupleset_drainer_(_tupleset_drainer),
     tuplestream_drainer_(_tuplestream_drainer), ckpt_drainer_(_ckpt_drainer) {
-  size_metric_ = new common::AssignableMetric(current_size_);
+  size_metric_ = std::make_shared<common::AssignableMetric>(current_size_);
   metrics_manager_client_->register_metric("__stateful_gateway_size", size_metric_);
 }
 
@@ -53,8 +53,8 @@
   for (auto kv : pending_tuples_) {
     delete kv.second;
   }
+
   metrics_manager_client_->unregister_metric("__stateful_gateway_size");
-  delete size_metric_;
 }
 
 void CheckpointGateway::SendToInstance(sp_int32 _task_id,
diff --git a/heron/stmgr/src/cpp/manager/checkpoint-gateway.h b/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
index 1040523..0233e4e 100644
--- a/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
+++ b/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
@@ -102,7 +102,7 @@
   sp_uint64 current_size_;
   NeighbourCalculator* neighbour_calculator_;
   common::MetricsMgrSt* metrics_manager_client_;
-  common::AssignableMetric* size_metric_;
+  std::shared_ptr<common::AssignableMetric> size_metric_;
   std::unordered_map<sp_int32, CheckpointInfo*> pending_tuples_;
   std::function<void(sp_int32, proto::system::HeronTupleSet2*)> tupleset_drainer_;
   std::function<void(proto::stmgr::TupleStreamMessage*)> tuplestream_drainer_;
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
index a41bf9b..a36527f 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
@@ -27,6 +27,9 @@
 namespace heron {
 namespace stmgr {
 
+using std::unique_ptr;
+using proto::ckptmgr::SaveInstanceStateRequest;
+
 CkptMgrClient::CkptMgrClient(EventLoop* eventloop, const NetworkOptions& _options,
                              const sp_string& _topology_name, const sp_string& _topology_id,
                              const sp_string& _ckptmgr_id, const sp_string& _stmgr_id,
@@ -50,11 +53,11 @@
   // TODO(nlu): take the value from config
   reconnect_cpktmgr_interval_sec_ = 10;
 
-  InstallResponseHandler(new proto::ckptmgr::RegisterStMgrRequest(),
+  InstallResponseHandler(make_unique<proto::ckptmgr::RegisterStMgrRequest>(),
                          &CkptMgrClient::HandleRegisterStMgrResponse);
-  InstallResponseHandler(new proto::ckptmgr::SaveInstanceStateRequest(),
+  InstallResponseHandler(make_unique<proto::ckptmgr::SaveInstanceStateRequest>(),
                          &CkptMgrClient::HandleSaveInstanceStateResponse);
-  InstallResponseHandler(new proto::ckptmgr::GetInstanceStateRequest(),
+  InstallResponseHandler(make_unique<proto::ckptmgr::GetInstanceStateRequest>(),
                          &CkptMgrClient::HandleGetInstanceStateResponse);
 }
 
@@ -141,17 +144,17 @@
 void CkptMgrClient::OnReconnectTimer() { Start(); }
 
 void CkptMgrClient::SendRegisterRequest() {
-  auto request = new proto::ckptmgr::RegisterStMgrRequest();
+  auto request = make_unique<proto::ckptmgr::RegisterStMgrRequest>();
   request->set_topology_name(topology_name_);
   request->set_topology_id(topology_id_);
   request->set_stmgr_id(stmgr_id_);
   request->mutable_physical_plan()->CopyFrom(*pplan_);
-  SendRequest(request, NULL);
+  SendRequest(std::move(request), NULL);
 }
 
-void CkptMgrClient::SaveInstanceState(proto::ckptmgr::SaveInstanceStateRequest* _request) {
+void CkptMgrClient::SaveInstanceState(unique_ptr<SaveInstanceStateRequest> _request) {
   LOG(INFO) << "Sending SaveInstanceState to ckptmgr" << std::endl;
-  SendRequest(_request, NULL);
+  SendRequest(std::move(_request), NULL);
 }
 
 void CkptMgrClient::SetPhysicalPlan(proto::system::PhysicalPlan& _pplan) {
@@ -170,10 +173,10 @@
 void CkptMgrClient::GetInstanceState(const proto::system::Instance& _instance,
                                      const std::string& _checkpoint_id,
                                      int32_t* _nattempts) {
-  auto request = new proto::ckptmgr::GetInstanceStateRequest();
+  auto request = make_unique<proto::ckptmgr::GetInstanceStateRequest>();
   request->mutable_instance()->CopyFrom(_instance);
   request->set_checkpoint_id(_checkpoint_id);
-  SendRequest(request, _nattempts);
+  SendRequest(std::move(request), _nattempts);
 }
 
 void CkptMgrClient::HandleSaveInstanceStateResponse(void*,
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.h b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
index 4d34004..6a58888 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
@@ -29,6 +29,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::unique_ptr;
+
 class CkptMgrClient : public Client {
  public:
   CkptMgrClient(EventLoop* eventLoop, const NetworkOptions& _options,
@@ -44,7 +46,7 @@
 
   void Quit();
 
-  virtual void SaveInstanceState(proto::ckptmgr::SaveInstanceStateRequest* _request);
+  virtual void SaveInstanceState(unique_ptr<proto::ckptmgr::SaveInstanceStateRequest> _request);
   virtual void GetInstanceState(const proto::system::Instance& _instance,
                                 const std::string& _checkpoint_id);
   virtual void SetPhysicalPlan(proto::system::PhysicalPlan& _pplan);
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 5f8d9a2..205f8d1 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -40,6 +40,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::make_shared;
+
 // Num data tuples sent to instances associated with this stream manager
 const sp_string METRIC_DATA_TUPLES_TO_INSTANCES = "__tuples_to_workers";
 // Num ack tuples sent to instances associated with this stream manager
@@ -107,8 +109,8 @@
   InstallMessageHandler(&InstanceServer::HandleStoreInstanceStateCheckpointMessage);
   InstallMessageHandler(&InstanceServer::HandleRestoreInstanceStateResponse);
 
-  instance_server_metrics_ = new heron::common::MultiCountMetric();
-  back_pressure_metric_aggr_ = new heron::common::TimeSpentMetric();
+  instance_server_metrics_ = make_shared<heron::common::MultiCountMetric>();
+  back_pressure_metric_aggr_ = make_shared<heron::common::TimeSpentMetric>();
   metrics_manager_client_->register_metric("__server", instance_server_metrics_);
   metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR,
                                            back_pressure_metric_aggr_);
@@ -133,8 +135,10 @@
   Stop();
   // Unregister and delete the metrics.
   for (auto immIter = instance_metric_map_.begin();
-      immIter != instance_metric_map_.end(); ++immIter) {
-    sp_string instance_id = immIter->first;
+            immIter != instance_metric_map_.end();
+            immIter = instance_metric_map_.erase(immIter)) {
+    const sp_string& instance_id = immIter->first;
+
     for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
       if (iter->second->instance_->instance_id() != instance_id) continue;
       InstanceData* data = iter->second;
@@ -142,14 +146,14 @@
       if (!iConn) break;
       sp_string metric_name = MakeBackPressureCompIdMetricName(instance_id);
       metrics_manager_client_->unregister_metric(metric_name);
-      delete immIter->second;
     }
   }
 
   // Clean the connection_buffer_metric_map
   for (auto qmmIter = connection_buffer_metric_map_.begin();
-      qmmIter != connection_buffer_metric_map_.end(); ++qmmIter) {
+            qmmIter != connection_buffer_metric_map_.end();) {
     const sp_string& instance_id = qmmIter->first;
+
     for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
       if (iter->second->instance_->instance_id() != instance_id) continue;
       InstanceData* data = iter->second;
@@ -157,25 +161,23 @@
       if (!iConn) break;
       sp_string metric_name = MakeQueueSizeCompIdMetricName(instance_id);
       metrics_manager_client_->unregister_metric(metric_name);
-      delete qmmIter->second;
     }
+
+    qmmIter = connection_buffer_metric_map_.erase(qmmIter);
   }
 
   // Clean the connection_buffer_length_metric_map
   for (auto qlmIter = connection_buffer_length_metric_map_.begin();
-    qlmIter != connection_buffer_length_metric_map_.end(); ++qlmIter) {
+            qlmIter != connection_buffer_length_metric_map_.end();) {
     const sp_string& instance_id = qlmIter->first;
     sp_string metric_name = MakeQueueLengthCompIdMetricName(instance_id);
     metrics_manager_client_->unregister_metric(metric_name);
-    delete qlmIter->second;
+    qlmIter = connection_buffer_length_metric_map_.erase(qlmIter);
   }
 
   metrics_manager_client_->unregister_metric("__server");
   metrics_manager_client_->unregister_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR);
 
-  delete instance_server_metrics_;
-  delete back_pressure_metric_aggr_;
-
   // cleanup the instance info
   for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
     delete iter->second;
@@ -235,7 +237,7 @@
       remote_ends_who_caused_back_pressure_.end()) {
     _conn->unsetCausedBackPressure();
     remote_ends_who_caused_back_pressure_.erase(GetInstanceName(_conn));
-    heron::common::TimeSpentMetric* instance_metric = instance_metric_map_[GetInstanceName(_conn)];
+    auto instance_metric = instance_metric_map_[GetInstanceName(_conn)];
     instance_metric->Stop();
     if (!stmgr_->DidAnnounceBackPressure()) {
       stmgr_->SendStopBackPressureToOtherStMgrs();
@@ -262,7 +264,6 @@
     auto immiter = instance_metric_map_.find(instance_id);
     if (immiter != instance_metric_map_.end()) {
       metrics_manager_client_->unregister_metric(MakeBackPressureCompIdMetricName(instance_id));
-      delete instance_metric_map_[instance_id];
       instance_metric_map_.erase(instance_id);
     }
 
@@ -270,7 +271,6 @@
     auto qmmiter = connection_buffer_metric_map_.find(instance_id);
     if (qmmiter != connection_buffer_metric_map_.end()) {
       metrics_manager_client_->unregister_metric(MakeQueueSizeCompIdMetricName(instance_id));
-      delete connection_buffer_metric_map_[instance_id];
       connection_buffer_metric_map_.erase(instance_id);
     }
 
@@ -278,7 +278,6 @@
     auto qlmiter = connection_buffer_length_metric_map_.find(instance_id);
     if (qlmiter != connection_buffer_length_metric_map_.end()) {
       metrics_manager_client_->unregister_metric(MakeQueueLengthCompIdMetricName(instance_id));
-      delete connection_buffer_length_metric_map_[instance_id];
       connection_buffer_length_metric_map_.erase(instance_id);
     }
 
@@ -333,13 +332,13 @@
     }
     // Create a metric for this instance
     if (instance_metric_map_.find(instance_id) == instance_metric_map_.end()) {
-      auto instance_metric = new heron::common::TimeSpentMetric();
+      auto instance_metric = make_shared<heron::common::TimeSpentMetric>();
       metrics_manager_client_->register_metric(MakeBackPressureCompIdMetricName(instance_id),
                                                instance_metric);
       instance_metric_map_[instance_id] = instance_metric;
     }
     if (connection_buffer_metric_map_.find(instance_id) == connection_buffer_metric_map_.end()) {
-      auto queue_metric = new heron::common::MultiMeanMetric();
+      auto queue_metric = make_shared<heron::common::MultiMeanMetric>();
       metrics_manager_client_->register_metric(MakeQueueSizeCompIdMetricName(instance_id),
                                                queue_metric);
       connection_buffer_metric_map_[instance_id] = queue_metric;
@@ -348,7 +347,7 @@
     if (connection_buffer_length_metric_map_.find(instance_id) ==
       connection_buffer_length_metric_map_.end()) {
       task_id_to_name[task_id] = instance_id;
-      auto queue_metric = new heron::common::MultiCountMetric();
+      auto queue_metric = make_shared<heron::common::MultiCountMetric>();
       metrics_manager_client_->register_metric(MakeQueueLengthCompIdMetricName(instance_id),
                                                queue_metric);
       connection_buffer_length_metric_map_[instance_id] = queue_metric;
@@ -563,7 +562,7 @@
   }
 
   // Indicate which instance component had back pressure
-  heron::common::TimeSpentMetric* instance_metric = instance_metric_map_[instance_name];
+  auto instance_metric = instance_metric_map_[instance_name];
   instance_metric->Start();
 
   remote_ends_who_caused_back_pressure_.insert(instance_name);
@@ -592,7 +591,7 @@
   remote_ends_who_caused_back_pressure_.erase(instance_name);
 
   // Indicate which instance component stopped back pressure
-  heron::common::TimeSpentMetric* instance_metric = instance_metric_map_[instance_name];
+  auto instance_metric = instance_metric_map_[instance_name];
   instance_metric->Stop();
 
   if (!stmgr_->DidAnnounceBackPressure()) {
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index 5ad504a..f4fb1e9 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -42,6 +42,9 @@
 namespace heron {
 namespace stmgr {
 
+using std::shared_ptr;
+using std::unordered_map;
+
 class StMgr;
 class NeighbourCalculator;
 class CheckpointGateway;
@@ -157,15 +160,16 @@
 
   // map of Instance_id to metric
   // Used for back pressure metrics
-  typedef std::unordered_map<sp_string, heron::common::TimeSpentMetric*> InstanceMetricMap;
+  typedef unordered_map<sp_string, shared_ptr<heron::common::TimeSpentMetric>> InstanceMetricMap;
   InstanceMetricMap instance_metric_map_;
 
   // map of Instance_id to queue metric
-  typedef std::unordered_map<sp_string, heron::common::MultiMeanMetric*> ConnectionBufferMetricMap;
+  typedef unordered_map<sp_string,
+                        shared_ptr<heron::common::MultiMeanMetric>> ConnectionBufferMetricMap;
   ConnectionBufferMetricMap connection_buffer_metric_map_;
 
   // map of Instance_id to queue length metric
-  typedef std::unordered_map<sp_string, heron::common::MultiCountMetric*>
+  typedef std::unordered_map<sp_string, shared_ptr<heron::common::MultiCountMetric>>
     ConnectionBufferLengthMetricMap;
   ConnectionBufferLengthMetricMap connection_buffer_length_metric_map_;
 
@@ -183,8 +187,8 @@
 
   // Metrics
   heron::common::MetricsMgrSt* metrics_manager_client_;
-  heron::common::MultiCountMetric* instance_server_metrics_;
-  heron::common::TimeSpentMetric* back_pressure_metric_aggr_;
+  shared_ptr<heron::common::MultiCountMetric> instance_server_metrics_;
+  shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_aggr_;
 
   bool spouts_under_back_pressure_;
 
diff --git a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
index 538c71d..726c341 100644
--- a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
+++ b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
@@ -39,6 +39,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::make_shared;
+
 // Stats for restore
 const sp_string METRIC_START_RESTORE = "__start_restore";
 const sp_string METRIC_START_RESTORE_IN_PROGRESS = "__start_restore_in_progress";
@@ -65,8 +67,8 @@
   in_progress_ = false;
   restore_done_watcher_ = _restore_done_watcher;
   metrics_manager_client_ = _metrics_manager_client;
-  multi_count_metric_ = new common::MultiCountMetric();
-  time_spent_metric_ = new common::TimeSpentMetric();
+  multi_count_metric_ = make_shared<common::MultiCountMetric>();
+  time_spent_metric_  = make_shared<common::TimeSpentMetric>();
   metrics_manager_client_->register_metric("__stateful_restore_count", multi_count_metric_);
   metrics_manager_client_->register_metric("__stateful_restore_time", time_spent_metric_);
 }
@@ -74,8 +76,6 @@
 StatefulRestorer::~StatefulRestorer() {
   metrics_manager_client_->unregister_metric("__stateful_restore_count");
   metrics_manager_client_->unregister_metric("__stateful_restore_time");
-  delete multi_count_metric_;
-  delete time_spent_metric_;
 }
 
 void StatefulRestorer::StartRestore(const std::string& _checkpoint_id, sp_int64 _restore_txid,
diff --git a/heron/stmgr/src/cpp/manager/stateful-restorer.h b/heron/stmgr/src/cpp/manager/stateful-restorer.h
index a3e82d0..cf35fb3 100644
--- a/heron/stmgr/src/cpp/manager/stateful-restorer.h
+++ b/heron/stmgr/src/cpp/manager/stateful-restorer.h
@@ -47,6 +47,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::shared_ptr;
+
 class InstanceServer;
 class TupleCache;
 class StMgrClientMgr;
@@ -156,8 +158,8 @@
   std::function<void(proto::system::StatusCode, std::string, sp_int64)> restore_done_watcher_;
 
   // Different metrics
-  common::MultiCountMetric* multi_count_metric_;
-  common::TimeSpentMetric* time_spent_metric_;
+  shared_ptr<common::MultiCountMetric> multi_count_metric_;
+  shared_ptr<common::TimeSpentMetric>  time_spent_metric_;
 };
 }  // namespace stmgr
 }  // namespace heron
diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
index 68b002f..572a794 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
@@ -34,6 +34,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::make_shared;
+
 // Num data tuples sent to other stream managers
 const sp_string METRIC_DATA_TUPLES_TO_STMGRS = "__tuples_to_stmgrs";
 // Num ack tuples sent to other stream managers
@@ -74,17 +76,17 @@
   reconnect_other_streammgrs_interval_sec_ =
       config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrClientReconnectIntervalSec();
 
-  InstallResponseHandler(new proto::stmgr::StrMgrHelloRequest(), &StMgrClient::HandleHelloResponse);
+  InstallResponseHandler(make_unique<proto::stmgr::StrMgrHelloRequest>(),
+          &StMgrClient::HandleHelloResponse);
   InstallMessageHandler(&StMgrClient::HandleTupleStreamMessage);
 
-  stmgr_client_metrics_ = new heron::common::MultiCountMetric();
+  stmgr_client_metrics_ = make_shared<heron::common::MultiCountMetric>();
   metrics_manager_client_->register_metric("__client_" + other_stmgr_id_, stmgr_client_metrics_);
 }
 
 StMgrClient::~StMgrClient() {
   Stop();
   metrics_manager_client_->unregister_metric("__client_" + other_stmgr_id_);
-  delete stmgr_client_metrics_;
 }
 
 void StMgrClient::Quit() {
@@ -178,11 +180,11 @@
 }
 
 void StMgrClient::SendHelloRequest() {
-  auto request = new proto::stmgr::StrMgrHelloRequest();
+  auto request = make_unique<proto::stmgr::StrMgrHelloRequest>();
   request->set_topology_name(topology_name_);
   request->set_topology_id(topology_id_);
   request->set_stmgr(our_stmgr_id_);
-  SendRequest(request, NULL);
+  SendRequest(std::move(request), NULL);
   stmgr_client_metrics_->scope(METRIC_HELLO_MESSAGES_TO_STMGRS)->incr_by(1);
   return;
 }
diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.h b/heron/stmgr/src/cpp/manager/stmgr-client.h
index 92494dc..2aed880 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.h
@@ -34,6 +34,9 @@
 
 namespace heron {
 namespace stmgr {
+
+using std::shared_ptr;
+
 class StMgrClientMgr;
 
 class StMgrClient : public Client {
@@ -78,7 +81,7 @@
   StMgrClientMgr* client_manager_;
   // Metrics
   heron::common::MetricsMgrSt* metrics_manager_client_;
-  heron::common::MultiCountMetric* stmgr_client_metrics_;
+  shared_ptr<heron::common::MultiCountMetric> stmgr_client_metrics_;
 
   // Configs to be read
   sp_int32 reconnect_other_streammgrs_interval_sec_;
diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
index 0a3ecf1..d76e9b4 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
@@ -34,6 +34,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::make_shared;
+
 // New connections made with other stream managers.
 const sp_string METRIC_STMGR_NEW_CONNECTIONS = "__stmgr_new_connections";
 
@@ -52,14 +54,13 @@
       high_watermark_(_high_watermark),
       low_watermark_(_low_watermark),
       droptuples_upon_backpressure_(_droptuples_upon_backpressure) {
-  stmgr_clientmgr_metrics_ = new heron::common::MultiCountMetric();
+  stmgr_clientmgr_metrics_ = make_shared<heron::common::MultiCountMetric>();
   metrics_manager_client_->register_metric("__clientmgr", stmgr_clientmgr_metrics_);
 }
 
 StMgrClientMgr::~StMgrClientMgr() {
   // This should not be called
   metrics_manager_client_->unregister_metric("__clientmgr");
-  delete stmgr_clientmgr_metrics_;
 }
 
 void StMgrClientMgr::StartConnections(const proto::system::PhysicalPlan* _pplan) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h
index 13dd7d3..f04515e 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h
@@ -34,6 +34,9 @@
 
 namespace heron {
 namespace stmgr {
+
+using std::shared_ptr;
+
 class StMgr;
 class StMgrClient;
 
@@ -89,7 +92,7 @@
   StMgr* stream_manager_;
   // Metrics
   heron::common::MetricsMgrSt* metrics_manager_client_;
-  heron::common::MultiCountMetric* stmgr_clientmgr_metrics_;
+  shared_ptr<heron::common::MultiCountMetric> stmgr_clientmgr_metrics_;
 
   sp_int64 high_watermark_;
   sp_int64 low_watermark_;
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index a6ac8bd..c1cf2bc 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -35,6 +35,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::make_shared;
+
 // The scope the metrics in this file are under
 const sp_string SERVER_SCOPE = "__server/";
 // Num data tuples received from other stream managers
@@ -66,16 +68,16 @@
   // The metrics need to be registered one by one here because the "__server" scope
   // is already registered in heron::stmgr::InstanceServer. Duplicated registrations
   // will only have one successfully registered.
-  tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  tuples_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
   metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS,
                                            tuples_from_stmgrs_metrics_);
-  ack_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  ack_tuples_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
   metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS,
                                            ack_tuples_from_stmgrs_metrics_);
-  fail_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  fail_tuples_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
   metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS,
                                            fail_tuples_from_stmgrs_metrics_);
-  bytes_from_stmgrs_metrics_ = new heron::common::CountMetric();
+  bytes_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
   metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS,
                                            bytes_from_stmgrs_metrics_);
 }
@@ -83,13 +85,9 @@
 StMgrServer::~StMgrServer() {
   Stop();
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS);
-  delete tuples_from_stmgrs_metrics_;
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS);
-  delete ack_tuples_from_stmgrs_metrics_;
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS);
-  delete fail_tuples_from_stmgrs_metrics_;
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS);
-  delete bytes_from_stmgrs_metrics_;
 }
 
 void StMgrServer::HandleNewConnection(Connection* _conn) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 4ba28f4..2ae0be6 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -39,6 +39,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::shared_ptr;
+
 class StMgr;
 
 class StMgrServer : public Server {
@@ -99,10 +101,10 @@
 
   // Metrics
   heron::common::MetricsMgrSt* metrics_manager_client_;
-  heron::common::CountMetric* tuples_from_stmgrs_metrics_;
-  heron::common::CountMetric* ack_tuples_from_stmgrs_metrics_;
-  heron::common::CountMetric* fail_tuples_from_stmgrs_metrics_;
-  heron::common::CountMetric* bytes_from_stmgrs_metrics_;
+  shared_ptr<heron::common::CountMetric> tuples_from_stmgrs_metrics_;
+  shared_ptr<heron::common::CountMetric> ack_tuples_from_stmgrs_metrics_;
+  shared_ptr<heron::common::CountMetric> fail_tuples_from_stmgrs_metrics_;
+  shared_ptr<heron::common::CountMetric> bytes_from_stmgrs_metrics_;
 };
 
 }  // namespace stmgr
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 3d8fa28..56cf760 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -52,6 +52,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::make_shared;
+
 // Stats for the process
 const sp_string METRIC_CPU_USER = "__cpu_user_usec";
 const sp_string METRIC_CPU_SYSTEM = "__cpu_system_usec";
@@ -118,17 +120,17 @@
   state_mgr_ = heron::common::HeronStateMgr::MakeStateMgr(zkhostport_, zkroot_, eventLoop_, false);
   metrics_manager_client_ = new heron::common::MetricsMgrSt(
       metricsmgr_port_, metrics_export_interval_sec, eventLoop_);
-  stmgr_process_metrics_ = new heron::common::MultiAssignableMetric();
+  stmgr_process_metrics_ = make_shared<heron::common::MultiAssignableMetric>();
   metrics_manager_client_->register_metric(METRIC_PROCESS, stmgr_process_metrics_);
-  restore_initiated_metrics_ = new heron::common::CountMetric();
+  restore_initiated_metrics_ = make_shared<heron::common::CountMetric>();
   metrics_manager_client_->register_metric(METRIC_RESTORE_INITIALIZED, restore_initiated_metrics_);
-  dropped_during_restore_metrics_ = new heron::common::MultiCountMetric();
+  dropped_during_restore_metrics_ = make_shared<heron::common::MultiCountMetric>();
   metrics_manager_client_->register_metric(METRIC_DROPPED_DURING_RESTORE,
                                            dropped_during_restore_metrics_);
-  instance_bytes_received_metrics_ = new heron::common::MultiCountMetric();
+  instance_bytes_received_metrics_ = make_shared<heron::common::MultiCountMetric>();
   metrics_manager_client_->register_metric(METRIC_INSTANCE_BYTES_RECEIVED,
                                            instance_bytes_received_metrics_);
-  back_pressure_metric_initiated_ = new heron::common::TimeSpentMetric();
+  back_pressure_metric_initiated_ = make_shared<heron::common::TimeSpentMetric>();
   metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT,
                                            back_pressure_metric_initiated_);
   state_mgr_->SetTMasterLocationWatch(topology_name_, [this]() { this->FetchTMasterLocation(); });
@@ -205,13 +207,7 @@
   metrics_manager_client_->unregister_metric(METRIC_DROPPED_DURING_RESTORE);
   metrics_manager_client_->unregister_metric(METRIC_INSTANCE_BYTES_RECEIVED);
   metrics_manager_client_->unregister_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT);
-  delete stmgr_process_metrics_;
-  delete restore_initiated_metrics_;
-  delete dropped_during_restore_metrics_;
-  delete instance_bytes_received_metrics_;
-  delete back_pressure_metric_initiated_;
   delete tuple_cache_;
-  delete state_mgr_;
   delete pplan_;
   delete stmgr_server_;
   delete instance_server_;
@@ -1050,11 +1046,10 @@
   }
 
   // save the checkpoint
-  proto::ckptmgr::SaveInstanceStateRequest* message =
-         new proto::ckptmgr::SaveInstanceStateRequest();
+  auto message = make_unique<proto::ckptmgr::SaveInstanceStateRequest>();
   message->mutable_instance()->CopyFrom(_instance);
   message->mutable_checkpoint()->CopyFrom(_message);
-  ckptmgr_client_->SaveInstanceState(message);
+  ckptmgr_client_->SaveInstanceState(std::move(message));
 }
 
 // Invoked by CheckpointMgr Client when it finds out that the ckptmgr
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index abf84a6..885c379 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -47,6 +47,8 @@
 namespace heron {
 namespace stmgr {
 
+using std::shared_ptr;
+
 class StMgrServer;
 class InstanceServer;
 class StMgrClientMgr;
@@ -199,7 +201,7 @@
   static void PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _pplan,
                                                     proto::api::Topology* _topology);
 
-  heron::common::HeronStateMgr* state_mgr_;
+  shared_ptr<heron::common::HeronStateMgr> state_mgr_;
   proto::system::PhysicalPlan* pplan_;
   sp_string topology_name_;
   sp_string topology_id_;
@@ -241,17 +243,17 @@
   CkptMgrClient* ckptmgr_client_;
 
   // Process related metrics
-  heron::common::MultiAssignableMetric* stmgr_process_metrics_;
+  shared_ptr<heron::common::MultiAssignableMetric> stmgr_process_metrics_;
 
   // Stateful Restore metric
-  heron::common::CountMetric* restore_initiated_metrics_;
-  heron::common::MultiCountMetric* dropped_during_restore_metrics_;
+  shared_ptr<heron::common::CountMetric> restore_initiated_metrics_;
+  shared_ptr<heron::common::MultiCountMetric> dropped_during_restore_metrics_;
 
   // Instance related metrics
-  heron::common::MultiCountMetric* instance_bytes_received_metrics_;
+  shared_ptr<heron::common::MultiCountMetric> instance_bytes_received_metrics_;
 
   // Backpressure relarted metrics
-  heron::common::TimeSpentMetric* back_pressure_metric_initiated_;
+  shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_initiated_;
 
   // The time at which the stmgr was started up
   std::chrono::high_resolution_clock::time_point start_time_;
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.cpp b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
index 3046ba5..fab8cd8 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.cpp
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
@@ -66,9 +66,9 @@
   reconnect_timer_cb = [this]() { this->OnReConnectTimer(); };
   heartbeat_timer_cb = [this]() { this->OnHeartbeatTimer(); };
 
-  InstallResponseHandler(new proto::tmaster::StMgrRegisterRequest(),
+  InstallResponseHandler(make_unique<proto::tmaster::StMgrRegisterRequest>(),
                          &TMasterClient::HandleRegisterResponse);
-  InstallResponseHandler(new proto::tmaster::StMgrHeartbeatRequest(),
+  InstallResponseHandler(make_unique<proto::tmaster::StMgrHeartbeatRequest>(),
                          &TMasterClient::HandleHeartbeatResponse);
   InstallMessageHandler(&TMasterClient::HandleNewAssignmentMessage);
   InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointMessage);
@@ -234,7 +234,7 @@
 }
 
 void TMasterClient::SendRegisterRequest() {
-  auto request = new proto::tmaster::StMgrRegisterRequest();
+  auto request = make_unique<proto::tmaster::StMgrRegisterRequest>();
 
   sp_string cwd;
   FileUtils::getCwd(cwd);
@@ -251,7 +251,7 @@
     request->add_instances()->CopyFrom(*(*iter));
   }
 
-  SendRequest(request, nullptr);
+  SendRequest(std::move(request), nullptr);
   return;
 }
 
@@ -268,11 +268,11 @@
 }
 
 void TMasterClient::SendHeartbeatRequest() {
-  auto request = new proto::tmaster::StMgrHeartbeatRequest();
+  auto request = make_unique<proto::tmaster::StMgrHeartbeatRequest>();
   request->set_heartbeat_time(time(nullptr));
   // TODO(vikasr) Send actual stats
   request->mutable_stats();
-  SendRequest(request, nullptr);
+  SendRequest(std::move(request), nullptr);
   return;
 }
 
diff --git a/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp b/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
index 3acf5fc..ffbaf59 100644
--- a/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
@@ -137,10 +137,10 @@
   return ostr.str();
 }
 
-heron::proto::system::Instance* CreateInstance(int32_t _comp, int32_t _comp_instance,
+unique_ptr<heron::proto::system::Instance> CreateInstance(int32_t _comp, int32_t _comp_instance,
                                                int32_t _stmgr_id,
                                                int32_t _global_index, bool _is_spout) {
-  heron::proto::system::Instance* imap = new heron::proto::system::Instance();
+  auto imap = make_unique<heron::proto::system::Instance>();
   imap->set_instance_id(CreateInstanceId(_global_index));
   imap->set_stmgr_id(GenerateStMgrId(_stmgr_id));
   heron::proto::system::InstanceInfo* inst = imap->mutable_info();
@@ -178,24 +178,20 @@
   int32_t global_index = 1;
   for (int spout = 0; spout < nSpouts; ++spout) {
     for (int spout_instance = 0; spout_instance < nSpoutInstances; ++spout_instance) {
-      heron::proto::system::Instance* instance =
-          CreateInstance(spout, spout_instance, stmgr_assignment, global_index++, true);
+      auto instance = CreateInstance(spout, spout_instance, stmgr_assignment, global_index++, true);
       if (++stmgr_assignment >= nContainers) {
         stmgr_assignment = 0;
       }
       pplan->add_instances()->CopyFrom(*instance);
-      delete instance;
     }
   }
   for (int bolt = 0; bolt < nBolts; ++bolt) {
     for (int bolt_instance = 0; bolt_instance < nBoltInstances; ++bolt_instance) {
-      heron::proto::system::Instance* instance =
-          CreateInstance(bolt, bolt_instance, stmgr_assignment, global_index++, false);
+      auto instance = CreateInstance(bolt, bolt_instance, stmgr_assignment, global_index++, false);
       if (++stmgr_assignment >= nContainers) {
         stmgr_assignment = 0;
       }
       pplan->add_instances()->CopyFrom(*instance);
-      delete instance;
     }
   }
   return pplan;
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.cpp b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
index 926c693..31fbec6 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
@@ -43,7 +43,7 @@
       stmgr_id_(_stmgr_id),
       recvd_stmgr_pplan_(NULL),
       register_response_status(heron::proto::system::STMGR_DIDNT_REGISTER) {
-  InstallResponseHandler(new heron::proto::stmgr::RegisterInstanceRequest(),
+  InstallResponseHandler(make_unique<heron::proto::stmgr::RegisterInstanceRequest>(),
                          &DummyInstance::HandleInstanceResponse);
   InstallMessageHandler(&DummyInstance::HandleTupleMessage);
   InstallMessageHandler(&DummyInstance::HandleNewInstanceAssignmentMsg);
@@ -95,7 +95,7 @@
     heron::proto::stmgr::NewInstanceAssignmentMessage*) {}
 
 void DummyInstance::CreateAndSendInstanceRequest() {
-  auto request = new heron::proto::stmgr::RegisterInstanceRequest();
+  auto request = make_unique<heron::proto::stmgr::RegisterInstanceRequest>();
   heron::proto::system::Instance* instance = request->mutable_instance();
   instance->set_instance_id(instance_id_);
   instance->set_stmgr_id(stmgr_id_);
@@ -104,7 +104,7 @@
   instance->mutable_info()->set_component_name(component_name_);
   request->set_topology_name(topology_name_);
   request->set_topology_id(topology_id_);
-  SendRequest(request, nullptr);
+  SendRequest(std::move(request), nullptr);
   return;
 }
 
diff --git a/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp b/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
index 372e3de..2e18c57 100644
--- a/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
@@ -26,18 +26,20 @@
 #include "network/network.h"
 #include "server/dummy_stmgr.h"
 
+using std::shared_ptr;
+
 ///////////////////////////// DummyTMasterClient ///////////////////////////////////////////
 DummyTMasterClient::DummyTMasterClient(
     EventLoopImpl* eventLoop, const NetworkOptions& _options, const sp_string& stmgr_id,
     const sp_string& stmgr_host, sp_int32 stmgr_port, sp_int32 shell_port,
-    const std::vector<heron::proto::system::Instance*>& _instances)
+    const std::vector<shared_ptr<heron::proto::system::Instance>>& _instances)
     : Client(eventLoop, _options),
       stmgr_id_(stmgr_id),
       stmgr_host_(stmgr_host),
       stmgr_port_(stmgr_port),
       shell_port_(shell_port),
       instances_(_instances) {
-  InstallResponseHandler(new heron::proto::tmaster::StMgrRegisterRequest(),
+  InstallResponseHandler(make_unique<heron::proto::tmaster::StMgrRegisterRequest>(),
                          &DummyTMasterClient::HandleRegisterResponse);
   // Setup the call back function to be invoked when retrying
   retry_cb_ = [this]() { this->Retry(); };
@@ -62,8 +64,7 @@
 void DummyTMasterClient::HandleClose(NetworkErrorCode) {}
 
 void DummyTMasterClient::CreateAndSendRegisterRequest() {
-  heron::proto::tmaster::StMgrRegisterRequest* request =
-      new heron::proto::tmaster::StMgrRegisterRequest();
+  auto request = make_unique<heron::proto::tmaster::StMgrRegisterRequest>();
   heron::proto::system::StMgr* stmgr = request->mutable_stmgr();
   sp_string cwd;
   stmgr->set_id(stmgr_id_);
@@ -76,14 +77,14 @@
   for (auto iter = instances_.begin(); iter != instances_.end(); ++iter) {
     request->add_instances()->CopyFrom(**iter);
   }
-  SendRequest(request, NULL);
+  SendRequest(std::move(request), NULL);
 }
 
 ///////////////////////////// DummyStMgr /////////////////////////////////////////////////
 DummyStMgr::DummyStMgr(EventLoopImpl* ss, const NetworkOptions& options, const sp_string& stmgr_id,
                        const sp_string& stmgr_host, sp_int32 stmgr_port,
                        const sp_string& tmaster_host, sp_int32 tmaster_port, sp_int32 shell_port,
-                       const std::vector<heron::proto::system::Instance*>& _instances)
+                       const std::vector<shared_ptr<heron::proto::system::Instance>>& _instances)
     : Server(ss, options), num_start_bp_(0), num_stop_bp_(0) {
   NetworkOptions tmaster_options;
   tmaster_options.set_host(tmaster_host);
diff --git a/heron/stmgr/tests/cpp/server/dummy_stmgr.h b/heron/stmgr/tests/cpp/server/dummy_stmgr.h
index 73a73d5..cf30ca3 100644
--- a/heron/stmgr/tests/cpp/server/dummy_stmgr.h
+++ b/heron/stmgr/tests/cpp/server/dummy_stmgr.h
@@ -28,7 +28,7 @@
   DummyTMasterClient(EventLoopImpl* eventLoop, const NetworkOptions& _options,
                      const sp_string& stmgr_id, const sp_string& stmgr_host, sp_int32 stmgr_port,
                      sp_int32 shell_port,
-                     const std::vector<heron::proto::system::Instance*>& instances);
+                     const std::vector<std::shared_ptr<heron::proto::system::Instance>>& instances);
   virtual ~DummyTMasterClient();
 
   void setStmgrPort(sp_int32 stmgrPort) {
@@ -52,7 +52,7 @@
   sp_string stmgr_host_;
   sp_int32 stmgr_port_;
   sp_int32 shell_port_;
-  std::vector<heron::proto::system::Instance*> instances_;
+  std::vector<std::shared_ptr<heron::proto::system::Instance>> instances_;
 };
 
 class DummyStMgr : public Server {
@@ -60,7 +60,7 @@
   DummyStMgr(EventLoopImpl* ss, const NetworkOptions& options, const sp_string& stmgr_id,
              const sp_string& stmgr_host, sp_int32 stmgr_port, const sp_string& tmaster_host,
              sp_int32 tmaster_port, sp_int32 shell_port,
-             const std::vector<heron::proto::system::Instance*>& instances);
+             const std::vector<std::shared_ptr<heron::proto::system::Instance>>& instances);
 
   virtual ~DummyStMgr();
   sp_int32 Start();
diff --git a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
index 0bb0c1a..f75f666 100644
--- a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
@@ -57,6 +57,8 @@
 sp_string metrics_sinks_config_filename =
     "heron/config/src/yaml/conf/test/test_metrics_sinks.yaml";
 
+using std::shared_ptr;
+
 // Generate a dummy topology
 static heron::proto::api::Topology* GenerateDummyTopology(
     const sp_string& topology_name, const sp_string& topology_id, int num_spouts,
@@ -223,9 +225,9 @@
   return instanceid_stream.str();
 }
 
-heron::proto::system::Instance* CreateInstanceMap(sp_int8 type, sp_int8 instance, sp_int32 stmgr_id,
-                                                  sp_int32 global_index, bool spout) {
-  heron::proto::system::Instance* imap = new heron::proto::system::Instance();
+std::shared_ptr<heron::proto::system::Instance> CreateInstanceMap(sp_int8 type, sp_int8 instance,
+                                             sp_int32 stmgr_id, sp_int32 global_index, bool spout) {
+  auto imap = std::make_shared<heron::proto::system::Instance>();
   imap->set_instance_id(CreateInstanceId(type, instance, spout));
 
   imap->set_stmgr_id(STMGR_NAME + "-" + std::to_string(stmgr_id));
@@ -299,7 +301,7 @@
 void StartDummyStMgr(EventLoopImpl*& ss, DummyStMgr*& mgr, std::thread*& stmgr_thread,
                      sp_int32& stmgr_port, sp_int32 tmaster_port, sp_int32 shell_port,
                      const sp_string& stmgr_id,
-                     const std::vector<heron::proto::system::Instance*>& instances) {
+                     const std::vector<shared_ptr<heron::proto::system::Instance>>& instances) {
   // Create the select server for this stmgr to use
   ss = new EventLoopImpl();
 
@@ -435,10 +437,10 @@
   std::map<sp_int32, std::vector<sp_string> > stmgr_instance_id_list_;
 
   // Stmgr to Instance
-  std::map<sp_int32, std::vector<heron::proto::system::Instance*> > stmgr_instance_list_;
+  std::map<sp_int32, std::vector<shared_ptr<heron::proto::system::Instance>>> stmgr_instance_list_;
 
   // Instanceid to instance
-  std::map<sp_string, heron::proto::system::Instance*> instanceid_instance_;
+  std::map<sp_string, shared_ptr<heron::proto::system::Instance>> instanceid_instance_;
 
   std::map<sp_string, sp_int32> instanceid_stmgr_;
 
@@ -509,7 +511,7 @@
   for (size_t spout = 0; spout < common.num_spouts_; ++spout) {
     for (size_t spout_instance = 0; spout_instance < common.num_spout_instances_;
         ++spout_instance) {
-      heron::proto::system::Instance* imap =
+      auto imap =
           CreateInstanceMap(spout, spout_instance, stmgr_assignment_round, global_index++, true);
       common.stmgr_instance_id_list_[stmgr_assignment_round].push_back(imap->instance_id());
       common.stmgr_instance_list_[stmgr_assignment_round].push_back(imap);
@@ -527,7 +529,7 @@
   // Distribute the bolts
   for (size_t bolt = 0; bolt < common.num_bolts_; ++bolt) {
     for (size_t bolt_instance = 0; bolt_instance < common.num_bolt_instances_; ++bolt_instance) {
-      heron::proto::system::Instance* imap =
+      auto imap =
           CreateInstanceMap(bolt, bolt_instance, stmgr_assignment_round, global_index++, false);
       // Have we completed a round of distribution of components
       common.stmgr_instance_id_list_[stmgr_assignment_round].push_back(imap->instance_id());
@@ -679,7 +681,7 @@
 
   for (auto itr = common.instanceid_instance_.begin();
        itr != common.instanceid_instance_.end(); ++itr)
-    delete itr->second;
+      common.instanceid_instance_.erase(itr->first);
 
   // Clean up the local filesystem state
   FileUtils::removeRecursive(common.dpath_, true);
diff --git a/heron/stmgr/tests/cpp/util/neighbour_calculator_unittest.cpp b/heron/stmgr/tests/cpp/util/neighbour_calculator_unittest.cpp
index 405b685..5b82aa2 100644
--- a/heron/stmgr/tests/cpp/util/neighbour_calculator_unittest.cpp
+++ b/heron/stmgr/tests/cpp/util/neighbour_calculator_unittest.cpp
@@ -47,6 +47,8 @@
 const sp_string STMGR_NAME = "stmgr";
 const sp_string LOCALHOST = "127.0.0.1";
 
+using std::unique_ptr;
+
 // Generate a dummy topology
 static heron::proto::api::Topology* GenerateDummyTopology(
     const sp_string& topology_name, const sp_string& topology_id, int num_spouts,
@@ -134,10 +136,10 @@
   return ostr.str();
 }
 
-heron::proto::system::Instance* CreateInstance(int32_t _comp, int32_t _comp_instance,
-                                               int32_t _stmgr_id,
-                                               int32_t _global_index, bool _is_spout) {
-  heron::proto::system::Instance* imap = new heron::proto::system::Instance();
+unique_ptr<heron::proto::system::Instance> CreateInstance(int32_t _comp, int32_t _comp_instance,
+                                                          int32_t _stmgr_id,
+                                                          int32_t _global_index, bool _is_spout) {
+  auto imap = make_unique<heron::proto::system::Instance>();
   imap->set_instance_id(CreateInstanceId(_global_index));
   imap->set_stmgr_id(GenerateStMgrId(_stmgr_id));
   heron::proto::system::InstanceInfo* inst = imap->mutable_info();
@@ -175,24 +177,20 @@
   int32_t global_index = 1;
   for (int spout = 0; spout < nSpouts; ++spout) {
     for (int spout_instance = 0; spout_instance < nSpoutInstances; ++spout_instance) {
-      heron::proto::system::Instance* instance =
-          CreateInstance(spout, spout_instance, stmgr_assignment, global_index++, true);
+      auto instance = CreateInstance(spout, spout_instance, stmgr_assignment, global_index++, true);
       if (++stmgr_assignment >= nContainers) {
         stmgr_assignment = 0;
       }
       pplan->add_instances()->CopyFrom(*instance);
-      delete instance;
     }
   }
   for (int bolt = 0; bolt < nBolts; ++bolt) {
     for (int bolt_instance = 0; bolt_instance < nBoltInstances; ++bolt_instance) {
-      heron::proto::system::Instance* instance =
-          CreateInstance(bolt, bolt_instance, stmgr_assignment, global_index++, false);
+      auto instance = CreateInstance(bolt, bolt_instance, stmgr_assignment, global_index++, false);
       if (++stmgr_assignment >= nContainers) {
         stmgr_assignment = 0;
       }
       pplan->add_instances()->CopyFrom(*instance);
-      delete instance;
     }
   }
   return pplan;
diff --git a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp b/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
index b348833..767c649 100644
--- a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
@@ -38,9 +38,11 @@
     // TODO(sanjeev): Take this from config
     reconnect_ckptmgr_interval_sec_ = 10;
 
-    InstallResponseHandler(new proto::ckptmgr::RegisterTMasterRequest(),
+
+
+    InstallResponseHandler(make_unique<proto::ckptmgr::RegisterTMasterRequest>(),
                            &CkptMgrClient::HandleTMasterRegisterResponse);
-    InstallResponseHandler(new proto::ckptmgr::CleanStatefulCheckpointRequest(),
+    InstallResponseHandler(make_unique<proto::ckptmgr::CleanStatefulCheckpointRequest>(),
                            &CkptMgrClient::HandleCleanStatefulCheckpointResponse);
 }
 
@@ -69,7 +71,6 @@
                  << " due to: " << _status << std::endl;
     if (quit_) {
       LOG(ERROR) << "Quitting" << std::endl;
-      delete this;
     } else {
       LOG(INFO) << "Retrying again..." << std::endl;
       AddTimer([this]() { this->OnReconnectTimer(); },
@@ -89,7 +90,7 @@
               << " closed connection with code: " << _status << std::endl;
   }
   if (quit_) {
-    delete this;
+    LOG(ERROR) << "Quitting" << std::endl;
   } else {
     LOG(INFO) << "Will try to reconnect again..." << std::endl;
     AddTimer([this]() { this->OnReconnectTimer(); },
@@ -126,20 +127,20 @@
 
 void CkptMgrClient::SendRegisterRequest() {
   LOG(INFO) << "Sending RegisterTmasterRequest to ckptmgr" << std::endl;
-  auto request = new proto::ckptmgr::RegisterTMasterRequest();
+  auto request = make_unique<proto::ckptmgr::RegisterTMasterRequest>();
   request->set_topology_name(topology_name_);
   request->set_topology_id(topology_id_);
-  SendRequest(request, NULL);
+  SendRequest(std::move(request), NULL);
 }
 
 void CkptMgrClient::SendCleanStatefulCheckpointRequest(const std::string& _oldest_ckpt,
                                                        bool _clean_all) {
   LOG(INFO) << "Sending CleanStatefulCheckpoint request to ckptmgr with oldest checkpoint "
             << _oldest_ckpt << " and clean_all " << _clean_all << std::endl;
-  auto request = new proto::ckptmgr::CleanStatefulCheckpointRequest();
+  auto request = make_unique<proto::ckptmgr::CleanStatefulCheckpointRequest>();
   request->set_oldest_checkpoint_preserved(_oldest_ckpt);
   request->set_clean_all_checkpoints(_clean_all);
-  SendRequest(request, NULL);
+  SendRequest(std::move(request), NULL);
 }
 
 void CkptMgrClient::HandleCleanStatefulCheckpointResponse(void*,
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.cpp b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
index 7cbc77d..31dcf7b 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.cpp
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
@@ -34,6 +34,8 @@
 namespace heron {
 namespace tmaster {
 
+using std::make_shared;
+
 const sp_string METRIC_RESTORE_START = "__restore_start";
 const sp_string METRIC_RESTORE_STMGR_RESPONSE = "__restore_stmgr_response";
 const sp_string METRIC_RESTORE_STMGR_RESPONSE_IGNORED = "__restore_stmgr_response_ignored";
@@ -49,26 +51,22 @@
 const int32_t MOST_CHECKPOINTS_NUMBER = 5;
 
 StatefulController::StatefulController(const std::string& _topology_name,
-               proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt,
-               heron::common::HeronStateMgr* _state_mgr,
+               shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+               shared_ptr<heron::common::HeronStateMgr> _state_mgr,
                std::chrono::high_resolution_clock::time_point _tmaster_start_time,
-               common::MetricsMgrSt* _metrics_manager_client,
+               shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
                std::function<void(std::string)> _ckpt_save_watcher)
-  : topology_name_(_topology_name), ckpt_record_(_ckpt), state_mgr_(_state_mgr),
+  : topology_name_(_topology_name), ckpt_record_(std::move(_ckpt)), state_mgr_(_state_mgr),
     metrics_manager_client_(_metrics_manager_client) {
-  checkpointer_ = new StatefulCheckpointer(_tmaster_start_time);
-  restorer_ = new StatefulRestorer();
-  count_metrics_ = new common::MultiCountMetric();
+  checkpointer_ = make_unique<StatefulCheckpointer>(_tmaster_start_time);
+  restorer_ = make_unique<StatefulRestorer>();
+  count_metrics_ = make_shared<common::MultiCountMetric>();
   metrics_manager_client_->register_metric("__stateful_controller", count_metrics_);
   ckpt_save_watcher_ = _ckpt_save_watcher;
 }
 
 StatefulController::~StatefulController() {
-  delete ckpt_record_;
-  delete checkpointer_;
-  delete restorer_;
   metrics_manager_client_->unregister_metric("__stateful_controller");
-  delete count_metrics_;
 }
 
 void StatefulController::StartRestore(const StMgrMap& _stmgrs, bool _ignore_prev_state) {
@@ -139,32 +137,34 @@
 }
 
 void StatefulController::HandleInstanceStateStored(const std::string& _checkpoint_id,
-                                               const std::string& _packing_plan_id,
-                                               const proto::system::Instance& _instance) {
+                                                   const std::string& _packing_plan_id,
+                                                   const proto::system::Instance& _instance) {
   count_metrics_->scope(METRIC_INSTANCE_CKPT_SAVED)->incr();
+
   if (restorer_->IsInProgress()) {
     LOG(INFO) << "Ignoring the Instance State because we are in Restore";
     count_metrics_->scope(METRIC_INSTANCE_CKPT_SAVED_IGNORED)->incr();
     return;
   }
+
   if (checkpointer_->HandleInstanceStateStored(_checkpoint_id, _instance)) {
     // This is now a globally consistent checkpoint
     count_metrics_->scope(METRIC_GLOBAL_CONSISTENT_CKPT)->incr();
     auto new_ckpt_record = AddNewConsistentCheckpoint(_checkpoint_id, _packing_plan_id);
-    state_mgr_->SetStatefulCheckpoints(topology_name_, *new_ckpt_record,
-           std::bind(&StatefulController::HandleCheckpointSave, this,
-                    new_ckpt_record, std::placeholders::_1));
+
+    state_mgr_->SetStatefulCheckpoints(topology_name_, new_ckpt_record,
+            std::bind(&StatefulController::HandleCheckpointSave, this, new_ckpt_record,
+                    std::placeholders::_1));
   }
 }
 
 void StatefulController::HandleCheckpointSave(
-                                    proto::ckptmgr::StatefulConsistentCheckpoints* _new_ckpt,
-                                    proto::system::StatusCode _status) {
+        shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _new_ckpt,
+        proto::system::StatusCode _status) {
   if (_status == proto::system::OK) {
     LOG(INFO) << "Successfully saved " << _new_ckpt->consistent_checkpoints(0).checkpoint_id()
               << " as the new globally consistent checkpoint";
-    delete ckpt_record_;
-    ckpt_record_ = _new_ckpt;
+    ckpt_record_ = std::move(_new_ckpt);
     std::string oldest_ckpt =
         ckpt_record_->consistent_checkpoints(ckpt_record_->consistent_checkpoints_size() - 1)
           .checkpoint_id();
@@ -173,7 +173,6 @@
     LOG(ERROR) << "Error saving " << _new_ckpt->consistent_checkpoints(0).checkpoint_id()
               << " as the new globally consistent checkpoint "
               << _status;
-    delete _new_ckpt;
   }
 }
 
@@ -194,10 +193,10 @@
   return EMPTY_STRING;
 }
 
-proto::ckptmgr::StatefulConsistentCheckpoints*
+shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints>
 StatefulController::AddNewConsistentCheckpoint(const std::string& _new_checkpoint,
                                            const std::string& _packing_plan) {
-  auto new_record = new proto::ckptmgr::StatefulConsistentCheckpoints();
+  auto new_record = make_shared<proto::ckptmgr::StatefulConsistentCheckpoints>();
   auto new_consistent_checkpoint = new_record->add_consistent_checkpoints();
   new_consistent_checkpoint->set_checkpoint_id(_new_checkpoint);
   new_consistent_checkpoint->set_packing_plan_id(_packing_plan);
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.h b/heron/tmaster/src/cpp/manager/stateful-controller.h
index 488bbbd..6ceda37 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.h
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.h
@@ -36,6 +36,9 @@
 namespace heron {
 namespace tmaster {
 
+using std::unique_ptr;
+using std::shared_ptr;
+
 class StatefulRestorer;
 class StatefulCheckpointer;
 
@@ -51,10 +54,10 @@
 class StatefulController {
  public:
   explicit StatefulController(const std::string& _topology_name,
-               proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt,
-               heron::common::HeronStateMgr* _state_mgr,
+               shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+               shared_ptr<heron::common::HeronStateMgr> _state_mgr,
                std::chrono::high_resolution_clock::time_point _tmaster_start_time,
-               common::MetricsMgrSt* _metrics_manager_client,
+               shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
                std::function<void(std::string)> _ckpt_save_watcher);
   virtual ~StatefulController();
   // Start a new restore process
@@ -85,20 +88,20 @@
   // Get the youngest ckpt id that is older than the given ckpt_id
   const std::string& GetNextInLineCheckpointId(const std::string& _ckpt_id);
   // Creates a new ckpt record adding the latest one
-  proto::ckptmgr::StatefulConsistentCheckpoints*
+  shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints>
     AddNewConsistentCheckpoint(const std::string& _new_checkpoint,
                                const std::string& _packing_plan);
   // Handler when statemgr saves the new checkpoint record
-  void HandleCheckpointSave(proto::ckptmgr::StatefulConsistentCheckpoints* _new_ckpt,
+  void HandleCheckpointSave(shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _new_ckpt,
                             proto::system::StatusCode _status);
 
   std::string topology_name_;
-  proto::ckptmgr::StatefulConsistentCheckpoints* ckpt_record_;
-  heron::common::HeronStateMgr* state_mgr_;
-  StatefulCheckpointer* checkpointer_;
-  StatefulRestorer* restorer_;
-  common::MetricsMgrSt* metrics_manager_client_;
-  common::MultiCountMetric* count_metrics_;
+  shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> ckpt_record_;
+  shared_ptr<heron::common::HeronStateMgr> state_mgr_;
+  unique_ptr<StatefulCheckpointer> checkpointer_;
+  unique_ptr<StatefulRestorer> restorer_;
+  shared_ptr<common::MetricsMgrSt> metrics_manager_client_;
+  shared_ptr<common::MultiCountMetric> count_metrics_;
   std::function<void(std::string)> ckpt_save_watcher_;
 };
 }  // namespace tmaster
diff --git a/heron/tmaster/src/cpp/manager/stats-interface.cpp b/heron/tmaster/src/cpp/manager/stats-interface.cpp
index 4b2086d..e1f8aa9 100644
--- a/heron/tmaster/src/cpp/manager/stats-interface.cpp
+++ b/heron/tmaster/src/cpp/manager/stats-interface.cpp
@@ -33,9 +33,9 @@
 namespace tmaster {
 
 StatsInterface::StatsInterface(EventLoop* eventLoop, const NetworkOptions& _options,
-                               TMetricsCollector* _collector, TMaster* _tmaster)
+                               shared_ptr<TMetricsCollector> _collector, TMaster* _tmaster)
     : metrics_collector_(_collector), tmaster_(_tmaster) {
-  http_server_ = new HTTPServer(eventLoop, _options);
+  http_server_ = make_unique<HTTPServer>(eventLoop, _options);
   // Install the handlers
   auto cbHandleStats = [this](IncomingHTTPRequest* request) { this->HandleStatsRequest(request); };
 
@@ -64,7 +64,7 @@
   CHECK(http_server_->Start() == SP_OK);
 }
 
-StatsInterface::~StatsInterface() { delete http_server_; }
+StatsInterface::~StatsInterface() {}
 
 void StatsInterface::HandleStatsRequest(IncomingHTTPRequest* _request) {
   LOG(INFO) << "Got a stats request " << _request->GetQuery();
@@ -77,16 +77,14 @@
     delete _request;
     return;
   }
-  proto::tmaster::MetricResponse* res =
-    metrics_collector_->GetMetrics(req, tmaster_->getInitialTopology());
+  auto res = metrics_collector_->GetMetrics(req, tmaster_->getInitialTopology());
   sp_string response_string;
   CHECK(res->SerializeToString(&response_string));
-  OutgoingHTTPResponse* response = new OutgoingHTTPResponse(_request);
+  auto response = make_unique<OutgoingHTTPResponse>(_request);
   response->AddHeader("Content-Type", "application/octet-stream");
   response->AddHeader("Content-Length", std::to_string(response_string.size()));
   response->AddResponse(response_string);
-  http_server_->SendReply(_request, 200, response);
-  delete res;
+  http_server_->SendReply(_request, 200, std::move(response));
   delete _request;
   LOG(INFO) << "Done with stats request ";
 }
@@ -102,16 +100,15 @@
     delete _request;
     return;
   }
-  heron::proto::tmaster::ExceptionLogResponse* exception_response =
+  unique_ptr<heron::proto::tmaster::ExceptionLogResponse> exception_response =
       metrics_collector_->GetExceptions(exception_request);
   sp_string response_string;
   CHECK(exception_response->SerializeToString(&response_string));
-  OutgoingHTTPResponse* http_response = new OutgoingHTTPResponse(_request);
+  auto http_response = make_unique<OutgoingHTTPResponse>(_request);
   http_response->AddHeader("Content-Type", "application/octet-stream");
   http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
   http_response->AddResponse(response_string);
-  http_server_->SendReply(_request, 200, http_response);
-  delete exception_response;
+  http_server_->SendReply(_request, 200, std::move(http_response));
   delete _request;
   LOG(INFO) << "Returned exceptions response";
 }
@@ -126,16 +123,14 @@
     delete _request;
     return;
   }
-  heron::proto::tmaster::ExceptionLogResponse* exception_response =
-      metrics_collector_->GetExceptionsSummary(exception_request);
+  auto exception_response = metrics_collector_->GetExceptionsSummary(exception_request);
   sp_string response_string;
   CHECK(exception_response->SerializeToString(&response_string));
-  OutgoingHTTPResponse* http_response = new OutgoingHTTPResponse(_request);
+  auto http_response = make_unique<OutgoingHTTPResponse>(_request);
   http_response->AddHeader("Content-Type", "application/octet-stream");
   http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
   http_response->AddResponse(response_string);
-  http_server_->SendReply(_request, 200, http_response);
-  delete exception_response;
+  http_server_->SendReply(_request, 200, std::move(http_response));
   delete _request;
   LOG(INFO) << "Returned exceptions response";
 }
@@ -155,11 +150,11 @@
   auto stmgrs_reg_summary_response = tmaster_->GetStmgrsRegSummary();
   sp_string response_string;
   CHECK(stmgrs_reg_summary_response->SerializeToString(&response_string));
-  auto http_response = new OutgoingHTTPResponse(_request);
+  auto http_response = make_unique<OutgoingHTTPResponse>(_request);
   http_response->AddHeader("Content-Type", "application/octet-stream");
   http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
   http_response->AddResponse(response_string);
-  http_server_->SendReply(_request, 200, http_response);
+  http_server_->SendReply(_request, 200, std::move(http_response));
   delete _request;
   LOG(INFO) << "Returned stream managers registration summary response";
 }
diff --git a/heron/tmaster/src/cpp/manager/stats-interface.h b/heron/tmaster/src/cpp/manager/stats-interface.h
index 58916df..a6fd190 100644
--- a/heron/tmaster/src/cpp/manager/stats-interface.h
+++ b/heron/tmaster/src/cpp/manager/stats-interface.h
@@ -27,13 +27,16 @@
 namespace heron {
 namespace tmaster {
 
+using std::unique_ptr;
+using std::shared_ptr;
+
 class TMetricsCollector;
 class TMaster;
 
 class StatsInterface {
  public:
   StatsInterface(EventLoop* eventLoop, const NetworkOptions& options,
-                 TMetricsCollector* _collector, TMaster* tmaster);
+                 shared_ptr<TMetricsCollector> _collector, TMaster* tmaster);
   virtual ~StatsInterface();
 
  private:
@@ -43,8 +46,8 @@
   void HandleExceptionSummaryRequest(IncomingHTTPRequest* _request);
   void HandleStmgrsRegistrationSummaryRequest(IncomingHTTPRequest* _request);
 
-  HTTPServer* http_server_;  // Our http server
-  TMetricsCollector* metrics_collector_;
+  unique_ptr<HTTPServer> http_server_;  // Our http server
+  shared_ptr<TMetricsCollector> metrics_collector_;
   TMaster* tmaster_;
 };
 }  // namespace tmaster
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.cpp b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
index fca3cfe..46be97d 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.cpp
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
@@ -33,34 +33,25 @@
 namespace tmaster {
 
 StMgrState::StMgrState(Connection* _conn, const proto::system::StMgr& _stmgr,
-                       const std::vector<proto::system::Instance*>& _instances,
-                       Server* _server) {
+                       const std::vector<shared_ptr<proto::system::Instance>>& _instances,
+                       Server& _server) : server_(_server) {
   last_heartbeat_ = time(NULL);
   last_stats_ = NULL;
   instances_ = _instances;
-  stmgr_ = new proto::system::StMgr(_stmgr);
+  stmgr_ = std::make_shared<proto::system::StMgr>(_stmgr);
   connection_ = _conn;
-  server_ = _server;
 }
 
 StMgrState::~StMgrState() {
-  delete stmgr_;
-  for (size_t i = 0; i < instances_.size(); ++i) {
-    delete instances_[i];
-  }
   delete last_stats_;
 }
 
 void StMgrState::UpdateWithNewStMgr(const proto::system::StMgr& _stmgr,
-                                    const std::vector<proto::system::Instance*>& _instances,
-                                    Connection* _conn) {
+                                const std::vector<shared_ptr<proto::system::Instance>>& _instances,
+                                Connection* _conn) {
   delete last_stats_;
   last_stats_ = NULL;
-  delete stmgr_;
-  for (size_t i = 0; i < instances_.size(); ++i) {
-    delete instances_[i];
-  }
-  stmgr_ = new proto::system::StMgr(_stmgr);
+  stmgr_ = std::make_shared<proto::system::StMgr>(_stmgr);
   instances_ = _instances;
   connection_ = _conn;
 }
@@ -97,7 +88,7 @@
             const proto::ckptmgr::RestoreTopologyStateRequest& _message) {
   LOG(INFO) << "Sending restore topology state message to stmgr " << stmgr_->id()
             << " with checkpoint " << _message.checkpoint_id();
-  server_->SendMessage(connection_, _message);
+  server_.SendMessage(connection_, _message);
 }
 
 void StMgrState::SendStartStatefulProcessingMessage(const std::string& _checkpoint_id) {
@@ -105,19 +96,19 @@
             << " with checkpoint " << _checkpoint_id;
   proto::ckptmgr::StartStmgrStatefulProcessing message;
   message.set_checkpoint_id(_checkpoint_id);
-  server_->SendMessage(connection_, message);
+  server_.SendMessage(connection_, message);
 }
 
 void StMgrState::NewPhysicalPlan(const proto::system::PhysicalPlan& _pplan) {
   LOG(INFO) << "Sending a new physical plan to stmgr " << stmgr_->id();
   proto::stmgr::NewPhysicalPlanMessage message;
   message.mutable_new_pplan()->CopyFrom(_pplan);
-  server_->SendMessage(connection_, message);
+  server_.SendMessage(connection_, message);
 }
 
 void StMgrState::NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& _request) {
   LOG(INFO) << "Sending a new stateful checkpoint request to stmgr" << stmgr_->id();
-  server_->SendMessage(connection_, _request);
+  server_.SendMessage(connection_, _request);
 }
 
 /*
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.h b/heron/tmaster/src/cpp/manager/stmgrstate.h
index ebfdde4..270d93a 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.h
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.h
@@ -40,16 +40,18 @@
 namespace heron {
 namespace tmaster {
 
+using std::shared_ptr;
+
 class TMasterServer;
 
 class StMgrState {
  public:
   StMgrState(Connection* _conn, const proto::system::StMgr& _info,
-             const std::vector<proto::system::Instance*>& _instances, Server* _server);
+             const std::vector<shared_ptr<proto::system::Instance>>& _instances, Server& _server);
   virtual ~StMgrState();
 
   void UpdateWithNewStMgr(const proto::system::StMgr& _info,
-                          const std::vector<proto::system::Instance*>& _instances,
+                          const std::vector<shared_ptr<proto::system::Instance>>& _instances,
                           Connection* _conn);
 
   // Update the heartbeat. Note:- We own _stats now
@@ -75,8 +77,8 @@
   Connection* get_connection() { return connection_; }
   const std::string& get_id() const { return stmgr_->id(); }
   sp_uint32 get_num_instances() const { return instances_.size(); }
-  const std::vector<proto::system::Instance*>& get_instances() const { return instances_; }
-  const proto::system::StMgr* get_stmgr() const { return stmgr_; }
+  const std::vector<shared_ptr<proto::system::Instance>>& get_instances() const {return instances_;}
+  const shared_ptr<proto::system::StMgr> get_stmgr() const { return stmgr_; }
   bool VerifyInstances(const std::vector<proto::system::Instance*>& _instances);
 
  private:
@@ -86,14 +88,14 @@
   proto::system::StMgrStats* last_stats_;
 
   // All the instances on this stmgr
-  std::vector<proto::system::Instance*> instances_;
+  std::vector<shared_ptr<proto::system::Instance>> instances_;
 
   // The info about this stmgr
-  proto::system::StMgr* stmgr_;
+  shared_ptr<proto::system::StMgr> stmgr_;
   // The connection used by the nodemanager to contact us
   Connection* connection_;
   // Our link to our TMaster
-  Server* server_;
+  Server& server_;
 };
 }  // namespace tmaster
 }  // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.cpp b/heron/tmaster/src/cpp/manager/tcontroller.cpp
index b7a4ee6..6745f80 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.cpp
+++ b/heron/tmaster/src/cpp/manager/tcontroller.cpp
@@ -42,7 +42,7 @@
  */
 TController::TController(EventLoop* eventLoop, const NetworkOptions& options, TMaster* tmaster)
     : tmaster_(tmaster) {
-  http_server_ = new HTTPServer(eventLoop, options);
+  http_server_ = make_unique<HTTPServer>(eventLoop, options);
   /*
    * Install the handlers
    */
@@ -74,7 +74,7 @@
   http_server_->InstallCallBack("/get_current_physical_plan", std::move(cbGetCurPPlan));
 }
 
-TController::~TController() { delete http_server_; }
+TController::~TController() {}
 
 sp_int32 TController::Start() { return http_server_->Start(); }
 
@@ -111,9 +111,9 @@
   } else {
     std::string s = "Topology successfully activated";
     LOG(INFO) << s;
-    OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+    auto response = make_unique<OutgoingHTTPResponse>(request);
     response->AddResponse(s);
-    http_server_->SendReply(request, 200, response);
+    http_server_->SendReply(request, 200, std::move(response));
   }
   delete request;
 }
@@ -150,9 +150,9 @@
   } else {
     std::string s = "Topology successfully deactivated";
     LOG(INFO) << s;
-    OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+    auto response = make_unique<OutgoingHTTPResponse>(request);
     response->AddResponse(s);
-    http_server_->SendReply(request, 200, response);
+    http_server_->SendReply(request, 200, std::move(response));
   }
   delete request;
 }
@@ -197,9 +197,9 @@
   } else {
     std::string msg = "Checkpoints successfully cleaned";
     LOG(INFO) << msg;
-    OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+    auto response = make_unique<OutgoingHTTPResponse>(request);
     response->AddResponse(msg);
-    http_server_->SendReply(request, 200, response);
+    http_server_->SendReply(request, 200, std::move(response));
   }
   delete request;
 }
@@ -262,9 +262,9 @@
   } else {
     const std::string message("Runtime config updated");
     LOG(INFO) << message;
-    OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+    auto response = make_unique<OutgoingHTTPResponse>(request);
     response->AddResponse(message);
-    http_server_->SendReply(request, 200, response);
+    http_server_->SendReply(request, 200, std::move(response));
   }
   delete request;
 }
@@ -293,9 +293,9 @@
 
     const std::string message("Get current physical plan");
     LOG(INFO) << message;
-    OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+    auto response = make_unique<OutgoingHTTPResponse>(request);
     response->AddResponse(pplanStringFixed);
-    http_server_->SendReply(request, 200, response);
+    http_server_->SendReply(request, 200, std::move(response));
   }
   delete request;
 }
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.h b/heron/tmaster/src/cpp/manager/tcontroller.h
index f7e7a48..a611b11 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.h
+++ b/heron/tmaster/src/cpp/manager/tcontroller.h
@@ -69,7 +69,7 @@
   void HandleGetCurPPlanRequest(IncomingHTTPRequest* request);
 
   // We are a http server
-  HTTPServer* http_server_;
+  unique_ptr<HTTPServer> http_server_;
 
   // our tmaster
   TMaster* tmaster_;
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp
index 206d8fb..9ab6897 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -47,6 +47,9 @@
 namespace heron {
 namespace tmaster {
 
+using std::unique_ptr;
+using std::make_shared;
+
 // Stats for the process
 const sp_string METRIC_CPU_USER = "__cpu_user_usec";
 const sp_string METRIC_CPU_SYSTEM = "__cpu_system_usec";
@@ -69,51 +72,49 @@
   topdir_ = _topdir;
   tmaster_controller_ = nullptr;
   tmaster_controller_port_ = _tmaster_controller_port;
-  master_ = NULL;
+  master_ = nullptr;
   master_port_ = _master_port;
-  stats_ = NULL;
+  stats_ = nullptr;
   stats_port_ = _stats_port;
   myhost_name_ = _myhost_name;
   eventLoop_ = eventLoop;
   dns_ = new AsyncDNS(eventLoop_);
   http_client_ = new HTTPClient(eventLoop_, dns_);
 
-  metrics_collector_ =
-      new TMetricsCollector(config::HeronInternalsConfigReader::Instance()
-                                    ->GetHeronTmasterMetricsCollectorMaximumIntervalMin() *
-                                60,
-                            eventLoop_, _metrics_sinks_yaml);
+  metrics_collector_ = make_shared<TMetricsCollector>(config::HeronInternalsConfigReader::Instance()
+                                         ->GetHeronTmasterMetricsCollectorMaximumIntervalMin() * 60,
+                                           eventLoop_, _metrics_sinks_yaml);
 
   mMetricsMgrPort = metricsMgrPort;
 
   sp_int32 metricsExportIntervalSec =
       config::HeronInternalsConfigReader::Instance()->GetHeronMetricsExportIntervalSec();
 
-  mMetricsMgrClient = new heron::common::MetricsMgrSt(
+  mMetricsMgrClient = make_shared<heron::common::MetricsMgrSt>(
       mMetricsMgrPort, metricsExportIntervalSec, eventLoop_);
   mMetricsMgrClient->Start(myhost_name_, master_port_, "__tmaster__",
                            "0");  // MM expects task_id, so just giving 0 for tmaster.
 
-  tmasterProcessMetrics = new heron::common::MultiAssignableMetric();
+  tmasterProcessMetrics = make_shared<heron::common::MultiAssignableMetric>();
   mMetricsMgrClient->register_metric(METRIC_PREFIX, tmasterProcessMetrics);
 
   ckptmgr_port_ = _ckptmgr_port;
   ckptmgr_client_ = nullptr;
 
-  current_pplan_ = NULL;
+  current_pplan_ = nullptr;
 
   // The topology as first submitted by the user
   // It shall only be used to construct the physical plan when TMaster first time starts
   // Any runtime changes shall be made to current_pplan_->topology
-  topology_ = NULL;
-  packing_plan_ = NULL;
+  topology_ = nullptr;
+  packing_plan_ = nullptr;
   state_mgr_ = heron::common::HeronStateMgr::MakeStateMgr(zk_hostport_, topdir_, eventLoop_);
 
   assignment_in_progress_ = false;
   do_reassign_ = false;
 
   master_establish_attempts_ = 0;
-  tmaster_location_ = new proto::tmaster::TMasterLocation();
+  tmaster_location_ = make_unique<proto::tmaster::TMasterLocation>();
   tmaster_location_->set_topology_name(_topology_name);
   tmaster_location_->set_topology_id(_topology_id);
   tmaster_location_->set_host(myhost_name_);
@@ -154,7 +155,7 @@
 }
 
 void TMaster::FetchPackingPlan() {
-  auto packing_plan = new proto::system::PackingPlan();
+  auto packing_plan = make_shared<proto::system::PackingPlan>();
 
   state_mgr_->GetPackingPlan(tmaster_location_->topology_name(), packing_plan,
                              [packing_plan, this](proto::system::StatusCode status) {
@@ -162,7 +163,7 @@
   });
 }
 
-void TMaster::OnPackingPlanFetch(proto::system::PackingPlan* newPackingPlan,
+void TMaster::OnPackingPlanFetch(shared_ptr<proto::system::PackingPlan> newPackingPlan,
                                  proto::system::StatusCode _status) {
   if (_status != proto::system::OK) {
     LOG(INFO) << "PackingPlan Fetch failed with status " << _status;
@@ -230,28 +231,18 @@
 }
 
 TMaster::~TMaster() {
-  delete topology_;
-  delete packing_plan_;
-  delete current_pplan_;
-  delete state_mgr_;
-  delete tmaster_controller_;
   if (master_) {
     master_->Stop();
   }
-  delete master_;
-  delete stats_;
-  delete tmaster_location_;
+
   for (StMgrMapIter iter = stmgrs_.begin(); iter != stmgrs_.end(); ++iter) {
-    delete iter->second;
+    stmgrs_.erase(iter->first);
   }
+
   stmgrs_.clear();
-  delete metrics_collector_;
 
   mMetricsMgrClient->unregister_metric(METRIC_PREFIX);
-  delete mMetricsMgrClient;
-  delete tmasterProcessMetrics;
-  delete stateful_controller_;
-  delete ckptmgr_client_;
+
   delete http_client_;
   delete dns_;
 }
@@ -302,8 +293,8 @@
   LOG(INFO) << "Successfully set ourselves as master\n";
 
   // Lets now read the topology
-  topology_ = new proto::api::Topology();
-  state_mgr_->GetTopology(tmaster_location_->topology_name(), topology_,
+  topology_ = make_unique<proto::api::Topology>();
+  state_mgr_->GetTopology(tmaster_location_->topology_name(), *topology_,
                           [this](proto::system::StatusCode code) {
     this->GetTopologyDone(code);
   });
@@ -340,7 +331,7 @@
     ckpt_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
                                            ->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
                                        1024 * 1024);
-    ckptmgr_client_ = new CkptMgrClient(eventLoop_, ckpt_options,
+    ckptmgr_client_ = make_unique<CkptMgrClient>(eventLoop_, ckpt_options,
                                         topology_->name(), topology_->id(),
                                         std::bind(&TMaster::HandleCleanStatefulCheckpointResponse,
                                         this, std::placeholders::_1));
@@ -348,7 +339,7 @@
     ckptmgr_client_->Start();
 
     // We also need to load latest checkpoint state
-    auto ckpt = new proto::ckptmgr::StatefulConsistentCheckpoints();
+    auto ckpt = std::make_shared<proto::ckptmgr::StatefulConsistentCheckpoints>();
     auto cb = [ckpt, this](proto::system::StatusCode code) {
       this->GetStatefulCheckpointsDone(ckpt, code);
     };
@@ -360,8 +351,9 @@
   }
 }
 
-void TMaster::GetStatefulCheckpointsDone(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt,
-                                         proto::system::StatusCode _code) {
+void TMaster::GetStatefulCheckpointsDone(
+        shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+        proto::system::StatusCode _code) {
   if (_code != proto::system::OK && _code != proto::system::PATH_DOES_NOT_EXIST) {
     LOG(FATAL) << "For topology " << tmaster_location_->topology_name()
                << " Getting Stateful Checkpoint failed with error " << _code;
@@ -370,9 +362,8 @@
     LOG(INFO) << "For topology " << tmaster_location_->topology_name()
               << " No existing globally consistent checkpoint found "
               << " inserting a empty one";
-    delete _ckpt;
     // We need to set an empty one
-    auto ckpts = new proto::ckptmgr::StatefulConsistentCheckpoints;
+    auto ckpts = make_shared<proto::ckptmgr::StatefulConsistentCheckpoints>();
     auto ckpt = ckpts->add_consistent_checkpoints();
     ckpt->set_checkpoint_id("");
     ckpt->set_packing_plan_id(packing_plan_->id());
@@ -381,18 +372,18 @@
     };
 
     state_mgr_->CreateStatefulCheckpoints(tmaster_location_->topology_name(),
-                                          *ckpts, std::move(cb));
+                                          ckpts, std::move(cb));
   } else {
     LOG(INFO) << "For topology " << tmaster_location_->topology_name()
               << " An existing globally consistent checkpoint found "
               << _ckpt->DebugString();
-    SetupStatefulController(_ckpt);
+    SetupStatefulController(std::move(_ckpt));
     FetchPhysicalPlan();
   }
 }
 
 void TMaster::SetStatefulCheckpointsDone(proto::system::StatusCode _code,
-                             proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt) {
+                                 shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt) {
   if (_code != proto::system::OK) {
     LOG(FATAL) << "For topology " << tmaster_location_->topology_name()
                << " Setting empty Stateful Checkpoint failed with error " << _code;
@@ -401,7 +392,8 @@
   FetchPhysicalPlan();
 }
 
-void TMaster::SetupStatefulController(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt) {
+void TMaster::SetupStatefulController(
+        shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt) {
   sp_int64 stateful_checkpoint_interval =
        config::TopologyConfigHelper::GetStatefulCheckpointIntervalSecsWithDefault(*topology_, 300);
   CHECK_GT(stateful_checkpoint_interval, 0);
@@ -410,8 +402,8 @@
     this->HandleStatefulCheckpointSave(_oldest_ckptid);
   };
   // Instantiate the stateful restorer/coordinator
-  stateful_controller_ = new StatefulController(topology_->name(), _ckpt, state_mgr_, start_time_,
-                                        mMetricsMgrClient, cb);
+  stateful_controller_ = make_unique<StatefulController>(topology_->name(), _ckpt, state_mgr_,
+                                                                start_time_, mMetricsMgrClient, cb);
   LOG(INFO) << "Starting timer to checkpoint state every "
             << stateful_checkpoint_interval << " seconds";
   CHECK_GT(eventLoop_->registerTimer(
@@ -449,7 +441,7 @@
 }
 
 void TMaster::FetchPhysicalPlan() {
-  proto::system::PhysicalPlan* pplan = new proto::system::PhysicalPlan();
+  auto pplan = make_shared<proto::system::PhysicalPlan>();
   auto cb = [pplan, this](proto::system::StatusCode code) {
     this->GetPhysicalPlanDone(pplan, code);
   };
@@ -489,7 +481,7 @@
   }
 }
 
-void TMaster::GetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan,
+void TMaster::GetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
                                   proto::system::StatusCode _code) {
   // Physical plan need not exist. First check if some other error occurred.
   if (_code != proto::system::OK && _code != proto::system::PATH_DOES_NOT_EXIST) {
@@ -504,7 +496,6 @@
   if (_code == proto::system::PATH_DOES_NOT_EXIST) {
     LOG(ERROR) << "There was no existing physical plan\n";
     // We never did assignment in the first place
-    delete _pplan;
   } else {
     LOG(INFO) << "There was an existing physical plan\n";
     CHECK_EQ(_code, proto::system::OK);
@@ -523,7 +514,7 @@
                                          ->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
                                      1_MB);
   master_options.set_socket_family(PF_INET);
-  master_ = new TMasterServer(eventLoop_, master_options, metrics_collector_, this);
+  master_ = make_unique<TMasterServer>(eventLoop_, master_options, metrics_collector_, this);
 
   sp_int32 retval = master_->Start();
   if (retval != SP_OK) {
@@ -539,7 +530,7 @@
           ->GetHeronTmasterNetworkControllerOptionsMaximumPacketMb() *
       1_MB);
   controller_options.set_socket_family(PF_INET);
-  tmaster_controller_ = new TController(eventLoop_, controller_options, this);
+  tmaster_controller_ = make_unique<TController>(eventLoop_, controller_options, this);
 
   retval = tmaster_controller_->Start();
   if (retval != SP_OK) {
@@ -559,7 +550,7 @@
                                         ->GetHeronTmasterNetworkStatsOptionsMaximumPacketMb() *
                                     1_MB);
   stats_options.set_socket_family(PF_INET);
-  stats_ = new StatsInterface(eventLoop_, stats_options, metrics_collector_, this);
+  stats_ = make_unique<StatsInterface>(eventLoop_, stats_options, metrics_collector_, this);
 }
 
 void TMaster::ActivateTopology(VCallback<proto::system::StatusCode> cb) {
@@ -567,7 +558,7 @@
   DCHECK(current_pplan_->topology().IsInitialized());
 
   // Set the status
-  proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+  auto new_pplan = make_shared<proto::system::PhysicalPlan>();
   new_pplan->CopyFrom(*current_pplan_);
   new_pplan->mutable_topology()->set_state(proto::api::RUNNING);
 
@@ -584,7 +575,7 @@
   DCHECK(current_pplan_->topology().IsInitialized());
 
   // Set the status
-  proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+  auto new_pplan = make_shared<proto::system::PhysicalPlan>();
   new_pplan->CopyFrom(*current_pplan_);
   new_pplan->mutable_topology()->set_state(proto::api::PAUSED);
 
@@ -604,12 +595,11 @@
   LogConfig(_config);
 
   // Parse and set the new configs
-  proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+  auto new_pplan = make_shared<proto::system::PhysicalPlan>();
   new_pplan->CopyFrom(*current_pplan_);
   proto::api::Topology* topology = new_pplan->mutable_topology();
   if (!UpdateRuntimeConfigInTopology(topology, _config)) {
     LOG(ERROR) << "Fail to update runtime config in topology";
-    delete new_pplan;
     return false;
   }
 
@@ -686,8 +676,9 @@
 }
 
 proto::system::Status* TMaster::RegisterStMgr(
-    const proto::system::StMgr& _stmgr, const std::vector<proto::system::Instance*>& _instances,
-    Connection* _conn, proto::system::PhysicalPlan*& _pplan) {
+                               const proto::system::StMgr& _stmgr,
+                               const std::vector<shared_ptr<proto::system::Instance>>& _instances,
+                               Connection* _conn, shared_ptr<proto::system::PhysicalPlan>& _pplan) {
   const std::string& stmgr_id = _stmgr.id();
   LOG(INFO) << "Got a register stream manager request from " << stmgr_id;
 
@@ -733,7 +724,7 @@
     return status;
   } else {
     // This guy was indeed expected
-    stmgrs_[stmgr_id] = new StMgrState(_conn, _stmgr, _instances, master_);
+    stmgrs_[stmgr_id] = make_shared<StMgrState>(_conn, _stmgr, _instances, *master_);
     connection_to_stmgr_id_[_conn] = stmgr_id;
     absent_stmgrs_.erase(stmgr_id);
   }
@@ -749,6 +740,7 @@
       CHECK_GE(eventLoop_->registerTimer(std::move(cb), false, 0), 0);
     }
   }
+
   _pplan = current_pplan_;
   proto::system::Status* status = new proto::system::Status();
   status->set_status(proto::system::OK);
@@ -769,7 +761,7 @@
 
   // TODO(kramasamy): If current_assignment exists, we need
   // to use as many portions from it as possible
-  proto::system::PhysicalPlan* pplan = MakePhysicalPlan();
+  shared_ptr<proto::system::PhysicalPlan> pplan = MakePhysicalPlan();
   CHECK_NOTNULL(pplan);
   DCHECK(pplan->IsInitialized());
 
@@ -795,7 +787,7 @@
   }
 }
 
-void TMaster::SetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan,
+void TMaster::SetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
                                   proto::system::StatusCode _code) {
   if (_code != proto::system::OK) {
     LOG(ERROR) << "Error writing assignment to statemgr. Error code is " << _code << std::endl;
@@ -806,14 +798,12 @@
 
   if (do_reassign_) {
     // Some other mapping change happened
-    delete _pplan;
     assignment_in_progress_ = true;
     LOG(INFO) << "Doing assignment since physical assignment might have changed" << std::endl;
     auto cb = [this](EventLoop::Status status) { this->DoPhysicalPlan(status); };
     CHECK_GE(eventLoop_->registerTimer(std::move(cb), false, 0), 0);
   } else {
     bool first_time_pplan = current_pplan_ == nullptr;
-    delete current_pplan_;
     current_pplan_ = _pplan;
     assignment_in_progress_ = false;
     // We need to pass that on to all streammanagers
@@ -862,7 +852,7 @@
   return std::move(response);
 }
 
-proto::system::PhysicalPlan* TMaster::MakePhysicalPlan() {
+shared_ptr<proto::system::PhysicalPlan> TMaster::MakePhysicalPlan() {
   // TODO(kramasamy): At some point, we need to talk to our scheduler
   // and do this scheduling
   if (current_pplan_) {
@@ -872,7 +862,7 @@
     // First lets verify that our original pplan and instances
     // all match up
     CHECK(ValidateStMgrsWithPhysicalPlan(current_pplan_));
-    proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+    auto new_pplan = make_shared<proto::system::PhysicalPlan>();
     new_pplan->mutable_topology()->CopyFrom(current_pplan_->topology());
 
     for (StMgrMapIter iter = stmgrs_.begin(); iter != stmgrs_.end(); ++iter) {
@@ -881,6 +871,7 @@
     for (sp_int32 i = 0; i < current_pplan_->instances_size(); ++i) {
       new_pplan->add_instances()->CopyFrom(current_pplan_->instances(i));
     }
+
     return new_pplan;
   }
 
@@ -889,13 +880,14 @@
   // TMaster just stitches the info together to pass to everyone
 
   // Build the PhysicalPlan structure
-  proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+  auto new_pplan = make_shared<proto::system::PhysicalPlan>();
   new_pplan->mutable_topology()->CopyFrom(*topology_);
 
   // Build the physical assignments
   for (StMgrMapIter stmgr_iter = stmgrs_.begin(); stmgr_iter != stmgrs_.end(); ++stmgr_iter) {
     new_pplan->add_stmgrs()->CopyFrom(*(stmgr_iter->second->get_stmgr()));
-    const std::vector<proto::system::Instance*>& instances = stmgr_iter->second->get_instances();
+    const std::vector<shared_ptr<proto::system::Instance>>& instances =
+            stmgr_iter->second->get_instances();
     for (size_t i = 0; i < instances.size(); ++i) {
       new_pplan->add_instances()->CopyFrom(*(instances[i]));
     }
@@ -937,13 +929,12 @@
   if (stmgrs_.find(stmgr_id) == stmgrs_.end()) {
     return proto::system::INVALID_STMGR;
   }
-  StMgrState* stmgr = stmgrs_[stmgr_id];
+  auto stmgr = stmgrs_[stmgr_id];
   // This guy disconnected from us
   LOG(INFO) << "StMgr " << stmgr->get_stmgr()->id() << " disconnected from us" << std::endl;
   stmgrs_.erase(stmgr->get_id());
   connection_to_stmgr_id_.erase(_conn);
   absent_stmgrs_.insert(stmgr->get_id());
-  delete stmgr;
   return proto::system::OK;
 }
 
@@ -1000,7 +991,7 @@
   return ninstances == ntasks;
 }
 
-bool TMaster::ValidateStMgrsWithPhysicalPlan(proto::system::PhysicalPlan* _pplan) {
+bool TMaster::ValidateStMgrsWithPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> _pplan) {
   std::map<std::string, std::vector<proto::system::Instance*> > stmgr_to_instance_map;
   for (sp_int32 i = 0; i < _pplan->instances_size(); ++i) {
     proto::system::Instance* instance = _pplan->mutable_instances(i);
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmaster/src/cpp/manager/tmaster.h
index 8869618..e407677 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmaster/src/cpp/manager/tmaster.h
@@ -35,6 +35,9 @@
 namespace heron {
 namespace tmaster {
 
+using std::unique_ptr;
+using std::shared_ptr;
+
 class StMgrState;
 class TController;
 class StatsInterface;
@@ -43,7 +46,7 @@
 class StatefulController;
 class CkptMgrClient;
 
-typedef std::map<std::string, StMgrState*> StMgrMap;
+typedef std::map<std::string, shared_ptr<StMgrState>> StMgrMap;
 typedef StMgrMap::iterator StMgrMapIter;
 
 typedef std::map<std::string, std::string> ConfigValueMap;
@@ -75,8 +78,8 @@
   bool ValidateRuntimeConfig(const ComponentConfigMap& _config) const;
 
   proto::system::Status* RegisterStMgr(const proto::system::StMgr& _stmgr,
-                                       const std::vector<proto::system::Instance*>& _instances,
-                                       Connection* _conn, proto::system::PhysicalPlan*& _pplan);
+                                const std::vector<shared_ptr<proto::system::Instance>>& _instances,
+                                Connection* _conn, shared_ptr<proto::system::PhysicalPlan>& _pplan);
   // function to update heartbeat for a nodemgr
   proto::system::Status* UpdateStMgrHeartbeat(Connection* _conn, sp_int64 _time,
                                               proto::system::StMgrStats* _stats);
@@ -93,12 +96,12 @@
   std::unique_ptr<proto::tmaster::StmgrsRegistrationSummaryResponse> GetStmgrsRegSummary();
 
   // Accessors
-  const proto::system::PhysicalPlan* getPhysicalPlan() const { return current_pplan_; }
+  const shared_ptr<proto::system::PhysicalPlan> getPhysicalPlan() const { return current_pplan_; }
   // TODO(mfu): Should we provide this?
   // topology_ should only be used to construct physical plan when TMaster first starts
   // Providing an accessor is bug prone.
   // Now used in GetMetrics function in tmetrics-collector
-  const proto::api::Topology* getInitialTopology() const { return topology_; }
+  const proto::api::Topology& getInitialTopology() const { return *topology_; }
 
   // Timer function to start the stateful checkpoint process
   void SendCheckpointMarker();
@@ -131,7 +134,7 @@
   // If _new_stmgr is null, this means that there was a plan
   // existing, but a _new_stmgr joined us. So redo his part
   // If _new_stmgr is empty, this means do pplan from scratch
-  proto::system::PhysicalPlan* MakePhysicalPlan();
+  shared_ptr<proto::system::PhysicalPlan> MakePhysicalPlan();
 
   // Check to see if the topology is of correct format
   bool ValidateTopology(const proto::api::Topology& _topology);
@@ -142,7 +145,7 @@
 
   // Check to see if the stmgrs and pplan match
   // in terms of workers
-  bool ValidateStMgrsWithPhysicalPlan(proto::system::PhysicalPlan* _pplan);
+  bool ValidateStMgrsWithPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> _pplan);
 
   // Check if incoming runtime configs are valid or not.
   // All incoming configurations must exist. If there is any non-existing
@@ -159,26 +162,28 @@
   void GetTopologyDone(proto::system::StatusCode _code);
 
   // Function called after we get StatefulConsistentCheckpoints
-  void GetStatefulCheckpointsDone(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt,
+  void GetStatefulCheckpointsDone(shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
                                   proto::system::StatusCode _code);
   // Function called after we set an initial StatefulConsistentCheckpoints
   void SetStatefulCheckpointsDone(proto::system::StatusCode _code,
-                            proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt);
+                                  shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt);
   // Helper function to setup stateful coordinator
-  void SetupStatefulController(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt);
+  void SetupStatefulController(shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt);
 
   // Function called after we try to get assignment
-  void GetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan, proto::system::StatusCode _code);
+  void GetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
+          proto::system::StatusCode _code);
 
   // Function called after we try to commit a new assignment
-  void SetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan, proto::system::StatusCode _code);
+  void SetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
+                           proto::system::StatusCode _code);
 
   // Function called when we want to setup ourselves as tmaster
   void EstablishTMaster(EventLoop::Status);
 
   void EstablishPackingPlan(EventLoop::Status);
   void FetchPackingPlan();
-  void OnPackingPlanFetch(proto::system::PackingPlan* newPackingPlan,
+  void OnPackingPlanFetch(shared_ptr<proto::system::PackingPlan> newPackingPlan,
                           proto::system::StatusCode _status);
 
   // Metrics updates
@@ -207,20 +212,20 @@
   std::set<std::string> absent_stmgrs_;
 
   // The current physical plan
-  proto::system::PhysicalPlan* current_pplan_;
+  shared_ptr<proto::system::PhysicalPlan> current_pplan_;
 
   // The topology as first submitted by the user
   // It shall only be used to construct the physical plan when TMaster first time starts
   // Any runtime changes shall be made to current_pplan_->topology
-  proto::api::Topology* topology_;
+  unique_ptr<proto::api::Topology> topology_;
 
-  proto::system::PackingPlan* packing_plan_;
+  shared_ptr<proto::system::PackingPlan> packing_plan_;
 
   // The statemgr where we store/retrieve our state
-  heron::common::HeronStateMgr* state_mgr_;
+  shared_ptr<heron::common::HeronStateMgr> state_mgr_;
 
   // Our copy of the tmasterlocation
-  proto::tmaster::TMasterLocation* tmaster_location_;
+  unique_ptr<proto::tmaster::TMasterLocation> tmaster_location_;
 
   // When we are in the middle of doing assignment
   // we set this to true
@@ -232,11 +237,11 @@
   std::string topdir_;
 
   // Servers that implement our services
-  TController* tmaster_controller_;
+  unique_ptr<TController> tmaster_controller_;
   sp_int32 tmaster_controller_port_;
-  TMasterServer* master_;
+  unique_ptr<TMasterServer> master_;
   sp_int32 master_port_;
-  StatsInterface* stats_;
+  unique_ptr<StatsInterface> stats_;
   sp_int32 stats_port_;
   std::string myhost_name_;
 
@@ -245,24 +250,24 @@
   sp_int32 master_establish_attempts_;
 
   // collector
-  TMetricsCollector* metrics_collector_;
+  shared_ptr<TMetricsCollector> metrics_collector_;
 
   sp_int32 mMetricsMgrPort;
   // Metrics Manager
-  heron::common::MetricsMgrSt* mMetricsMgrClient;
+  shared_ptr<heron::common::MetricsMgrSt> mMetricsMgrClient;
 
   // Ckpt Manager
-  CkptMgrClient* ckptmgr_client_;
+  unique_ptr<CkptMgrClient> ckptmgr_client_;
   sp_int32 ckptmgr_port_;
 
   // Process related metrics
-  heron::common::MultiAssignableMetric* tmasterProcessMetrics;
+  shared_ptr<heron::common::MultiAssignableMetric> tmasterProcessMetrics;
 
   // The time at which the stmgr was started up
   std::chrono::high_resolution_clock::time_point start_time_;
 
   // Stateful Controller
-  StatefulController* stateful_controller_;
+  unique_ptr<StatefulController> stateful_controller_;
 
   // HTTP client
   AsyncDNS* dns_;
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.cpp b/heron/tmaster/src/cpp/manager/tmasterserver.cpp
index be977d2..ea8dac8 100644
--- a/heron/tmaster/src/cpp/manager/tmasterserver.cpp
+++ b/heron/tmaster/src/cpp/manager/tmasterserver.cpp
@@ -32,8 +32,11 @@
 namespace heron {
 namespace tmaster {
 
+using std::unique_ptr;
+using std::shared_ptr;
+
 TMasterServer::TMasterServer(EventLoop* eventLoop, const NetworkOptions& _options,
-                             TMetricsCollector* _collector, TMaster* _tmaster)
+                             shared_ptr<TMetricsCollector> _collector, TMaster* _tmaster)
     : Server(eventLoop, _options), collector_(_collector), tmaster_(_tmaster) {
   // Install the stmgr handlers
   InstallRequestHandler(&TMasterServer::HandleStMgrRegisterRequest);
@@ -65,15 +68,15 @@
 
 void TMasterServer::HandleStMgrRegisterRequest(REQID _reqid, Connection* _conn,
                                                proto::tmaster::StMgrRegisterRequest* _request) {
-  StMgrRegisterProcessor* processor =
-      new StMgrRegisterProcessor(_reqid, _conn, _request, tmaster_, this);
+  unique_ptr<StMgrRegisterProcessor> processor =
+      make_unique<StMgrRegisterProcessor>(_reqid, _conn, _request, tmaster_, this);
   processor->Start();
 }
 
 void TMasterServer::HandleStMgrHeartbeatRequest(REQID _reqid, Connection* _conn,
                                                 proto::tmaster::StMgrHeartbeatRequest* _request) {
-  StMgrHeartbeatProcessor* processor =
-      new StMgrHeartbeatProcessor(_reqid, _conn, _request, tmaster_, this);
+  unique_ptr<StMgrHeartbeatProcessor> processor =
+      make_unique<StMgrHeartbeatProcessor>(_reqid, _conn, _request, tmaster_, this);
   processor->Start();
 }
 
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.h b/heron/tmaster/src/cpp/manager/tmasterserver.h
index 297cc82..70306cf 100644
--- a/heron/tmaster/src/cpp/manager/tmasterserver.h
+++ b/heron/tmaster/src/cpp/manager/tmasterserver.h
@@ -29,13 +29,15 @@
 namespace heron {
 namespace tmaster {
 
+using std::shared_ptr;
+
 class TMaster;
 class TMetricsCollector;
 
 class TMasterServer : public Server {
  public:
-  TMasterServer(EventLoop* eventLoop, const NetworkOptions& options, TMetricsCollector* _collector,
-                TMaster* _tmaster);
+  TMasterServer(EventLoop* eventLoop, const NetworkOptions& options,
+          shared_ptr<TMetricsCollector> _collector, TMaster* _tmaster);
   virtual ~TMasterServer();
 
  protected:
@@ -62,7 +64,7 @@
                                      proto::ckptmgr::ResetTopologyState* _message);
 
   // our tmaster
-  TMetricsCollector* collector_;
+  shared_ptr<TMetricsCollector> collector_;
   TMaster* tmaster_;
 };
 }  // namespace tmaster
diff --git a/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp b/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp
index f8ac311..41e4a06 100644
--- a/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp
+++ b/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp
@@ -53,7 +53,7 @@
     : max_interval_(_max_interval),
       eventLoop_(eventLoop),
       metrics_sinks_yaml_(metrics_sinks_yaml),
-      tmetrics_info_(new common::TMasterMetrics(metrics_sinks_yaml, eventLoop)),
+      tmetrics_info_(make_unique<common::TMasterMetrics>(metrics_sinks_yaml, eventLoop)),
       start_time_(time(NULL)) {
   interval_ = config::HeronInternalsConfigReader::Instance()
                   ->GetHeronTmasterMetricsCollectorPurgeIntervalSec();
@@ -63,12 +63,7 @@
   CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, interval_ * 1000000), 0);
 }
 
-TMetricsCollector::~TMetricsCollector() {
-  for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
-    delete iter->second;
-  }
-  delete tmetrics_info_;
-}
+TMetricsCollector::~TMetricsCollector() {}
 
 void TMetricsCollector::Purge(EventLoop::Status) {
   for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
@@ -81,7 +76,7 @@
 
 void TMetricsCollector::AddMetricsForComponent(const sp_string& component_name,
                                                const proto::tmaster::MetricDatum& metrics_data) {
-  ComponentMetrics* component_metrics = GetOrCreateComponentMetrics(component_name);
+  auto component_metrics = GetOrCreateComponentMetrics(component_name);
   const sp_string& name = metrics_data.name();
   const TMasterMetrics::MetricAggregationType& type = tmetrics_info_->GetAggregationType(name);
   component_metrics->AddMetricForInstance(metrics_data.instance_id(), name, type,
@@ -90,7 +85,7 @@
 
 void TMetricsCollector::AddExceptionsForComponent(const sp_string& component_name,
                                                   const TmasterExceptionLog& exception_log) {
-  ComponentMetrics* component_metrics = GetOrCreateComponentMetrics(component_name);
+  auto component_metrics = GetOrCreateComponentMetrics(component_name);
   component_metrics->AddExceptionForInstance(exception_log.instance_id(), exception_log);
 }
 
@@ -105,21 +100,21 @@
   }
 }
 
-MetricResponse* TMetricsCollector::GetMetrics(const MetricRequest& _request,
-                                              const proto::api::Topology* _topology) {
-  auto response = new MetricResponse();
+unique_ptr<MetricResponse> TMetricsCollector::GetMetrics(const MetricRequest& _request,
+                                              const proto::api::Topology& _topology) {
+  auto response = make_unique<MetricResponse>();
 
   if (metrics_.find(_request.component_name()) == metrics_.end()) {
     bool component_exists = false;
-    for (int i = 0; i < _topology->spouts_size(); i++) {
-      if ((_topology->spouts(i)).comp().name() == _request.component_name()) {
+    for (int i = 0; i < _topology.spouts_size(); i++) {
+      if ((_topology.spouts(i)).comp().name() == _request.component_name()) {
         component_exists = true;
         break;
       }
     }
     if (!component_exists) {
-      for (int i = 0; i < _topology->bolts_size(); i++) {
-        if ((_topology->bolts(i)).comp().name() == _request.component_name()) {
+      for (int i = 0; i < _topology.bolts_size(); i++) {
+        if ((_topology.bolts(i)).comp().name() == _request.component_name()) {
           component_exists = true;
           break;
         }
@@ -156,15 +151,15 @@
       start_time = _request.explicit_interval().start();
       end_time = _request.explicit_interval().end();
     }
-    metrics_[_request.component_name()]->GetMetrics(_request, start_time, end_time, response);
+    metrics_[_request.component_name()]->GetMetrics(_request, start_time, end_time, *response);
     response->set_interval(end_time - start_time);
   }
   return response;
 }
 
 void TMetricsCollector::GetExceptionsHelper(const ExceptionLogRequest& request,
-                                            ExceptionLogResponse* exceptions) {
-  ComponentMetrics* component_metrics = metrics_[request.component_name()];
+                                            ExceptionLogResponse& exceptions) {
+  auto component_metrics = metrics_[request.component_name()];
   if (request.instances_size() == 0) {
     component_metrics->GetAllExceptions(exceptions);
   } else {
@@ -174,8 +169,9 @@
   }
 }
 
-ExceptionLogResponse* TMetricsCollector::GetExceptions(const ExceptionLogRequest& request) {
-  auto response = new ExceptionLogResponse();
+unique_ptr<ExceptionLogResponse> TMetricsCollector::GetExceptions(
+                                                            const ExceptionLogRequest& request) {
+  auto response = make_unique<ExceptionLogResponse>();
   if (metrics_.find(request.component_name()) == metrics_.end()) {
     LOG(ERROR) << "GetExceptions request received for unknown component "
                << request.component_name();
@@ -185,12 +181,13 @@
   }
   response->mutable_status()->set_status(proto::system::OK);
   response->mutable_status()->set_message("OK");
-  GetExceptionsHelper(request, response);
+  GetExceptionsHelper(request, *response);
   return response;
 }
 
-ExceptionLogResponse* TMetricsCollector::GetExceptionsSummary(const ExceptionLogRequest& request) {
-  auto response = new ExceptionLogResponse();
+unique_ptr<ExceptionLogResponse> TMetricsCollector::GetExceptionsSummary(
+                                                              const ExceptionLogRequest& request) {
+  auto response = make_unique<ExceptionLogResponse>();
 
   if (metrics_.find(request.component_name()) == metrics_.end()) {
     LOG(ERROR) << "GetExceptionSummary request received for unknown component "
@@ -203,10 +200,9 @@
   response->mutable_status()->set_message("OK");
 
   // Owns this pointer.
-  auto all_exceptions = new ExceptionLogResponse();
-  GetExceptionsHelper(request, all_exceptions);  // Store un aggregated exceptions.
-  AggregateExceptions(*all_exceptions, response);
-  delete all_exceptions;
+  auto all_exceptions = make_unique<ExceptionLogResponse>();
+  GetExceptionsHelper(request, *all_exceptions);  // Store un aggregated exceptions.
+  AggregateExceptions(*all_exceptions, *response);
 
   return response;
 }
@@ -216,8 +212,11 @@
 // into one function which take aggregation as argument. Modify the ExceptionRequest to
 // take argument for which aggregation function to use)
 void TMetricsCollector::AggregateExceptions(const ExceptionLogResponse& all_exceptions,
-                                            ExceptionLogResponse* aggregate_exceptions) {
-  std::map<std::string, TmasterExceptionLog*> exception_summary;  // Owns exception log pointer.
+                                            ExceptionLogResponse& aggregate_exceptions) {
+  using std::map;
+  using std::string;
+
+  map<string, unique_ptr<TmasterExceptionLog>> exception_summary;  // Owns exception log pointer.
   for (int i = 0; i < all_exceptions.exceptions_size(); ++i) {
     const TmasterExceptionLog& log = all_exceptions.exceptions(i);
     // Get classname by splitting on first colon
@@ -226,30 +225,31 @@
     if (pos != std::string::npos) {
       const std::string class_name = stack_trace.substr(0, pos);
       if (exception_summary.find(class_name) == exception_summary.end()) {
-        auto new_exception = new TmasterExceptionLog();
+        auto new_exception = make_unique<TmasterExceptionLog>();
         new_exception->CopyFrom(log);
         new_exception->set_stacktrace(class_name);
-        exception_summary[class_name] = new_exception;
+        exception_summary[class_name] = std::move(new_exception);
       } else {
-        TmasterExceptionLog* prev_log = exception_summary[class_name];
-        prev_log->set_count(log.count() + prev_log->count());
-        prev_log->set_lasttime(log.lasttime());
+        TmasterExceptionLog& prev_log = *exception_summary[class_name];
+        prev_log.set_count(log.count() + prev_log.count());
+        prev_log.set_lasttime(log.lasttime());
       }
     }
   }
 
   for (auto summary_iter = exception_summary.begin();
        summary_iter != exception_summary.end(); ++summary_iter) {
-    aggregate_exceptions->add_exceptions()->CopyFrom(*(summary_iter->second));
-    delete summary_iter->second;  // Remove the temporary object holding exception summary
+    aggregate_exceptions.add_exceptions()->CopyFrom(*(summary_iter->second));
   }
 }
 
-TMetricsCollector::ComponentMetrics* TMetricsCollector::GetOrCreateComponentMetrics(
+shared_ptr<TMetricsCollector::ComponentMetrics> TMetricsCollector::GetOrCreateComponentMetrics(
     const sp_string& component_name) {
   if (metrics_.find(component_name) == metrics_.end()) {
-    metrics_[component_name] = new ComponentMetrics(component_name, nintervals_, interval_);
+    metrics_[component_name] =
+            std::make_shared<ComponentMetrics>(component_name, nintervals_, interval_);
   }
+
   return metrics_[component_name];
 }
 
@@ -257,11 +257,7 @@
                                                       sp_int32 nbuckets, sp_int32 bucket_interval)
     : component_name_(component_name), nbuckets_(nbuckets), bucket_interval_(bucket_interval) {}
 
-TMetricsCollector::ComponentMetrics::~ComponentMetrics() {
-  for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
-    delete iter->second;
-  }
-}
+TMetricsCollector::ComponentMetrics::~ComponentMetrics() {}
 
 void TMetricsCollector::ComponentMetrics::Purge() {
   for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
@@ -272,32 +268,34 @@
 void TMetricsCollector::ComponentMetrics::AddMetricForInstance(
     const sp_string& instance_id, const sp_string& name, TMasterMetrics::MetricAggregationType type,
     const sp_string& value) {
-  InstanceMetrics* instance_metrics = GetOrCreateInstanceMetrics(instance_id);
+  auto instance_metrics = GetOrCreateInstanceMetrics(instance_id);
   instance_metrics->AddMetricWithName(name, type, value);
 }
 
 void TMetricsCollector::ComponentMetrics::AddExceptionForInstance(
     const sp_string& instance_id, const TmasterExceptionLog& exception) {
-  InstanceMetrics* instance_metrics = GetOrCreateInstanceMetrics(instance_id);
+  auto instance_metrics = GetOrCreateInstanceMetrics(instance_id);
   instance_metrics->AddExceptions(exception);
 }
 
-TMetricsCollector::InstanceMetrics* TMetricsCollector::ComponentMetrics::GetOrCreateInstanceMetrics(
-    const sp_string& instance_id) {
+shared_ptr<TMetricsCollector::InstanceMetrics>
+    TMetricsCollector::ComponentMetrics::GetOrCreateInstanceMetrics(const sp_string& instance_id) {
   if (metrics_.find(instance_id) == metrics_.end()) {
-    metrics_[instance_id] = new InstanceMetrics(instance_id, nbuckets_, bucket_interval_);
+    metrics_[instance_id] =
+            std::make_shared<InstanceMetrics>(instance_id, nbuckets_, bucket_interval_);
   }
+
   return metrics_[instance_id];
 }
 
 void TMetricsCollector::ComponentMetrics::GetMetrics(const MetricRequest& _request,
                                                      sp_int64 start_time, sp_int64 end_time,
-                                                     MetricResponse* _response) {
+                                                     MetricResponse& _response) {
   if (_request.instance_id_size() == 0) {
     // This means that all instances need to be returned
     for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
       iter->second->GetMetrics(_request, start_time, end_time, _response);
-      if (_response->status().status() != proto::system::OK) {
+      if (_response.status().status() != proto::system::OK) {
         return;
       }
     }
@@ -306,27 +304,27 @@
       const sp_string& id = _request.instance_id(i);
       if (metrics_.find(id) == metrics_.end()) {
         LOG(ERROR) << "GetMetrics request received for unknown instance_id " << id;
-        _response->mutable_status()->set_status(proto::system::NOTOK);
+        _response.mutable_status()->set_status(proto::system::NOTOK);
         return;
       } else {
         metrics_[id]->GetMetrics(_request, start_time, end_time, _response);
-        if (_response->status().status() != proto::system::OK) {
+        if (_response.status().status() != proto::system::OK) {
           return;
         }
       }
     }
   }
-  _response->mutable_status()->set_status(proto::system::OK);
+  _response.mutable_status()->set_status(proto::system::OK);
 }
 
 void TMetricsCollector::ComponentMetrics::GetExceptionsForInstance(const sp_string& instance_id,
-                                                                   ExceptionLogResponse* response) {
+                                                                   ExceptionLogResponse& response) {
   if (metrics_.find(instance_id) != metrics_.end()) {
     metrics_[instance_id]->GetExceptionLog(response);
   }
 }
 
-void TMetricsCollector::ComponentMetrics::GetAllExceptions(ExceptionLogResponse* response) {
+void TMetricsCollector::ComponentMetrics::GetAllExceptions(ExceptionLogResponse& response) {
   for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
     iter->second->GetExceptionLog(response);
   }
@@ -336,14 +334,7 @@
                                                     sp_int32 bucket_interval)
     : instance_id_(instance_id), nbuckets_(nbuckets), bucket_interval_(bucket_interval) {}
 
-TMetricsCollector::InstanceMetrics::~InstanceMetrics() {
-  for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
-    delete iter->second;
-  }
-  for (auto ex_iter = exceptions_.begin(); ex_iter != exceptions_.end(); ++ex_iter) {
-    delete *ex_iter;
-  }
-}
+TMetricsCollector::InstanceMetrics::~InstanceMetrics() {}
 
 void TMetricsCollector::InstanceMetrics::Purge() {
   for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
@@ -354,7 +345,7 @@
 void TMetricsCollector::InstanceMetrics::AddMetricWithName(
     const sp_string& name, common::TMasterMetrics::MetricAggregationType type,
     const sp_string& value) {
-  Metric* metric_data = GetOrCreateMetric(name, type);
+  auto metric_data = GetOrCreateMetric(name, type);
   metric_data->AddValueToMetric(value);
 }
 
@@ -363,30 +354,28 @@
   // TODO(kramasamy): Aggregate exceptions across minutely buckets. Try to avoid duplication of
   // hash-fuction
   // used to aggregate in heron-worker.
-  auto new_exception = new TmasterExceptionLog();
+  auto new_exception = make_unique<TmasterExceptionLog>();
   new_exception->CopyFrom(exception);
-  exceptions_.push_back(new_exception);
+  exceptions_.push_back(std::move(new_exception));
   sp_uint32 max_exception = config::HeronInternalsConfigReader::Instance()
                                 ->GetHeronTmasterMetricsCollectorMaximumException();
   while (exceptions_.size() > max_exception) {
-    TmasterExceptionLog* e = exceptions_.front();
     exceptions_.pop_front();
-    delete e;
   }
 }
 
-TMetricsCollector::Metric* TMetricsCollector::InstanceMetrics::GetOrCreateMetric(
+shared_ptr<TMetricsCollector::Metric> TMetricsCollector::InstanceMetrics::GetOrCreateMetric(
     const sp_string& name, TMasterMetrics::MetricAggregationType type) {
   if (metrics_.find(name) == metrics_.end()) {
-    metrics_[name] = new Metric(name, type, nbuckets_, bucket_interval_);
+    metrics_[name] = std::make_shared<Metric>(name, type, nbuckets_, bucket_interval_);
   }
   return metrics_[name];
 }
 
 void TMetricsCollector::InstanceMetrics::GetMetrics(const MetricRequest& request,
                                                     sp_int64 start_time, sp_int64 end_time,
-                                                    MetricResponse* response) {
-  MetricResponse::TaskMetric* m = response->add_metric();
+                                                    MetricResponse& response) {
+  MetricResponse::TaskMetric* m = response.add_metric();
   m->set_instance_id(instance_id_);
   for (sp_int32 i = 0; i < request.metric_size(); ++i) {
     const sp_string& id = request.metric(i);
@@ -396,9 +385,9 @@
   }
 }
 
-void TMetricsCollector::InstanceMetrics::GetExceptionLog(ExceptionLogResponse* response) {
+void TMetricsCollector::InstanceMetrics::GetExceptionLog(ExceptionLogResponse& response) {
   for (auto ex_iter = exceptions_.begin(); ex_iter != exceptions_.end(); ++ex_iter) {
-    response->add_exceptions()->CopyFrom(*(*ex_iter));
+    response.add_exceptions()->CopyFrom(*(*ex_iter));
   }
 }
 
@@ -411,21 +400,15 @@
       all_time_nitems_(0),
       bucket_interval_(bucket_interval) {
   for (sp_int32 i = 0; i < nbuckets; ++i) {
-    data_.push_back(new TimeBucket(bucket_interval_));
+    data_.push_back(std::move(make_unique<TimeBucket>(bucket_interval_)));
   }
 }
 
-TMetricsCollector::Metric::~Metric() {
-  for (auto iter = data_.begin(); iter != data_.end(); ++iter) {
-    delete *iter;
-  }
-}
+TMetricsCollector::Metric::~Metric() {}
 
 void TMetricsCollector::Metric::Purge() {
-  TimeBucket* b = data_.back();
   data_.pop_back();
-  data_.push_front(new TimeBucket(bucket_interval_));
-  delete b;
+  data_.push_front(std::move(make_unique<TimeBucket>(bucket_interval_)));
 }
 
 void TMetricsCollector::Metric::AddValueToMetric(const sp_string& _value) {
@@ -449,17 +432,17 @@
   if (minutely) {
     // we need minutely data
     for (auto iter = data_.begin(); iter != data_.end(); ++iter) {
-      TimeBucket* bucket = *iter;
+      TimeBucket& bucket = **iter;
       // Does this time bucket have overlap with needed range
-      if (bucket->overlaps(start_time, end_time)) {
+      if (bucket.overlaps(start_time, end_time)) {
         IntervalValue* val = _response->add_interval_values();
-        val->mutable_interval()->set_start(bucket->start_time_);
-        val->mutable_interval()->set_end(bucket->end_time_);
-        sp_double64 result = bucket->aggregate();
+        val->mutable_interval()->set_start(bucket.start_time_);
+        val->mutable_interval()->set_end(bucket.end_time_);
+        sp_double64 result = bucket.aggregate();
         if (metric_type_ == common::TMasterMetrics::SUM) {
           val->set_value(std::to_string(result));
         } else if (metric_type_ == common::TMasterMetrics::AVG) {
-          sp_double64 avg = result / bucket->count();
+          sp_double64 avg = result / bucket.count();
           val->set_value(std::to_string(avg));
         } else if (metric_type_ == common::TMasterMetrics::LAST) {
           val->set_value(std::to_string(result));
@@ -468,7 +451,7 @@
         }
       }
       // The timebuckets are reverse chronologically arranged
-      if (start_time > bucket->end_time_) break;
+      if (start_time > bucket.end_time_) break;
     }
   } else {
     // We don't need minutely data
@@ -489,15 +472,15 @@
       sp_int64 total_items = 0;
       sp_double64 total_count = 0;
       for (auto iter = data_.begin(); iter != data_.end(); ++iter) {
-        TimeBucket* bucket = *iter;
+        TimeBucket& bucket = **iter;
         // Does this time bucket have overlap with needed range
-        if (bucket->overlaps(start_time, end_time)) {
-          total_count += bucket->aggregate();
-          total_items += bucket->count();
+        if (bucket.overlaps(start_time, end_time)) {
+          total_count += bucket.aggregate();
+          total_items += bucket.count();
           if (metric_type_ == TMasterMetrics::LAST) break;
         }
         // The timebuckets are reverse chronologically arranged
-        if (start_time > bucket->end_time_) break;
+        if (start_time > bucket.end_time_) break;
       }
       if (metric_type_ == TMasterMetrics::SUM) {
         result = total_count;
diff --git a/heron/tmaster/src/cpp/manager/tmetrics-collector.h b/heron/tmaster/src/cpp/manager/tmetrics-collector.h
index b898b94..8374590 100644
--- a/heron/tmaster/src/cpp/manager/tmetrics-collector.h
+++ b/heron/tmaster/src/cpp/manager/tmetrics-collector.h
@@ -32,6 +32,10 @@
 
 namespace heron {
 namespace tmaster {
+
+using std::unique_ptr;
+using std::shared_ptr;
+
 // Helper class to manage aggregation and and serving of metrics. Metrics are logically stored as a
 // component_name -> {instance_id ->value}n .
 // TODO(kramasamy): Store metrics persistently to prevent against crashes.
@@ -50,15 +54,16 @@
 
   // Returns a new response to fetch metrics. The request gets propagated to Component's and
   // Instance's get metrics. Doesn't own Response.
-  proto::tmaster::MetricResponse* GetMetrics(const proto::tmaster::MetricRequest& _request,
-                                             const proto::api::Topology* _topology);
+  unique_ptr<proto::tmaster::MetricResponse> GetMetrics(
+                                                const proto::tmaster::MetricRequest& _request,
+                                                const proto::api::Topology& _topology);
 
   // Returns response for fetching exceptions. Doesn't own response.
-  proto::tmaster::ExceptionLogResponse* GetExceptions(
+  unique_ptr<proto::tmaster::ExceptionLogResponse> GetExceptions(
       const proto::tmaster::ExceptionLogRequest& request);
 
   // Returns exception summary response. Doesn't own response.
-  proto::tmaster::ExceptionLogResponse* GetExceptionsSummary(
+  unique_ptr<proto::tmaster::ExceptionLogResponse> GetExceptionsSummary(
       const proto::tmaster::ExceptionLogRequest& request);
 
  private:
@@ -66,12 +71,12 @@
   // 'all_exceptions'.
   //  Doesn't own 'all_exceptions' pointer
   void GetExceptionsHelper(const proto::tmaster::ExceptionLogRequest& request,
-                           proto::tmaster::ExceptionLogResponse* all_excepions);
+                           proto::tmaster::ExceptionLogResponse& all_excepions);
 
   // Aggregate exceptions from 'all_exceptions' to 'aggregate_exceptions'.
   // Doesn't own 'aggregate_exceptions'.
   void AggregateExceptions(const proto::tmaster::ExceptionLogResponse& all_exceptions,
-                           proto::tmaster::ExceptionLogResponse* aggregate_exceptions);
+                           proto::tmaster::ExceptionLogResponse& aggregate_exceptions);
 
   // Add metrics for 'component_name'
   void AddMetricsForComponent(const sp_string& component_name,
@@ -139,7 +144,7 @@
    private:
     sp_string name_;
     // Time series. data_ will be ordered by their time of arrival.
-    std::list<TimeBucket*> data_;
+    std::list<unique_ptr<TimeBucket>> data_;
     // Type of metric. This can be SUM or AVG. It specify how to aggregate these metrics for
     // display.
     common::TMasterMetrics::MetricAggregationType metric_type_;
@@ -174,23 +179,23 @@
 
     // Returns the metric metrics. Doesn't own _response.
     void GetMetrics(const proto::tmaster::MetricRequest& request, sp_int64 start_time,
-                    sp_int64 end_time, proto::tmaster::MetricResponse* response);
+                    sp_int64 end_time, proto::tmaster::MetricResponse& response);
 
     // Fills response for fetching exceptions. Doesn't own response.
-    void GetExceptionLog(proto::tmaster::ExceptionLogResponse* response);
+    void GetExceptionLog(proto::tmaster::ExceptionLogResponse& response);
 
    private:
     // Create or return existing Metric. Retains ownership of Metric object returned.
-    Metric* GetOrCreateMetric(const sp_string& name,
+    shared_ptr<Metric> GetOrCreateMetric(const sp_string& name,
                               common::TMasterMetrics::MetricAggregationType type);
 
     sp_string instance_id_;
     sp_int32 nbuckets_;
     sp_int32 bucket_interval_;
     // map between metric name and its values
-    std::map<sp_string, Metric*> metrics_;
+    std::map<sp_string, shared_ptr<Metric>> metrics_;
     // list of exceptions
-    std::list<proto::tmaster::TmasterExceptionLog*> exceptions_;
+    std::list<unique_ptr<proto::tmaster::TmasterExceptionLog>> exceptions_;
   };
 
   // Component level metrics. A component metrics is a map storing metrics for each of its
@@ -217,39 +222,39 @@
     // Request aggregated metrics for this component for the '_nbucket' interval.
     // Doesn't own '_response' object.
     void GetMetrics(const proto::tmaster::MetricRequest& request, sp_int64 start_time,
-                    sp_int64 end_time, proto::tmaster::MetricResponse* response);
+                    sp_int64 end_time, proto::tmaster::MetricResponse& response);
 
     // Returns response for fetching exceptions. Doesn't own response.
     void GetExceptionsForInstance(const sp_string& instance_id,
-                                  proto::tmaster::ExceptionLogResponse* response);
+                                  proto::tmaster::ExceptionLogResponse& response);
 
-    void GetAllExceptions(proto::tmaster::ExceptionLogResponse* response);
+    void GetAllExceptions(proto::tmaster::ExceptionLogResponse& response);
 
    private:
     // Create or return existing mutable InstanceMetrics associated with 'instance_id'. This
     // method doesn't verify if the instance_id is valid fof the component.
     // Doesn't transfer ownership of returned InstanceMetrics.
-    InstanceMetrics* GetOrCreateInstanceMetrics(const sp_string& instance_id);
+    shared_ptr<InstanceMetrics> GetOrCreateInstanceMetrics(const sp_string& instance_id);
 
     sp_string component_name_;
     sp_int32 nbuckets_;
     sp_int32 bucket_interval_;
     // map between instance id and its set of metrics
-    std::map<sp_string, InstanceMetrics*> metrics_;
+    std::map<sp_string, shared_ptr<InstanceMetrics>> metrics_;
   };
 
   // Create or return existing mutable ComponentMetrics associated with 'component_name'.
   // Doesn't transfer ownership of returned ComponentMetrics
-  ComponentMetrics* GetOrCreateComponentMetrics(const sp_string& component_name);
+  shared_ptr<ComponentMetrics> GetOrCreateComponentMetrics(const sp_string& component_name);
 
   // map of component name to its metrics
-  std::map<sp_string, ComponentMetrics*> metrics_;
+  std::map<sp_string, shared_ptr<ComponentMetrics>> metrics_;
   sp_int32 max_interval_;
   sp_int32 nintervals_;
   sp_int32 interval_;
   EventLoop* eventLoop_;
   std::string metrics_sinks_yaml_;
-  common::TMasterMetrics* tmetrics_info_;
+  std::unique_ptr<common::TMasterMetrics> tmetrics_info_;
   time_t start_time_;
 };
 }  // namespace tmaster
diff --git a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
index 54807bc..47bd7b1 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
@@ -49,7 +49,6 @@
   proto::tmaster::StMgrHeartbeatResponse response;
   response.set_allocated_status(status);
   SendResponse(response);
-  delete this;
 }
 }  // namespace tmaster
 }  // namespace heron
diff --git a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp b/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
index e963ca5..cbdd583 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
@@ -45,13 +45,14 @@
   // Get the relevant info and ask tmaster to register
   proto::tmaster::StMgrRegisterRequest* request =
       static_cast<proto::tmaster::StMgrRegisterRequest*>(request_);
-  std::vector<proto::system::Instance*> instances;
+  std::vector<shared_ptr<proto::system::Instance>> instances;
   for (sp_int32 i = 0; i < request->instances_size(); ++i) {
-    proto::system::Instance* instance = new proto::system::Instance();
+    auto instance = std::make_shared<proto::system::Instance>();
     instance->CopyFrom(request->instances(i));
     instances.push_back(instance);
   }
-  proto::system::PhysicalPlan* pplan = NULL;
+
+  shared_ptr<proto::system::PhysicalPlan> pplan;
 
   proto::system::Status* status =
       tmaster_->RegisterStMgr(request->stmgr(), instances, GetConnection(), pplan);
@@ -65,7 +66,6 @@
     }
   }
   SendResponse(response);
-  delete this;
   return;
 }
 }  // namespace tmaster
diff --git a/heron/tmaster/tests/cpp/server/dummystmgr.cpp b/heron/tmaster/tests/cpp/server/dummystmgr.cpp
index af7de12..c8dead6 100644
--- a/heron/tmaster/tests/cpp/server/dummystmgr.cpp
+++ b/heron/tmaster/tests/cpp/server/dummystmgr.cpp
@@ -42,9 +42,9 @@
       pplan_(nullptr),
       got_restore_message_(false),
       got_start_message_(false) {
-  InstallResponseHandler(new proto::tmaster::StMgrRegisterRequest(),
+  InstallResponseHandler(std::move(make_unique<proto::tmaster::StMgrRegisterRequest>()),
                          &DummyStMgr::HandleRegisterResponse);
-  InstallResponseHandler(new proto::tmaster::StMgrHeartbeatRequest(),
+  InstallResponseHandler(std::move(make_unique<proto::tmaster::StMgrHeartbeatRequest>()),
                          &DummyStMgr::HandleHeartbeatResponse);
   InstallMessageHandler(&DummyStMgr::HandleNewAssignmentMessage);
   InstallMessageHandler(&DummyStMgr::HandleRestoreTopologyStateRequest);
@@ -139,7 +139,7 @@
 }
 
 void DummyStMgr::SendRegisterRequest() {
-  proto::tmaster::StMgrRegisterRequest* request = new proto::tmaster::StMgrRegisterRequest();
+  auto request = make_unique<proto::tmaster::StMgrRegisterRequest>();
   proto::system::StMgr* stmgr = request->mutable_stmgr();
   stmgr->set_id(my_id_);
   stmgr->set_host_name(my_host_);
@@ -149,15 +149,15 @@
        iter != instances_.end(); ++iter) {
     request->add_instances()->CopyFrom(**iter);
   }
-  SendRequest(request, NULL);
+  SendRequest(std::move(request), NULL);
   return;
 }
 
 void DummyStMgr::SendHeartbeatRequest() {
-  proto::tmaster::StMgrHeartbeatRequest* request = new proto::tmaster::StMgrHeartbeatRequest();
+  auto request = make_unique<proto::tmaster::StMgrHeartbeatRequest>();
   request->set_heartbeat_time(time(NULL));
   request->mutable_stats();
-  SendRequest(request, NULL);
+  SendRequest(std::move(request), NULL);
   return;
 }
 
diff --git a/heron/tmaster/tests/cpp/server/dummytmaster.cpp b/heron/tmaster/tests/cpp/server/dummytmaster.cpp
index 08d0456..a244b32 100644
--- a/heron/tmaster/tests/cpp/server/dummytmaster.cpp
+++ b/heron/tmaster/tests/cpp/server/dummytmaster.cpp
@@ -49,9 +49,9 @@
 
 void DummyTMaster::HandleRegisterRequest(REQID _id, Connection* _conn,
                                          proto::tmaster::StMgrRegisterRequest* _request) {
-  std::vector<proto::system::Instance*> instances;
+  std::vector<std::shared_ptr<proto::system::Instance>> instances;
   stmgrs_[_request->stmgr().id()] =
-    new tmaster::StMgrState(_conn, _request->stmgr(), instances, this);
+          std::make_shared<tmaster::StMgrState>(_conn, _request->stmgr(), instances, *this);
   delete _request;
   proto::tmaster::StMgrRegisterResponse response;
   response.mutable_status()->set_status(proto::system::OK);
diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
index 32550a0..775cc36 100644
--- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
+++ b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
@@ -780,7 +780,7 @@
     EXPECT_EQ(updated_bolt_config[bolt_init_config + ":runtime"], "4");
   }
   std::map<std::string, std::string> updated_config, updated_spout_config, updated_bolt_config;
-  const heron::proto::system::PhysicalPlan* pplan = common.tmaster_->getPhysicalPlan();
+  const auto pplan = common.tmaster_->getPhysicalPlan();
   heron::config::TopologyConfigHelper::GetTopologyRuntimeConfig(pplan->topology(), updated_config);
   EXPECT_EQ(updated_config[topology_init_config_1 + ":runtime"], "1");
   EXPECT_EQ(updated_config[topology_init_config_2 + ":runtime"], "2");