When create a smart pointer we now allocate the most hot/frequent messages in the memory pool instead of the heap (#3316)

diff --git a/heron/common/src/cpp/basics/basics.h b/heron/common/src/cpp/basics/basics.h
index 766a64f..dc5be56 100644
--- a/heron/common/src/cpp/basics/basics.h
+++ b/heron/common/src/cpp/basics/basics.h
@@ -46,6 +46,9 @@
 #include "basics/sptest.h"
 #include "basics/mempool.h"
 
+template <typename T>
+using pool_unique_ptr = std::unique_ptr<T, std::function<void(google::protobuf::Message*)>>;
+
 // The standard std::make_unique(...) was introduced starting from C++ 14. Heron uses C++ 11.
 // Unfortunatelly we can not bump a compiler version used in Heron since we are tight coupled with
 // CentOS 7 whih comes with very old version of GCC (4.8.5). That's why we introduced
@@ -55,4 +58,20 @@
     return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
 }
 
+
+// This is a special version to get unique pointer to protobuf message.
+// It doesn't allocate anythong on heap. Instead it just acquires the existing (preallocated)
+// event from the memory pool. Resulted unique pointer also has a deleter callback which
+// returns the event back to the memory pool when that unique pointer destructed.
+template<typename T>
+pool_unique_ptr<T> make_unique_from_protobuf_pool() {
+    T* event = nullptr;
+    event = __global_protobuf_pool_acquire__(event);
+
+    return pool_unique_ptr<T>(event, [](google::protobuf::Message* ptr) {
+        T* event_to_dispose = static_cast<T*>(ptr);
+        __global_protobuf_pool_release__(event_to_dispose);
+    });
+}
+
 #endif  // HERON_BASICS_H_
diff --git a/heron/common/src/cpp/metrics/metricsmgr-client.cpp b/heron/common/src/cpp/metrics/metricsmgr-client.cpp
index 297f932..bcd9687 100644
--- a/heron/common/src/cpp/metrics/metricsmgr-client.cpp
+++ b/heron/common/src/cpp/metrics/metricsmgr-client.cpp
@@ -84,7 +84,7 @@
 }
 
 void MetricsMgrClient::HandleRegisterResponse(
-    void*, unique_ptr<proto::system::MetricPublisherRegisterResponse> _response,
+    void*, pool_unique_ptr<proto::system::MetricPublisherRegisterResponse> _response,
     NetworkErrorCode _status) {
   if (_status == OK && _response->status().status() != proto::system::OK) {
     // What the heck we explicitly got a non ok response
diff --git a/heron/common/src/cpp/metrics/metricsmgr-client.h b/heron/common/src/cpp/metrics/metricsmgr-client.h
index 4bf9844..28cd422 100644
--- a/heron/common/src/cpp/metrics/metricsmgr-client.h
+++ b/heron/common/src/cpp/metrics/metricsmgr-client.h
@@ -58,8 +58,8 @@
   void ReConnect();
   void SendRegisterRequest();
   void HandleRegisterResponse(void* _ctx,
-                              unique_ptr<proto::system::MetricPublisherRegisterResponse> _respose,
-                              NetworkErrorCode _status);
+                          pool_unique_ptr<proto::system::MetricPublisherRegisterResponse> _respose,
+                          NetworkErrorCode _status);
 
   sp_string hostname_;
   sp_int32 port_;
diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h
index 976edeb..a9709b7 100644
--- a/heron/common/src/cpp/network/client.h
+++ b/heron/common/src/cpp/network/client.h
@@ -132,7 +132,7 @@
   // Register a handler for a particular response type
   template <typename S, typename T, typename M>
   void InstallResponseHandler(unique_ptr<S> _request,
-                              void (T::*method)(void* _ctx, unique_ptr<M>,
+                              void (T::*method)(void* _ctx, pool_unique_ptr<M>,
                               NetworkErrorCode status)) {
     auto m = make_unique<M>();
     T* t = static_cast<T*>(this);
@@ -143,7 +143,7 @@
 
   // Register a handler for a particular message type
   template <typename T, typename M>
-  void InstallMessageHandler(void (T::*method)(unique_ptr<M> _message)) {
+  void InstallMessageHandler(void (T::*method)(pool_unique_ptr<M> _message)) {
     google::protobuf::Message* m = new M();
     T* t = static_cast<T*>(this);
     messageHandlers[m->GetTypeName()] =
@@ -212,10 +212,10 @@
   void OnPacketTimer(REQID _id, EventLoop::Status status);
 
   template <typename T, typename M>
-  void dispatchResponse(T* _t, void (T::*method)(void* _ctx, unique_ptr<M>, NetworkErrorCode),
+  void dispatchResponse(T* _t, void (T::*method)(void* _ctx, pool_unique_ptr<M>, NetworkErrorCode),
                         IncomingPacket* _ipkt, NetworkErrorCode _code) {
     void* ctx = nullptr;
-    unique_ptr<M> m = nullptr;
+    pool_unique_ptr<M> m = nullptr;
     NetworkErrorCode status = _code;
     if (status == OK && _ipkt) {
       REQID rid;
@@ -223,7 +223,7 @@
       if (context_map_.find(rid) != context_map_.end()) {
         // indeed
         ctx = context_map_[rid].second;
-        m = make_unique<M>();
+        m = make_unique_from_protobuf_pool<M>();
         context_map_.erase(rid);
         _ipkt->UnPackProtocolBuffer(m.get());
       } else {
@@ -241,8 +241,8 @@
   }
 
   template <typename T, typename M>
-  void dispatchMessage(T* _t, void (T::*method)(unique_ptr<M>), IncomingPacket* _ipkt) {
-    unique_ptr<M> m = make_unique<M>();
+  void dispatchMessage(T* _t, void (T::*method)(pool_unique_ptr<M>), IncomingPacket* _ipkt) {
+    pool_unique_ptr<M> m = make_unique_from_protobuf_pool<M>();
 
     if (_ipkt->UnPackProtocolBuffer(m.get()) != 0) {
       // We could not decode the pb properly
diff --git a/heron/common/src/cpp/network/server.h b/heron/common/src/cpp/network/server.h
index 096263d..cf578a4 100644
--- a/heron/common/src/cpp/network/server.h
+++ b/heron/common/src/cpp/network/server.h
@@ -151,7 +151,7 @@
 
   // Register a handler for a particular request type
   template <typename T, typename M>
-  void InstallRequestHandler(void (T::*method)(REQID id, Connection* conn, unique_ptr<M>)) {
+  void InstallRequestHandler(void (T::*method)(REQID id, Connection* conn, pool_unique_ptr<M>)) {
     unique_ptr<google::protobuf::Message> m = make_unique<M>();
     T* t = static_cast<T*>(this);
     requestHandlers[m->GetTypeName()] = std::bind(&Server::dispatchRequest<T, M>, this, t, method,
@@ -160,7 +160,7 @@
 
   // Register a handler for a particular message type
   template <typename T, typename M>
-  void InstallMessageHandler(void (T::*method)(Connection* conn, unique_ptr<M>)) {
+  void InstallMessageHandler(void (T::*method)(Connection* conn, pool_unique_ptr<M>)) {
     unique_ptr<google::protobuf::Message> m = make_unique<M>();
     T* t = static_cast<T*>(this);
     messageHandlers[m->GetTypeName()] = std::bind(&Server::dispatchMessage<T, M>, this, t, method,
@@ -214,13 +214,13 @@
   void InternalSendResponse(Connection* _connection, OutgoingPacket* _packet);
 
   template <typename T, typename M>
-  void dispatchRequest(T* _t, void (T::*method)(REQID id, Connection* conn, unique_ptr<M>),
+  void dispatchRequest(T* _t, void (T::*method)(REQID id, Connection* conn, pool_unique_ptr<M>),
                        Connection* _conn,
                        IncomingPacket* _ipkt) {
     REQID rid;
     CHECK(_ipkt->UnPackREQID(&rid) == 0) << "REQID unpacking failed";
 
-    auto m = make_unique<M>();
+    auto m = make_unique_from_protobuf_pool<M>();
 
     if (_ipkt->UnPackProtocolBuffer(m.get()) != 0) {
       // We could not decode the pb properly
@@ -237,13 +237,13 @@
   }
 
   template <typename T, typename M>
-  void dispatchMessage(T* _t, void (T::*method)(Connection* conn, unique_ptr<M>),
+  void dispatchMessage(T* _t, void (T::*method)(Connection* conn, pool_unique_ptr<M>),
                        Connection* _conn,
                        IncomingPacket* _ipkt) {
     REQID rid;
     CHECK(_ipkt->UnPackREQID(&rid) == 0) << "REQID unpacking failed";
 
-    auto m = make_unique<M>();
+    auto m = make_unique_from_protobuf_pool<M>();
 
     if (_ipkt->UnPackProtocolBuffer(m.get()) != 0) {
       // We could not decode the pb properly
diff --git a/heron/common/tests/cpp/network/client_unittest.cpp b/heron/common/tests/cpp/network/client_unittest.cpp
index 4cfa101..b129fe0 100644
--- a/heron/common/tests/cpp/network/client_unittest.cpp
+++ b/heron/common/tests/cpp/network/client_unittest.cpp
@@ -62,7 +62,7 @@
 
 void TestClient::HandleClose(NetworkErrorCode) {}
 
-void TestClient::HandleTestMessage(unique_ptr<TestMessage> _message) {
+void TestClient::HandleTestMessage(pool_unique_ptr<TestMessage> _message) {
   ++nrecv_;
 
   if (nrecv_ >= ntotal_) {
diff --git a/heron/common/tests/cpp/network/client_unittest.h b/heron/common/tests/cpp/network/client_unittest.h
index 30d8fd6..666085e 100644
--- a/heron/common/tests/cpp/network/client_unittest.h
+++ b/heron/common/tests/cpp/network/client_unittest.h
@@ -44,7 +44,7 @@
 
  private:
   // Handle incoming message
-  void HandleTestMessage(unique_ptr<TestMessage> _message);
+  void HandleTestMessage(pool_unique_ptr<TestMessage> _message);
 
   void SendMessages();
   void CreateAndSendMessage();
diff --git a/heron/common/tests/cpp/network/oclient_unittest.cpp b/heron/common/tests/cpp/network/oclient_unittest.cpp
index 8d6b3a5..4d56203 100644
--- a/heron/common/tests/cpp/network/oclient_unittest.cpp
+++ b/heron/common/tests/cpp/network/oclient_unittest.cpp
@@ -60,7 +60,7 @@
 
 void OrderClient::HandleClose(NetworkErrorCode) {}
 
-void OrderClient::HandleOrderMessage(unique_ptr<OrderMessage> _message) {
+void OrderClient::HandleOrderMessage(pool_unique_ptr<OrderMessage> _message) {
   ++nrecv_;
 
   EXPECT_EQ(msgidr_++, _message->id());
diff --git a/heron/common/tests/cpp/network/oclient_unittest.h b/heron/common/tests/cpp/network/oclient_unittest.h
index fa68bfc..d75e870 100644
--- a/heron/common/tests/cpp/network/oclient_unittest.h
+++ b/heron/common/tests/cpp/network/oclient_unittest.h
@@ -43,7 +43,7 @@
 
  private:
   // Handle incoming message
-  void HandleOrderMessage(unique_ptr<OrderMessage> _message);
+  void HandleOrderMessage(pool_unique_ptr<OrderMessage> _message);
 
   void SendMessages();
   void CreateAndSendMessage();
diff --git a/heron/common/tests/cpp/network/oserver_unittest.cpp b/heron/common/tests/cpp/network/oserver_unittest.cpp
index 52ddcb1..bac150e 100644
--- a/heron/common/tests/cpp/network/oserver_unittest.cpp
+++ b/heron/common/tests/cpp/network/oserver_unittest.cpp
@@ -56,7 +56,7 @@
   delete ids;
 }
 
-void OrderServer::HandleOrderMessage(Connection* _conn, unique_ptr<OrderMessage> _message) {
+void OrderServer::HandleOrderMessage(Connection* _conn, pool_unique_ptr<OrderMessage> _message) {
   if (clients_.find(_conn) == clients_.end()) return;
 
   nrecv_++;
@@ -77,6 +77,6 @@
 }
 
 void OrderServer::HandleTerminateMessage(Connection* _connection __attribute__((unused)),
-                                   unique_ptr<TerminateMessage> _message __attribute__((unused))) {
+                               pool_unique_ptr<TerminateMessage> _message __attribute__((unused))) {
   AddTimer([this]() { std::cout << "OrderServer:Terminate"; this->Terminate(); }, 1);
 }
diff --git a/heron/common/tests/cpp/network/oserver_unittest.h b/heron/common/tests/cpp/network/oserver_unittest.h
index 1607cfe..ab34440 100644
--- a/heron/common/tests/cpp/network/oserver_unittest.h
+++ b/heron/common/tests/cpp/network/oserver_unittest.h
@@ -46,10 +46,11 @@
   virtual void HandleConnectionClose(Connection* connection, NetworkErrorCode status);
 
   // handle the test message
-  virtual void HandleOrderMessage(Connection* connection, unique_ptr<OrderMessage> message);
+  virtual void HandleOrderMessage(Connection* connection, pool_unique_ptr<OrderMessage> message);
 
   // handle the terminate message
-  virtual void HandleTerminateMessage(Connection* connection, unique_ptr<TerminateMessage> message);
+  virtual void HandleTerminateMessage(Connection* connection,
+          pool_unique_ptr<TerminateMessage> message);
 
  private:
   void Terminate();
diff --git a/heron/common/tests/cpp/network/server_unittest.cpp b/heron/common/tests/cpp/network/server_unittest.cpp
index e150943..01b570b 100644
--- a/heron/common/tests/cpp/network/server_unittest.cpp
+++ b/heron/common/tests/cpp/network/server_unittest.cpp
@@ -55,7 +55,7 @@
 }
 
 void TestServer::HandleTestMessage(Connection* _connection __attribute__((unused)),
-                                   unique_ptr<TestMessage> _message) {
+                                   pool_unique_ptr<TestMessage> _message) {
   nrecv_++;
 
   // find a random client to send the message to
@@ -73,6 +73,6 @@
 }
 
 void TestServer::HandleTerminateMessage(Connection* _connection __attribute__((unused)),
-                                    unique_ptr<TerminateMessage> _message __attribute__((unused))) {
+                              pool_unique_ptr<TerminateMessage> _message __attribute__((unused))) {
   AddTimer([this]() { this->Terminate(); }, 1);
 }
diff --git a/heron/common/tests/cpp/network/server_unittest.h b/heron/common/tests/cpp/network/server_unittest.h
index a629292..f8d4586 100644
--- a/heron/common/tests/cpp/network/server_unittest.h
+++ b/heron/common/tests/cpp/network/server_unittest.h
@@ -47,10 +47,11 @@
   virtual void HandleConnectionClose(Connection* connection, NetworkErrorCode status);
 
   // handle the test message
-  virtual void HandleTestMessage(Connection* connection, unique_ptr<TestMessage> message);
+  virtual void HandleTestMessage(Connection* connection, pool_unique_ptr<TestMessage> message);
 
   // handle the terminate message
-  virtual void HandleTerminateMessage(Connection* connection, unique_ptr<TerminateMessage> message);
+  virtual void HandleTerminateMessage(Connection* connection,
+                                      pool_unique_ptr<TerminateMessage> message);
 
  private:
   void Terminate();
diff --git a/heron/instance/src/cpp/boltimpl/bolt-instance.cpp b/heron/instance/src/cpp/boltimpl/bolt-instance.cpp
index b0c96c2..37cba54 100644
--- a/heron/instance/src/cpp/boltimpl/bolt-instance.cpp
+++ b/heron/instance/src/cpp/boltimpl/bolt-instance.cpp
@@ -37,7 +37,7 @@
 
 BoltInstance::BoltInstance(std::shared_ptr<EventLoop> eventLoop,
     std::shared_ptr<TaskContextImpl> taskContext,
-    NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
+    NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
     NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
     void* dllHandle)
   : taskContext_(taskContext), dataToSlave_(dataToSlave),
@@ -114,7 +114,7 @@
   metrics_->executeTuple(stream.id(), stream.component_name(), endTime - startTime);
 }
 
-void BoltInstance::HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
+void BoltInstance::HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
   if (tupleSet->has_control()) {
     LOG(FATAL) << "Bolt cannot get incoming control tuples from other components";
   }
diff --git a/heron/instance/src/cpp/boltimpl/bolt-instance.h b/heron/instance/src/cpp/boltimpl/bolt-instance.h
index 77f406c..31b949d 100644
--- a/heron/instance/src/cpp/boltimpl/bolt-instance.h
+++ b/heron/instance/src/cpp/boltimpl/bolt-instance.h
@@ -41,7 +41,7 @@
 class BoltInstance : public InstanceBase {
  public:
   BoltInstance(std::shared_ptr<EventLoop> eventLoop, std::shared_ptr<TaskContextImpl> taskContext,
-               NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
+               NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
                NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
                void* dllHandle);
   virtual ~BoltInstance();
@@ -52,7 +52,7 @@
   virtual void Deactivate();
   virtual bool IsRunning() { return active_; }
   virtual void DoWork();
-  virtual void HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet);
+  virtual void HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet);
 
  private:
   void executeTuple(const proto::api::StreamId& stream,
@@ -62,7 +62,7 @@
                     const proto::system::HeronDataTuple& tup);
 
   std::shared_ptr<TaskContextImpl> taskContext_;
-  NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave_;
+  NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave_;
   NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave_;
   std::shared_ptr<EventLoop> eventLoop_;
   api::bolt::IBolt* bolt_;
diff --git a/heron/instance/src/cpp/gateway/gateway.cpp b/heron/instance/src/cpp/gateway/gateway.cpp
index 8c58c11..883cb6b 100644
--- a/heron/instance/src/cpp/gateway/gateway.cpp
+++ b/heron/instance/src/cpp/gateway/gateway.cpp
@@ -91,7 +91,7 @@
   eventLoop_->loop();
 }
 
-void Gateway::HandleNewPhysicalPlan(unique_ptr<proto::system::PhysicalPlan> pplan) {
+void Gateway::HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan> pplan) {
   LOG(INFO) << "Received a new physical plan from Stmgr";
   if (config::TopologyConfigHelper::IsComponentSpout(pplan->topology(),
                                                      instanceProto_.info().component_name())) {
@@ -109,7 +109,7 @@
   dataToSlave_->enqueue(std::move(pplan));
 }
 
-void Gateway::HandleStMgrTuples(unique_ptr<proto::system::HeronTupleSet2> msg) {
+void Gateway::HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2> msg) {
   dataToSlave_->enqueue(std::move(msg));
   if (dataToSlave_->size() > maxReadBufferSize_) {
     stmgrClient_->putBackPressure();
diff --git a/heron/instance/src/cpp/gateway/gateway.h b/heron/instance/src/cpp/gateway/gateway.h
index b0c7c08..cfd4120 100644
--- a/heron/instance/src/cpp/gateway/gateway.h
+++ b/heron/instance/src/cpp/gateway/gateway.h
@@ -56,17 +56,18 @@
   void HandleSlaveMetrics(google::protobuf::Message* msg);
 
   std::shared_ptr<EventLoop> eventLoop() { return eventLoop_; }
-  void setCommunicators(NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
-                        NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
-                        NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave) {
+  void setCommunicators(
+          NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
+          NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
+          NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave) {
     dataToSlave_ = dataToSlave;
     dataFromSlave_ = dataFromSlave;
     metricsFromSlave_ = metricsFromSlave;
   }
 
  private:
-  void HandleNewPhysicalPlan(unique_ptr<proto::system::PhysicalPlan> pplan);
-  void HandleStMgrTuples(unique_ptr<proto::system::HeronTupleSet2> tuples);
+  void HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan> pplan);
+  void HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tuples);
   void ResumeConsumingFromSlaveTimer();
   std::string topologyName_;
   std::string topologyId_;
@@ -76,7 +77,7 @@
   std::shared_ptr<StMgrClient> stmgrClient_;
   std::shared_ptr<common::MetricsMgrClient> metricsMgrClient_;
   std::shared_ptr<GatewayMetrics> gatewayMetrics_;
-  NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave_;
+  NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave_;
   NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave_;
   NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave_;
   std::shared_ptr<EventLoop> eventLoop_;
diff --git a/heron/instance/src/cpp/gateway/stmgr-client.cpp b/heron/instance/src/cpp/gateway/stmgr-client.cpp
index 08d0259..5fe7adc 100644
--- a/heron/instance/src/cpp/gateway/stmgr-client.cpp
+++ b/heron/instance/src/cpp/gateway/stmgr-client.cpp
@@ -37,8 +37,8 @@
                    const std::string& topologyName, const std::string& topologyId,
                    const proto::system::Instance& instanceProto,
                    std::shared_ptr<GatewayMetrics> gatewayMetrics,
-                   std::function<void(std::unique_ptr<proto::system::PhysicalPlan>)> pplanWatcher,
-                   std::function<void(std::unique_ptr<proto::system::HeronTupleSet2>)> tupleWatcher)
+                   std::function<void(pool_unique_ptr<proto::system::PhysicalPlan>)> pplanWatcher,
+                   std::function<void(pool_unique_ptr<proto::system::HeronTupleSet2>)> tupleWatcher)
     : Client(eventLoop, options),
       topologyName_(topologyName),
       topologyId_(topologyId),
@@ -100,7 +100,7 @@
 
 void StMgrClient::HandleRegisterResponse(
     void*,
-    unique_ptr<proto::stmgr::RegisterInstanceResponse> response,
+    pool_unique_ptr<proto::stmgr::RegisterInstanceResponse> response,
     NetworkErrorCode status) {
   if (status != OK) {
     LOG(ERROR) << "NonOK network code " << status << " for register response from stmgr "
@@ -125,7 +125,8 @@
 
   if (response->has_pplan()) {
     LOG(INFO) << "Registration response had a pplan";
-    pplanWatcher_(std::move(unique_ptr<proto::system::PhysicalPlan>(response->release_pplan())));
+    using std::move;
+    pplanWatcher_(move(pool_unique_ptr<proto::system::PhysicalPlan>(response->release_pplan())));
   }
 }
 
@@ -140,13 +141,14 @@
   return;
 }
 
-void StMgrClient::HandlePhysicalPlan(unique_ptr<proto::stmgr::NewInstanceAssignmentMessage> msg) {
+void StMgrClient::HandlePhysicalPlan(
+        pool_unique_ptr<proto::stmgr::NewInstanceAssignmentMessage> msg) {
   LOG(INFO) << "Got a Physical Plan from our stmgr " << instanceProto_.stmgr_id() << " running at "
             << get_clientoptions().get_host() << ":" << get_clientoptions().get_port();
-  pplanWatcher_(std::move(unique_ptr<proto::system::PhysicalPlan>(msg->release_pplan())));
+  pplanWatcher_(std::move(pool_unique_ptr<proto::system::PhysicalPlan>(msg->release_pplan())));
 }
 
-void StMgrClient::HandleTupleMessage(unique_ptr<proto::system::HeronTupleSet2> msg) {
+void StMgrClient::HandleTupleMessage(pool_unique_ptr<proto::system::HeronTupleSet2> msg) {
   gatewayMetrics_->updateReceivedPacketsCount(1);
   gatewayMetrics_->updateReceivedPacketsSize(msg->ByteSize());
   tupleWatcher_(std::move(msg));
diff --git a/heron/instance/src/cpp/gateway/stmgr-client.h b/heron/instance/src/cpp/gateway/stmgr-client.h
index 85c5a65..fac1169 100644
--- a/heron/instance/src/cpp/gateway/stmgr-client.h
+++ b/heron/instance/src/cpp/gateway/stmgr-client.h
@@ -38,8 +38,8 @@
               const std::string& topologyName,
               const std::string& topologyId, const proto::system::Instance& instance,
               std::shared_ptr<GatewayMetrics> gatewayMetrics,
-              std::function<void(std::unique_ptr<proto::system::PhysicalPlan>)> pplan_watcher,
-              std::function<void(std::unique_ptr<proto::system::HeronTupleSet2>)> tuple_watcher);
+              std::function<void(pool_unique_ptr<proto::system::PhysicalPlan>)> pplan_watcher,
+              std::function<void(pool_unique_ptr<proto::system::HeronTupleSet2>)> tuple_watcher);
   virtual ~StMgrClient();
 
   void SendTupleMessage(const proto::system::HeronTupleSet& msg);
@@ -51,10 +51,11 @@
   virtual void HandleClose(NetworkErrorCode status);
 
  private:
-  void HandleRegisterResponse(void*, unique_ptr<proto::stmgr::RegisterInstanceResponse> response,
+  void HandleRegisterResponse(void*,
+                              pool_unique_ptr<proto::stmgr::RegisterInstanceResponse> response,
                               NetworkErrorCode status);
-  void HandlePhysicalPlan(unique_ptr<proto::stmgr::NewInstanceAssignmentMessage> msg);
-  void HandleTupleMessage(unique_ptr<proto::system::HeronTupleSet2> tupleMessage);
+  void HandlePhysicalPlan(pool_unique_ptr<proto::stmgr::NewInstanceAssignmentMessage> msg);
+  void HandleTupleMessage(pool_unique_ptr<proto::system::HeronTupleSet2> tupleMessage);
 
   void OnReconnectTimer();
   void SendRegisterRequest();
@@ -63,8 +64,8 @@
   std::string topologyId_;
   const proto::system::Instance& instanceProto_;
   std::shared_ptr<GatewayMetrics> gatewayMetrics_;
-  std::function<void(std::unique_ptr<proto::system::PhysicalPlan>)> pplanWatcher_;
-  std::function<void(std::unique_ptr<proto::system::HeronTupleSet2>)> tupleWatcher_;
+  std::function<void(pool_unique_ptr<proto::system::PhysicalPlan>)> pplanWatcher_;
+  std::function<void(pool_unique_ptr<proto::system::HeronTupleSet2>)> tupleWatcher_;
   int64_t ndropped_messages_;
   int reconnect_interval_;
   int max_reconnect_times_;
diff --git a/heron/instance/src/cpp/instance-main.cpp b/heron/instance/src/cpp/instance-main.cpp
index ac73723..d81897f 100644
--- a/heron/instance/src/cpp/instance-main.cpp
+++ b/heron/instance/src/cpp/instance-main.cpp
@@ -64,7 +64,7 @@
   auto slave = new heron::instance::Slave(FLAGS_task_id, FLAGS_topology_binary);
 
   auto dataToSlave =
-          new heron::instance::NotifyingCommunicator<unique_ptr<google::protobuf::Message>>(
+          new heron::instance::NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>(
                                slave->eventLoop(),
                                std::bind(&heron::instance::Slave::HandleGatewayData,
                                          slave, std::placeholders::_1),
diff --git a/heron/instance/src/cpp/slave/instance-base.h b/heron/instance/src/cpp/slave/instance-base.h
index 45dc29c..a10398f 100644
--- a/heron/instance/src/cpp/slave/instance-base.h
+++ b/heron/instance/src/cpp/slave/instance-base.h
@@ -36,7 +36,7 @@
   virtual void Deactivate() = 0;
   virtual bool IsRunning() = 0;
   virtual void DoWork() = 0;
-  virtual void HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet) = 0;
+  virtual void HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) = 0;
 };
 
 }  // namespace instance
diff --git a/heron/instance/src/cpp/slave/slave.cpp b/heron/instance/src/cpp/slave/slave.cpp
index 46312a4..f50e201 100644
--- a/heron/instance/src/cpp/slave/slave.cpp
+++ b/heron/instance/src/cpp/slave/slave.cpp
@@ -58,9 +58,9 @@
 }
 
 void Slave::setCommunicators(
-                        NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
-                        NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
-                        NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave) {
+                    NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
+                    NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
+                    NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave) {
   dataToSlave_ = dataToSlave;
   dataFromSlave_ = dataFromSlave;
   metricsFromSlave_ = metricsFromSlave;
@@ -80,20 +80,20 @@
   eventLoop_->loop();
 }
 
-void Slave::HandleGatewayData(unique_ptr<google::protobuf::Message> msg) {
+void Slave::HandleGatewayData(pool_unique_ptr<google::protobuf::Message> msg) {
   if (msg->GetTypeName() == pplan_typename_) {
     LOG(INFO) << "Slave Received a new pplan message from Gateway";
-    auto pplan = unique_ptr<proto::system::PhysicalPlan>(
+    auto pplan = pool_unique_ptr<proto::system::PhysicalPlan>(
             static_cast<proto::system::PhysicalPlan*>(msg.release()));
     HandleNewPhysicalPlan(std::move(pplan));
   } else {
-    auto tupleSet = unique_ptr<proto::system::HeronTupleSet2>(
+    auto tupleSet = pool_unique_ptr<proto::system::HeronTupleSet2>(
             static_cast<proto::system::HeronTupleSet2*>(msg.release()));
     HandleStMgrTuples(std::move(tupleSet));
   }
 }
 
-void Slave::HandleNewPhysicalPlan(unique_ptr<proto::system::PhysicalPlan> pplan) {
+void Slave::HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan> pplan) {
   std::shared_ptr<proto::system::PhysicalPlan> newPplan = std::move(pplan);
   taskContext_->newPhysicalPlan(newPplan);
   if (!instance_) {
@@ -125,7 +125,7 @@
   }
 }
 
-void Slave::HandleStMgrTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
+void Slave::HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
   if (instance_) {
     instance_->HandleGatewayTuples(std::move(tupleSet));
   } else {
diff --git a/heron/instance/src/cpp/slave/slave.h b/heron/instance/src/cpp/slave/slave.h
index 377e0c9..3e5f946 100644
--- a/heron/instance/src/cpp/slave/slave.h
+++ b/heron/instance/src/cpp/slave/slave.h
@@ -42,12 +42,13 @@
   void Start();
 
   std::shared_ptr<EventLoop> eventLoop() { return eventLoop_; }
-  void setCommunicators(NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
-                        NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
-                        NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave);
+  void setCommunicators(
+                    NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
+                    NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
+                    NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave);
 
   // Handles data from gateway thread
-  void HandleGatewayData(unique_ptr<google::protobuf::Message> msg);
+  void HandleGatewayData(pool_unique_ptr<google::protobuf::Message> msg);
 
   // This is the notification that gateway thread consumed something that we wrote
   void HandleGatewayDataConsumed();
@@ -59,13 +60,13 @@
   // This is the one thats running in the slave thread
   void InternalStart();
   // Called when a new phyiscal plan is received
-  void HandleNewPhysicalPlan(unique_ptr<proto::system::PhysicalPlan> pplan);
+  void HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan> pplan);
   // Called when we receive new tuple messages from gateway
-  void HandleStMgrTuples(unique_ptr<proto::system::HeronTupleSet2> msg);
+  void HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2> msg);
 
   int myTaskId_;
   std::shared_ptr<TaskContextImpl> taskContext_;
-  NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave_;
+  NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave_;
   NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave_;
   NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave_;
   InstanceBase* instance_;
diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
index 7ad7167..0374794 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
+++ b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
@@ -178,7 +178,7 @@
           collector_->numInFlight() < maxSpoutPending));
 }
 
-void SpoutInstance::HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
+void SpoutInstance::HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
   if (tupleSet->has_data()) {
     LOG(FATAL) << "Spout cannot get incoming data tuples from other components";
   }
diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.h b/heron/instance/src/cpp/spoutimpl/spout-instance.h
index 36d328f..a93ec14 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-instance.h
+++ b/heron/instance/src/cpp/spoutimpl/spout-instance.h
@@ -51,7 +51,7 @@
   virtual void Deactivate();
   virtual bool IsRunning() { return active_; }
   virtual void DoWork();
-  virtual void HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet);
+  virtual void HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet);
 
  private:
   void lookForTimeouts();
diff --git a/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp b/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
index 821a671..106abc3 100644
--- a/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
+++ b/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
@@ -42,7 +42,7 @@
          shared_ptr<common::MetricsMgrSt> const& _metrics_manager_client,
          std::function<void(sp_int32, proto::system::HeronTupleSet2*)> _tupleset_drainer,
          std::function<void(proto::stmgr::TupleStreamMessage*)> _tuplestream_drainer,
-         std::function<void(sp_int32, std::unique_ptr<InitiateStatefulCheckpoint>)> _ckpt_drainer)
+         std::function<void(sp_int32, pool_unique_ptr<InitiateStatefulCheckpoint>)> _ckpt_drainer)
   : drain_threshold_(_drain_threshold), current_size_(0),
     neighbour_calculator_(_neighbour_calculator),
     metrics_manager_client_(_metrics_manager_client), tupleset_drainer_(_tupleset_drainer),
@@ -72,7 +72,7 @@
   size_metric_->SetValue(current_size_);
 }
 
-void CheckpointGateway::SendToInstance(unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+void CheckpointGateway::SendToInstance(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
   if (current_size_ > drain_threshold_) {
     ForceDrain();
   }
@@ -176,8 +176,8 @@
       return _tuple;
     } else {
       auto tp = std::make_tuple(_tuple,
-                                (proto::stmgr::TupleStreamMessage*)nullptr,
-                                (unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>)nullptr);
+                              (proto::stmgr::TupleStreamMessage*)nullptr,
+                              (pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>)nullptr);
       add(tp, _size);
       return nullptr;
     }
@@ -196,8 +196,8 @@
       return _tuple;
     } else {
       auto tp = std::make_tuple((proto::system::HeronTupleSet2*)nullptr,
-                                _tuple,
-                                (unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>)nullptr);
+                              _tuple,
+                              (pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>)nullptr);
       add(tp, _size);
       return nullptr;
     }
@@ -235,7 +235,7 @@
               << " All checkpoint markers received for checkpoint "
                  << _checkpoint_id;
     // We need to add Initiate Checkpoint message before the current set
-    auto message = make_unique<proto::ckptmgr::InitiateStatefulCheckpoint>();
+    auto message = make_unique_from_protobuf_pool<proto::ckptmgr::InitiateStatefulCheckpoint>();
     message->set_checkpoint_id(_checkpoint_id);
     int cache_size = message->GetCachedSize();
     auto new_tuple = std::make_tuple(
diff --git a/heron/stmgr/src/cpp/manager/checkpoint-gateway.h b/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
index 0d17d15..2129832 100644
--- a/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
+++ b/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
@@ -57,10 +57,10 @@
         shared_ptr<common::MetricsMgrSt> const& _metrics_manager_client,
         std::function<void(sp_int32, proto::system::HeronTupleSet2*)> tupleset_drainer,
         std::function<void(proto::stmgr::TupleStreamMessage*)> tuplestream_drainer,
-        std::function<void(sp_int32, unique_ptr<InitiateStatefulCheckpoint>)> ckpt_drainer);
+        std::function<void(sp_int32, pool_unique_ptr<InitiateStatefulCheckpoint>)> ckpt_drainer);
   virtual ~CheckpointGateway();
   void SendToInstance(sp_int32 _task_id, proto::system::HeronTupleSet2* _message);
-  void SendToInstance(unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+  void SendToInstance(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
   void HandleUpstreamMarker(sp_int32 _src_task_id, sp_int32 _destination_task_id,
                             const sp_string& _checkpoint_id);
 
@@ -70,7 +70,7 @@
  private:
   typedef std::tuple<proto::system::HeronTupleSet2*,
                      proto::stmgr::TupleStreamMessage*,
-                     unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>>
+                     pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>>
           Tuple;
 
   // This helper class defines the current state of affairs
@@ -111,7 +111,7 @@
   std::unordered_map<sp_int32, unique_ptr<CheckpointInfo>> pending_tuples_;
   std::function<void(sp_int32, proto::system::HeronTupleSet2*)> tupleset_drainer_;
   std::function<void(proto::stmgr::TupleStreamMessage*)> tuplestream_drainer_;
-  std::function<void(sp_int32, unique_ptr<InitiateStatefulCheckpoint>)> ckpt_drainer_;
+  std::function<void(sp_int32, pool_unique_ptr<InitiateStatefulCheckpoint>)> ckpt_drainer_;
 };
 
 }  // namespace stmgr
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
index a22905e..332bc88 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
@@ -116,9 +116,9 @@
 }
 
 void CkptMgrClient::HandleRegisterStMgrResponse(
-                                        void*,
-                                        unique_ptr<proto::ckptmgr::RegisterStMgrResponse> _response,
-                                        NetworkErrorCode _status) {
+                                  void*,
+                                  pool_unique_ptr<proto::ckptmgr::RegisterStMgrResponse> _response,
+                                  NetworkErrorCode _status) {
   if (_status != OK) {
     LOG(ERROR) << "NonOK network code " << _status << " for register response from ckptmgr "
                << ckptmgr_id_ << "running at " << get_clientoptions().get_host() << ":"
@@ -182,7 +182,7 @@
 
 void CkptMgrClient::HandleSaveInstanceStateResponse(
                              void*,
-                             unique_ptr<proto::ckptmgr::SaveInstanceStateResponse> _response,
+                             pool_unique_ptr<proto::ckptmgr::SaveInstanceStateResponse> _response,
                              NetworkErrorCode _status) {
   if (_status != OK) {
     LOG(ERROR) << "NonOK response message for SaveInstanceStateResponse";
@@ -200,7 +200,7 @@
 
 void CkptMgrClient::HandleGetInstanceStateResponse(
                              void* _ctx,
-                             unique_ptr<proto::ckptmgr::GetInstanceStateResponse> _response,
+                             pool_unique_ptr<proto::ckptmgr::GetInstanceStateResponse> _response,
                              NetworkErrorCode _status) {
   int32_t* nattempts = static_cast<int32_t*>(_ctx);
   if (_status != OK) {
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.h b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
index ad3871a..6046221 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
@@ -55,10 +55,10 @@
   void GetInstanceState(const proto::system::Instance& _instance,
                         const std::string& _checkpoint_id, int32_t* _nattempts);
   virtual void HandleSaveInstanceStateResponse(void*,
-                             unique_ptr<proto::ckptmgr::SaveInstanceStateResponse> _response,
+                             pool_unique_ptr<proto::ckptmgr::SaveInstanceStateResponse> _response,
                              NetworkErrorCode status);
   virtual void HandleGetInstanceStateResponse(void*,
-                             unique_ptr<proto::ckptmgr::GetInstanceStateResponse> _response,
+                             pool_unique_ptr<proto::ckptmgr::GetInstanceStateResponse> _response,
                              NetworkErrorCode status);
   virtual void HandleConnect(NetworkErrorCode status);
   virtual void HandleClose(NetworkErrorCode status);
@@ -66,7 +66,7 @@
  private:
   void HandleRegisterStMgrResponse(
                                    void *,
-                                   unique_ptr<proto::ckptmgr::RegisterStMgrResponse >_response,
+                                   pool_unique_ptr<proto::ckptmgr::RegisterStMgrResponse >_response,
                                    NetworkErrorCode);
   void SendRegisterRequest();
 
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index b2ef29b..6c0f973 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -287,7 +287,7 @@
 }
 
 void InstanceServer::HandleRegisterInstanceRequest(REQID _reqid, Connection* _conn,
-                                       unique_ptr<proto::stmgr::RegisterInstanceRequest> _request) {
+                                  pool_unique_ptr<proto::stmgr::RegisterInstanceRequest> _request) {
   LOG(INFO) << "Got HandleRegisterInstanceRequest from connection " << _conn << " and instance "
             << _request->instance().instance_id();
   // Do some basic checks
@@ -377,7 +377,7 @@
 }
 
 void InstanceServer::HandleTupleSetMessage(Connection* _conn,
-                                           unique_ptr<proto::system::HeronTupleSet> _message) {
+                                           pool_unique_ptr<proto::system::HeronTupleSet> _message) {
   auto iter = active_instances_.find(_conn);
   if (iter == active_instances_.end()) {
     LOG(ERROR) << "Received TupleSet from unknown instance connection. Dropping..";
@@ -398,7 +398,7 @@
                              std::move(_message));
 }
 
-void InstanceServer::SendToInstance2(unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+void InstanceServer::SendToInstance2(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
   sp_string instance_id = task_id_to_name[_message->task_id()];
   ConnectionBufferLengthMetricMap::const_iterator it =
     connection_buffer_length_metric_map_.find(instance_id);
@@ -469,7 +469,7 @@
 }
 
 void InstanceServer::DrainCheckpoint(sp_int32 _task_id,
-                                  unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint> _message) {
+                             pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint> _message) {
   TaskIdInstanceDataMap::iterator iter = instance_info_.find(_task_id);
   if (iter == instance_info_.end() || iter->second->conn_ == NULL) {
     LOG(ERROR) << "task_id " << _task_id << " has not yet connected to us. Dropping...";
@@ -647,7 +647,7 @@
 }
 
 void InstanceServer::HandleStoreInstanceStateCheckpointMessage(Connection* _conn,
-                               unique_ptr<proto::ckptmgr::StoreInstanceStateCheckpoint> _message) {
+                          pool_unique_ptr<proto::ckptmgr::StoreInstanceStateCheckpoint> _message) {
   ConnectionTaskIdMap::iterator iter = active_instances_.find(_conn);
   if (iter == active_instances_.end()) {
     LOG(ERROR) << "Hmm.. Got InstaceStateCheckpoint Message from an unknown connection";
@@ -667,7 +667,7 @@
 }
 
 void InstanceServer::HandleRestoreInstanceStateResponse(Connection* _conn,
-                               unique_ptr<proto::ckptmgr::RestoreInstanceStateResponse> _message) {
+                          pool_unique_ptr<proto::ckptmgr::RestoreInstanceStateResponse> _message) {
   ConnectionTaskIdMap::iterator iter = active_instances_.find(_conn);
   if (iter == active_instances_.end()) {
     LOG(ERROR) << "Hmm.. Got RestoreInstanceStateResponse Message from an unknown connection";
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index e7037fb..df4b5e9 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -63,7 +63,7 @@
 
   // We own the message
   void SendToInstance2(sp_int32 _task_id, proto::system::HeronTupleSet2* _message);
-  void SendToInstance2(unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+  void SendToInstance2(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
 
   // When we get a checkpoint marker from _src_task_id destined for _destination_task_id
   // this function in invoked, so that we might register it in gateway
@@ -107,7 +107,7 @@
   void DrainTupleSet(sp_int32 _task_id, proto::system::HeronTupleSet2* _message);
   void DrainTupleStream(proto::stmgr::TupleStreamMessage* _message);
   void DrainCheckpoint(sp_int32 _task_id,
-          unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint> _message);
+                       pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint> _message);
   sp_string MakeBackPressureCompIdMetricName(const sp_string& instanceid);
   sp_string MakeQueueSizeCompIdMetricName(const sp_string& instanceid);
   sp_string MakeQueueLengthCompIdMetricName(const sp_string& instanceid);
@@ -118,12 +118,13 @@
 
   // Next from local instances
   void HandleRegisterInstanceRequest(REQID _id, Connection* _conn,
-                                     unique_ptr<proto::stmgr::RegisterInstanceRequest> _request);
-  void HandleTupleSetMessage(Connection* _conn, unique_ptr<proto::system::HeronTupleSet> _message);
+                                   pool_unique_ptr<proto::stmgr::RegisterInstanceRequest> _request);
+  void HandleTupleSetMessage(Connection* _conn,
+                             pool_unique_ptr<proto::system::HeronTupleSet> _message);
   void HandleStoreInstanceStateCheckpointMessage(Connection* _conn,
-                                 unique_ptr<proto::ckptmgr::StoreInstanceStateCheckpoint> _message);
+                           pool_unique_ptr<proto::ckptmgr::StoreInstanceStateCheckpoint> _message);
   void HandleRestoreInstanceStateResponse(Connection* _conn,
-                                 unique_ptr<proto::ckptmgr::RestoreInstanceStateResponse> _message);
+                            pool_unique_ptr<proto::ckptmgr::RestoreInstanceStateResponse> _message);
 
   // Back pressure related connection callbacks
   // Do back pressure
diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
index 9186dc3..0772bb9 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
@@ -143,9 +143,9 @@
 }
 
 void StMgrClient::HandleHelloResponse(
-                                        void*,
-                                        unique_ptr<proto::stmgr::StrMgrHelloResponse> _response,
-                                        NetworkErrorCode _status) {
+                                       void*,
+                                       pool_unique_ptr<proto::stmgr::StrMgrHelloResponse> _response,
+                                       NetworkErrorCode _status) {
   if (_status != OK) {
     LOG(ERROR) << "NonOK network code " << _status << " for register response from stmgr "
                << other_stmgr_id_ << " running at " << get_clientoptions().get_host() << ":"
@@ -239,7 +239,8 @@
   return retval;
 }
 
-void StMgrClient::HandleTupleStreamMessage(unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+void StMgrClient::HandleTupleStreamMessage(
+        pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
   LOG(FATAL) << "We should not receive tuple messages in the client" << std::endl;
 }
 
diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.h b/heron/stmgr/src/cpp/manager/stmgr-client.h
index acaa64b..d3cba96 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.h
@@ -65,9 +65,9 @@
  private:
   void HandleHelloResponse(
           void*,
-          unique_ptr<proto::stmgr::StrMgrHelloResponse> _response,
+          pool_unique_ptr<proto::stmgr::StrMgrHelloResponse> _response,
           NetworkErrorCode);
-  void HandleTupleStreamMessage(unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+  void HandleTupleStreamMessage(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
 
   void OnReConnectTimer();
   void SendHelloRequest();
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 4b52c89..12e573e 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -125,7 +125,7 @@
 }
 
 void StMgrServer::HandleStMgrHelloRequest(REQID _id, Connection* _conn,
-                                          unique_ptr<proto::stmgr::StrMgrHelloRequest> _request) {
+                                       pool_unique_ptr<proto::stmgr::StrMgrHelloRequest> _request) {
   LOG(INFO) << "Got a hello message from stmgr " << _request->stmgr() << " on connection " << _conn;
   proto::stmgr::StrMgrHelloResponse response;
   // Some basic checks
@@ -155,7 +155,7 @@
 }
 
 void StMgrServer::HandleTupleStreamMessage(Connection* _conn,
-                                           unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+                                       pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
   auto iter = rstmgrs_.find(_conn);
   if (iter == rstmgrs_.end()) {
     LOG(INFO) << "Recieved Tuple messages from unknown streammanager connection";
@@ -204,7 +204,7 @@
 }
 
 void StMgrServer::HandleStartBackPressureMessage(Connection* _conn,
-                                      unique_ptr<proto::stmgr::StartBackPressureMessage> _message) {
+                               pool_unique_ptr<proto::stmgr::StartBackPressureMessage> _message) {
   // Close spouts
   LOG(INFO) << "Received start back pressure from str mgr " << _message->stmgr();
   if (_message->topology_name() != topology_name_ || _message->topology_id() != topology_id_) {
@@ -222,7 +222,7 @@
 }
 
 void StMgrServer::HandleStopBackPressureMessage(Connection* _conn,
-                                       unique_ptr<proto::stmgr::StopBackPressureMessage> _message) {
+                                pool_unique_ptr<proto::stmgr::StopBackPressureMessage> _message) {
   LOG(INFO) << "Received stop back pressure from str mgr " << _message->stmgr();
   if (_message->topology_name() != topology_name_ || _message->topology_id() != topology_id_) {
     LOG(ERROR) << "Received stop back pressure message from unknown stream manager "
@@ -246,7 +246,7 @@
 }
 
 void StMgrServer::HandleDownstreamStatefulCheckpointMessage(Connection* _conn,
-                               unique_ptr<proto::ckptmgr::DownstreamStatefulCheckpoint> _message) {
+                          pool_unique_ptr<proto::ckptmgr::DownstreamStatefulCheckpoint> _message) {
   stmgr_->HandleDownStreamStatefulCheckpoint(*_message);
 }
 
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 13b934e..325f0c1 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -70,19 +70,19 @@
 
   // First from other stream managers
   void HandleStMgrHelloRequest(REQID _id, Connection* _conn,
-                               unique_ptr<proto::stmgr::StrMgrHelloRequest> _request);
+                               pool_unique_ptr<proto::stmgr::StrMgrHelloRequest> _request);
   void HandleTupleStreamMessage(Connection* _conn,
-          unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+                                pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
 
   // Handler for DownstreamStatefulCheckpoint from a peer stmgr
   void HandleDownstreamStatefulCheckpointMessage(Connection* _conn,
-                                unique_ptr<proto::ckptmgr::DownstreamStatefulCheckpoint> _message);
+                           pool_unique_ptr<proto::ckptmgr::DownstreamStatefulCheckpoint> _message);
 
   // Backpressure message from and to other stream managers
   void HandleStartBackPressureMessage(Connection* _conn,
-                                      unique_ptr<proto::stmgr::StartBackPressureMessage> _message);
+                                 pool_unique_ptr<proto::stmgr::StartBackPressureMessage> _message);
   void HandleStopBackPressureMessage(Connection* _conn,
-                                     unique_ptr<proto::stmgr::StopBackPressureMessage> _message);
+                                 pool_unique_ptr<proto::stmgr::StopBackPressureMessage> _message);
 
   // map from stmgr_id to their connection
   typedef std::unordered_map<sp_string, Connection*> StreamManagerConnectionMap;
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 056c4ec..6510271 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -675,7 +675,7 @@
 const shared_ptr<proto::system::PhysicalPlan> StMgr::GetPhysicalPlan() const { return pplan_; }
 
 void StMgr::HandleStreamManagerData(const sp_string&,
-                                    unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+                                    pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
   if (stateful_restorer_ && stateful_restorer_->InProgress()) {
     LOG(INFO) << "Dropping data received from stmgr because we are in Restore";
     dropped_during_restore_metrics_->scope(RESTORE_DROPPED_STMGR_BYTES)
@@ -769,7 +769,7 @@
 
 // Called when local tasks generate data
 void StMgr::HandleInstanceData(const sp_int32 _src_task_id, bool _local_spout,
-                               unique_ptr<proto::system::HeronTupleSet> _message) {
+                               pool_unique_ptr<proto::system::HeronTupleSet> _message) {
   instance_bytes_received_metrics_->scope(std::to_string(_src_task_id))
       ->incr_by(_message->ByteSize());
 
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index c158792..decdc39 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -79,9 +79,9 @@
   // Called by tmaster client when a new physical plan is available
   void NewPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> pplan);
   void HandleStreamManagerData(const sp_string& _stmgr_id,
-                               unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+                               pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
   void HandleInstanceData(sp_int32 _task_id, bool _local_spout,
-                          unique_ptr<proto::system::HeronTupleSet> _message);
+                          pool_unique_ptr<proto::system::HeronTupleSet> _message);
   // Called when an instance does checkpoint and sends its checkpoint
   // to the stmgr to save it
   void HandleStoreInstanceStateCheckpoint(
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.cpp b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
index bec20d7..f1e27d8 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.cpp
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
@@ -146,9 +146,9 @@
 }
 
 void TMasterClient::HandleRegisterResponse(
-                                      void*,
-                                      unique_ptr<proto::tmaster::StMgrRegisterResponse> _response,
-                                      NetworkErrorCode _status) {
+                                  void*,
+                                  pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> _response,
+                                  NetworkErrorCode _status) {
   if (_status != OK) {
     LOG(ERROR) << "non ok network stack code for Register Response from Tmaster" << std::endl;
     Stop();
@@ -173,9 +173,9 @@
 }
 
 void TMasterClient::HandleHeartbeatResponse(
-                                      void*,
-                                      unique_ptr<proto::tmaster::StMgrHeartbeatResponse> _response,
-                                      NetworkErrorCode _status) {
+                                  void*,
+                                  pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> _response,
+                                  NetworkErrorCode _status) {
   if (_status != OK) {
     LOG(ERROR) << "NonOK response message for heartbeat Response" << std::endl;
     Stop();
@@ -196,13 +196,13 @@
 }
 
 void TMasterClient::HandleNewAssignmentMessage(
-                                      unique_ptr<proto::stmgr::NewPhysicalPlanMessage> _message) {
+        pool_unique_ptr<proto::stmgr::NewPhysicalPlanMessage> _message) {
   LOG(INFO) << "Got a new assignment" << std::endl;
   pplan_watch_(shared_ptr<proto::system::PhysicalPlan>(_message->release_new_pplan()));
 }
 
 void TMasterClient::HandleStatefulCheckpointMessage(
-                                    unique_ptr<proto::ckptmgr::StartStatefulCheckpoint> _message) {
+        pool_unique_ptr<proto::ckptmgr::StartStatefulCheckpoint> _message) {
   LOG(INFO) << "Got a new start stateful checkpoint message from tmaster with id "
             << _message->checkpoint_id();
   stateful_checkpoint_watch_(_message->checkpoint_id());
@@ -294,12 +294,12 @@
 }
 
 void TMasterClient::HandleRestoreTopologyStateRequest(
-                                unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _message) {
+        pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _message) {
   restore_topology_watch_(_message->checkpoint_id(), _message->restore_txid());
 }
 
 void TMasterClient::HandleStartStmgrStatefulProcessing(
-                               unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _message) {
+        pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _message) {
   start_stateful_watch_(_message->checkpoint_id());
 }
 
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.h b/heron/stmgr/src/cpp/manager/tmaster-client.h
index 3d25395..ec83406 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.h
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.h
@@ -73,18 +73,20 @@
   virtual void HandleClose(NetworkErrorCode status);
 
  private:
-  void HandleRegisterResponse(void*, unique_ptr<proto::tmaster::StMgrRegisterResponse> _response,
+  void HandleRegisterResponse(void*,
+                              pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> _response,
                               NetworkErrorCode);
-  void HandleHeartbeatResponse(void*, unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
+  void HandleHeartbeatResponse(void*,
+                               pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
                                NetworkErrorCode);
 
-  void HandleNewAssignmentMessage(unique_ptr<proto::stmgr::NewPhysicalPlanMessage> _message);
+  void HandleNewAssignmentMessage(pool_unique_ptr<proto::stmgr::NewPhysicalPlanMessage> _message);
   void HandleStatefulCheckpointMessage(
-                                      unique_ptr<proto::ckptmgr::StartStatefulCheckpoint> _message);
+          pool_unique_ptr<proto::ckptmgr::StartStatefulCheckpoint> _message);
   void HandleRestoreTopologyStateRequest(
-                                  unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _message);
+          pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _message);
   void HandleStartStmgrStatefulProcessing(
-                                     unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _msg);
+          pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _msg);
 
   void OnReConnectTimer();
   void OnHeartbeatTimer();
diff --git a/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp b/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
index 9f058fb..51d8efd 100644
--- a/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
@@ -212,7 +212,7 @@
 }
 
 void drainer3(sp_int32 _task_id,
-        std::unique_ptr<heron::proto::ckptmgr::InitiateStatefulCheckpoint> _ckpt) {
+        pool_unique_ptr<heron::proto::ckptmgr::InitiateStatefulCheckpoint> _ckpt) {
   drainer3_markers.push_back(_task_id);
 }
 
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.cpp b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
index 75bf608..5a61900 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
@@ -76,9 +76,9 @@
 }
 
 void DummyInstance::HandleInstanceResponse(
-                                void*,
-                                unique_ptr<heron::proto::stmgr::RegisterInstanceResponse> _message,
-                                NetworkErrorCode status) {
+                            void*,
+                            pool_unique_ptr<heron::proto::stmgr::RegisterInstanceResponse> _message,
+                            NetworkErrorCode status) {
   CHECK_EQ(status, OK);
   if (_message->has_pplan()) {
     if (recvd_stmgr_pplan_) {
@@ -92,10 +92,10 @@
   register_response_status = _message->status().status();
 }
 
-void DummyInstance::HandleTupleMessage(unique_ptr<heron::proto::system::HeronTupleSet2>) {}
+void DummyInstance::HandleTupleMessage(pool_unique_ptr<heron::proto::system::HeronTupleSet2>) {}
 
 void DummyInstance::HandleNewInstanceAssignmentMsg(
-    std::unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage>) {}
+        pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage>) {}
 
 void DummyInstance::CreateAndSendInstanceRequest() {
   auto request = make_unique<heron::proto::stmgr::RegisterInstanceRequest>();
@@ -130,8 +130,7 @@
       under_backpressure_(false) {}
 
 void DummySpoutInstance::HandleNewInstanceAssignmentMsg(
-    unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg) {
-
+        pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg) {
   const heron::proto::system::PhysicalPlan pplan = _msg->pplan();
 
   DummyInstance::HandleNewInstanceAssignmentMsg(std::move(_msg));
@@ -188,13 +187,14 @@
       expected_msgs_to_recv_(_expected_msgs_to_recv),
       msgs_recvd_(0) {}
 
-void DummyBoltInstance::HandleTupleMessage(unique_ptr<heron::proto::system::HeronTupleSet2> msg) {
+void DummyBoltInstance::HandleTupleMessage(
+        pool_unique_ptr<heron::proto::system::HeronTupleSet2> msg) {
   if (msg->has_data()) msgs_recvd_ += msg->mutable_data()->tuples_size();
   if (msgs_recvd_ >= expected_msgs_to_recv_) getEventLoop()->loopExit();
 }
 
 void DummyBoltInstance::HandleNewInstanceAssignmentMsg(
-    unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg) {
+        pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg) {
   DummyInstance::HandleNewInstanceAssignmentMsg(std::move(_msg));
   if (expected_msgs_to_recv_ == 0) {
     getEventLoop()->loopExit();
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.h b/heron/stmgr/tests/cpp/server/dummy_instance.h
index e603da6..f06a1b4 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.h
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.h
@@ -40,14 +40,14 @@
 
   // Handle incoming message
   virtual void HandleInstanceResponse(
-                                void* ctx,
-                                unique_ptr<heron::proto::stmgr::RegisterInstanceResponse> _message,
-                                NetworkErrorCode status);
+                            void* ctx,
+                            pool_unique_ptr<heron::proto::stmgr::RegisterInstanceResponse> _message,
+                            NetworkErrorCode status);
   // Handle incoming tuples
-  virtual void HandleTupleMessage(unique_ptr<heron::proto::system::HeronTupleSet2> _message);
+  virtual void HandleTupleMessage(pool_unique_ptr<heron::proto::system::HeronTupleSet2> _message);
   // Handle the instance assignment message
   virtual void HandleNewInstanceAssignmentMsg(
-                               std::unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage>);
+                              pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage>);
 
   sp_string topology_name_;
   sp_string topology_id_;
@@ -82,7 +82,7 @@
  protected:
   // Handle incoming message
   virtual void HandleNewInstanceAssignmentMsg(
-      unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg);
+          pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg);
   void CreateAndSendTupleMessages();
   virtual void StartBackPressureConnectionCb(Connection* connection) {
     under_backpressure_ = true;
@@ -115,9 +115,9 @@
  protected:
   // Handle incoming message
   // Handle incoming tuples
-  virtual void HandleTupleMessage(unique_ptr<heron::proto::system::HeronTupleSet2> _message);
+  virtual void HandleTupleMessage(pool_unique_ptr<heron::proto::system::HeronTupleSet2> _message);
   virtual void HandleNewInstanceAssignmentMsg(
-      unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg);
+          pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg);
 
  private:
   sp_int32 expected_msgs_to_recv_;
diff --git a/heron/stmgr/tests/cpp/server/dummy_metricsmgr.cpp b/heron/stmgr/tests/cpp/server/dummy_metricsmgr.cpp
index 29ccc75..4bd2b50 100644
--- a/heron/stmgr/tests/cpp/server/dummy_metricsmgr.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_metricsmgr.cpp
@@ -57,7 +57,7 @@
 }
 
 void DummyMtrMgr::HandleMetricPublisherRegisterRequest(REQID id, Connection* conn,
-        unique_ptr<heron::proto::system::MetricPublisherRegisterRequest> request) {
+                   pool_unique_ptr<heron::proto::system::MetricPublisherRegisterRequest> request) {
   LOG(INFO) << "Got a register request ";
   heron::proto::system::MetricPublisherRegisterResponse response;
   response.mutable_status()->set_status(heron::proto::system::OK);
@@ -65,10 +65,10 @@
 }
 
 void DummyMtrMgr::HandleMetricPublisherPublishMessage(
-    Connection*, unique_ptr<heron::proto::system::MetricPublisherPublishMessage> message) {}
+    Connection*, pool_unique_ptr<heron::proto::system::MetricPublisherPublishMessage> message) {}
 
 void DummyMtrMgr::HandleTMasterLocationMessage(
-    Connection*, unique_ptr<heron::proto::system::TMasterLocationRefreshMessage> message) {
+    Connection*, pool_unique_ptr<heron::proto::system::TMasterLocationRefreshMessage> message) {
   location_ = message->release_tmaster();
 
   LOG(INFO) << "Got tmaster location: " << location_->host() << ":" << location_->master_port();
diff --git a/heron/stmgr/tests/cpp/server/dummy_metricsmgr.h b/heron/stmgr/tests/cpp/server/dummy_metricsmgr.h
index 3b773f6..17710ef 100644
--- a/heron/stmgr/tests/cpp/server/dummy_metricsmgr.h
+++ b/heron/stmgr/tests/cpp/server/dummy_metricsmgr.h
@@ -49,11 +49,12 @@
 
   // Handle metrics publisher request
   virtual void HandleMetricPublisherRegisterRequest(REQID _id, Connection* _conn,
-          unique_ptr<heron::proto::system::MetricPublisherRegisterRequest> _request);
+                  pool_unique_ptr<heron::proto::system::MetricPublisherRegisterRequest> _request);
   virtual void HandleMetricPublisherPublishMessage(
-      Connection* _conn, unique_ptr<heron::proto::system::MetricPublisherPublishMessage> _message);
+      Connection* _conn,
+      pool_unique_ptr<heron::proto::system::MetricPublisherPublishMessage> _message);
   virtual void HandleTMasterLocationMessage(
-      Connection*, unique_ptr<heron::proto::system::TMasterLocationRefreshMessage> _message);
+      Connection*, pool_unique_ptr<heron::proto::system::TMasterLocationRefreshMessage> _message);
 
  private:
   sp_string stmgr_id_expected_;
diff --git a/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp b/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
index d4072a4..5f94fa4 100644
--- a/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
@@ -48,7 +48,9 @@
 DummyTMasterClient::~DummyTMasterClient() {}
 
 void DummyTMasterClient::HandleRegisterResponse(
-    void*, unique_ptr<heron::proto::tmaster::StMgrRegisterResponse> response, NetworkErrorCode) {
+    void*,
+    pool_unique_ptr<heron::proto::tmaster::StMgrRegisterResponse> response,
+    NetworkErrorCode) {
 }
 
 void DummyTMasterClient::HandleConnect(NetworkErrorCode _status) {
@@ -119,7 +121,7 @@
 void DummyStMgr::HandleConnectionClose(Connection*, NetworkErrorCode) {}
 
 void DummyStMgr::HandleStMgrHelloRequest(REQID _id, Connection* _conn,
-                                     unique_ptr<heron::proto::stmgr::StrMgrHelloRequest> _request) {
+                               pool_unique_ptr<heron::proto::stmgr::StrMgrHelloRequest> _request) {
   other_stmgrs_ids_.push_back(_request->stmgr());
   heron::proto::stmgr::StrMgrHelloResponse response;
   response.mutable_status()->set_status(heron::proto::system::OK);
@@ -127,11 +129,11 @@
 }
 
 void DummyStMgr::HandleStartBackPressureMessage(Connection*,
-                                        unique_ptr<heron::proto::stmgr::StartBackPressureMessage>) {
+                                  pool_unique_ptr<heron::proto::stmgr::StartBackPressureMessage>) {
   ++num_start_bp_;
 }
 
 void DummyStMgr::HandleStopBackPressureMessage(Connection*,
-                                         unique_ptr<heron::proto::stmgr::StopBackPressureMessage>) {
+                                   pool_unique_ptr<heron::proto::stmgr::StopBackPressureMessage>) {
   ++num_stop_bp_;
 }
diff --git a/heron/stmgr/tests/cpp/server/dummy_stmgr.h b/heron/stmgr/tests/cpp/server/dummy_stmgr.h
index 5170139..bd0342b 100644
--- a/heron/stmgr/tests/cpp/server/dummy_stmgr.h
+++ b/heron/stmgr/tests/cpp/server/dummy_stmgr.h
@@ -42,9 +42,9 @@
   // Handle connection close
   virtual void HandleClose(NetworkErrorCode _status);
   virtual void HandleRegisterResponse(
-                                  void*,
-                                  unique_ptr<heron::proto::tmaster::StMgrRegisterResponse> response,
-                                  NetworkErrorCode);
+                            void*,
+                            pool_unique_ptr<heron::proto::tmaster::StMgrRegisterResponse> response,
+                            NetworkErrorCode);
   // Send worker request
   void CreateAndSendRegisterRequest();
 
@@ -80,11 +80,11 @@
 
   // Handle st mgr hello message
   virtual void HandleStMgrHelloRequest(REQID _id, Connection* _conn,
-                                      unique_ptr<heron::proto::stmgr::StrMgrHelloRequest> _request);
+                                 pool_unique_ptr<heron::proto::stmgr::StrMgrHelloRequest> _request);
   virtual void HandleStartBackPressureMessage(Connection*,
-                                         unique_ptr<heron::proto::stmgr::StartBackPressureMessage>);
+                                    pool_unique_ptr<heron::proto::stmgr::StartBackPressureMessage>);
   virtual void HandleStopBackPressureMessage(Connection*,
-                                          unique_ptr<heron::proto::stmgr::StopBackPressureMessage>);
+                                     pool_unique_ptr<heron::proto::stmgr::StopBackPressureMessage>);
 
  private:
   std::vector<sp_string> other_stmgrs_ids_;
diff --git a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp b/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
index e365cb6..eb4ac40 100644
--- a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
@@ -99,9 +99,9 @@
 }
 
 void CkptMgrClient::HandleTMasterRegisterResponse(
-                                    void*,
-                                    unique_ptr<proto::ckptmgr::RegisterTMasterResponse> _response,
-                                    NetworkErrorCode _status) {
+                                void*,
+                                pool_unique_ptr<proto::ckptmgr::RegisterTMasterResponse> _response,
+                                NetworkErrorCode _status) {
   if (_status != OK) {
     LOG(ERROR) << "NonOK network code" << _status << " for register response from ckptmgr "
                << "running at " << get_clientoptions().get_host() << ":"
@@ -145,9 +145,9 @@
 }
 
 void CkptMgrClient::HandleCleanStatefulCheckpointResponse(
-                              void*,
-                              unique_ptr<proto::ckptmgr::CleanStatefulCheckpointResponse> _response,
-                              NetworkErrorCode status) {
+                        void*,
+                        pool_unique_ptr<proto::ckptmgr::CleanStatefulCheckpointResponse> _response,
+                        NetworkErrorCode status) {
   LOG(INFO) << "Got CleanStatefulCheckpoint response from ckptmgr" << std::endl;
 
   proto::system::StatusCode code = proto::system::OK;
diff --git a/heron/tmaster/src/cpp/manager/ckptmgr-client.h b/heron/tmaster/src/cpp/manager/ckptmgr-client.h
index 7902f43..8c187aa 100644
--- a/heron/tmaster/src/cpp/manager/ckptmgr-client.h
+++ b/heron/tmaster/src/cpp/manager/ckptmgr-client.h
@@ -42,17 +42,17 @@
 
  protected:
   virtual void HandleCleanStatefulCheckpointResponse(
-                              void*,
-                              unique_ptr<proto::ckptmgr::CleanStatefulCheckpointResponse> _response,
-                              NetworkErrorCode status);
+                        void*,
+                        pool_unique_ptr<proto::ckptmgr::CleanStatefulCheckpointResponse> _response,
+                        NetworkErrorCode status);
   virtual void HandleConnect(NetworkErrorCode status);
   virtual void HandleClose(NetworkErrorCode status);
 
  private:
   void HandleTMasterRegisterResponse(
-                                      void*,
-                                      unique_ptr<proto::ckptmgr::RegisterTMasterResponse>_response,
-                                      NetworkErrorCode _status);
+                                  void*,
+                                  pool_unique_ptr<proto::ckptmgr::RegisterTMasterResponse>_response,
+                                  NetworkErrorCode _status);
 
   void SendRegisterRequest();
 
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.cpp b/heron/tmaster/src/cpp/manager/tmasterserver.cpp
index f35be46..82096dd 100644
--- a/heron/tmaster/src/cpp/manager/tmasterserver.cpp
+++ b/heron/tmaster/src/cpp/manager/tmasterserver.cpp
@@ -67,38 +67,38 @@
 }
 
 void TMasterServer::HandleStMgrRegisterRequest(REQID _reqid, Connection* _conn,
-                                        unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
+                                   pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
   unique_ptr<StMgrRegisterProcessor> processor =
       make_unique<StMgrRegisterProcessor>(_reqid, _conn, std::move(_request), tmaster_, this);
   processor->Start();
 }
 
 void TMasterServer::HandleStMgrHeartbeatRequest(REQID _reqid, Connection* _conn,
-                                      unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request) {
+                                  pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request) {
   unique_ptr<StMgrHeartbeatProcessor> processor =
       make_unique<StMgrHeartbeatProcessor>(_reqid, _conn, std::move(_request), tmaster_, this);
   processor->Start();
 }
 
 void TMasterServer::HandleMetricsMgrStats(Connection*,
-                                          unique_ptr<proto::tmaster::PublishMetrics> _request) {
+                                        pool_unique_ptr<proto::tmaster::PublishMetrics> _request) {
   collector_->AddMetric(*_request);
 }
 
 void TMasterServer::HandleInstanceStateStored(Connection*,
-                                         unique_ptr<proto::ckptmgr::InstanceStateStored> _message) {
+                                    pool_unique_ptr<proto::ckptmgr::InstanceStateStored> _message) {
   tmaster_->HandleInstanceStateStored(_message->checkpoint_id(), _message->instance());
 }
 
 void TMasterServer::HandleRestoreTopologyStateResponse(Connection* _conn,
-                                unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message) {
+                           pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message) {
   tmaster_->HandleRestoreTopologyStateResponse(_conn, _message->checkpoint_id(),
                                                _message->restore_txid(),
                                                _message->status().status());
 }
 
 void TMasterServer::HandleResetTopologyStateMessage(Connection* _conn,
-                                     unique_ptr<proto::ckptmgr::ResetTopologyState> _message) {
+                                    pool_unique_ptr<proto::ckptmgr::ResetTopologyState> _message) {
   tmaster_->ResetTopologyState(_conn, _message->dead_stmgr(),
                                _message->dead_taskid(), _message->reason());
 }
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.h b/heron/tmaster/src/cpp/manager/tmasterserver.h
index eb20c07..0524b61 100644
--- a/heron/tmaster/src/cpp/manager/tmasterserver.h
+++ b/heron/tmaster/src/cpp/manager/tmasterserver.h
@@ -47,22 +47,22 @@
  private:
   // Various handlers for different requests
   void HandleStMgrRegisterRequest(REQID _id, Connection* _conn,
-                                  unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
+                                  pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
   void HandleStMgrHeartbeatRequest(REQID _id, Connection* _conn,
-                                   unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request);
-  void HandleMetricsMgrStats(Connection*, unique_ptr<proto::tmaster::PublishMetrics> _request);
+                                   pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request);
+  void HandleMetricsMgrStats(Connection*, pool_unique_ptr<proto::tmaster::PublishMetrics> _request);
 
   // Message sent by stmgr to tell tmaster that a particular checkpoint message
   // was saved. This way the tmaster can keep track of which all instances have saved their
   // state for any given checkpoint id.
   void HandleInstanceStateStored(Connection*,
-                                 unique_ptr<proto::ckptmgr::InstanceStateStored> _message);
+                                 pool_unique_ptr<proto::ckptmgr::InstanceStateStored> _message);
   // Handle response from stmgr for the RestoreTopologyStateRequest
   void HandleRestoreTopologyStateResponse(Connection*,
-                                 unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message);
+                            pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message);
   // Stmgr can request tmaster to reset the state of the topology in case it finds any errors.
   void HandleResetTopologyStateMessage(Connection*,
-                                     unique_ptr<proto::ckptmgr::ResetTopologyState> _message);
+                                      pool_unique_ptr<proto::ckptmgr::ResetTopologyState> _message);
 
   // our tmaster
   shared_ptr<TMetricsCollector> collector_;
diff --git a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
index 222f9e4..1c98d70 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
@@ -31,8 +31,8 @@
 namespace tmaster {
 
 StMgrHeartbeatProcessor::StMgrHeartbeatProcessor(REQID reqid, Connection* conn,
-                                         unique_ptr<proto::tmaster::StMgrHeartbeatRequest> request,
-                                         TMaster* tmaster, Server* server)
+                                     pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> request,
+                                     TMaster* tmaster, Server* server)
     : Processor(reqid, conn, std::move(request), tmaster, server) {}
 
 StMgrHeartbeatProcessor::~StMgrHeartbeatProcessor() {
diff --git a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h
index 6775263..6240bda 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h
+++ b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h
@@ -31,7 +31,7 @@
 class StMgrHeartbeatProcessor : public Processor {
  public:
   StMgrHeartbeatProcessor(REQID _reqid, Connection* _conn,
-                          unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request,
+                          pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request,
                           TMaster* _tmaster,
                           Server* _server);
   virtual ~StMgrHeartbeatProcessor();
diff --git a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp b/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
index 3139703..c58dda8 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
@@ -32,8 +32,8 @@
 namespace tmaster {
 
 StMgrRegisterProcessor::StMgrRegisterProcessor(REQID _reqid, Connection* _conn,
-                                         unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
-                                         TMaster* _tmaster, Server* _server)
+                                     pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
+                                     TMaster* _tmaster, Server* _server)
     : Processor(_reqid, _conn, std::move(_request), _tmaster, _server) {}
 
 StMgrRegisterProcessor::~StMgrRegisterProcessor() {
diff --git a/heron/tmaster/src/cpp/processor/stmgr-register-processor.h b/heron/tmaster/src/cpp/processor/stmgr-register-processor.h
index f197377..208c09f 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-register-processor.h
+++ b/heron/tmaster/src/cpp/processor/stmgr-register-processor.h
@@ -31,7 +31,7 @@
 class StMgrRegisterProcessor : public Processor {
  public:
   StMgrRegisterProcessor(REQID _reqid, Connection* _conn,
-                         unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
+                         pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
                          TMaster* _tmaster,
                          Server* _server);
   virtual ~StMgrRegisterProcessor();
diff --git a/heron/tmaster/src/cpp/processor/tmaster-processor.cpp b/heron/tmaster/src/cpp/processor/tmaster-processor.cpp
index 1df63ae..0caaa8b 100644
--- a/heron/tmaster/src/cpp/processor/tmaster-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/tmaster-processor.cpp
@@ -29,7 +29,7 @@
 namespace tmaster {
 
 Processor::Processor(REQID _reqid, Connection* _conn,
-                     unique_ptr<google::protobuf::Message> _request,
+                     pool_unique_ptr<google::protobuf::Message> _request,
                      TMaster* _tmaster, Server* _server)
     : request_(std::move(_request)), tmaster_(_tmaster), server_(_server),
       reqid_(_reqid), conn_(_conn) {}
diff --git a/heron/tmaster/src/cpp/processor/tmaster-processor.h b/heron/tmaster/src/cpp/processor/tmaster-processor.h
index 5ba2da9..70c7c20 100644
--- a/heron/tmaster/src/cpp/processor/tmaster-processor.h
+++ b/heron/tmaster/src/cpp/processor/tmaster-processor.h
@@ -31,7 +31,7 @@
 
 class Processor {
  public:
-  Processor(REQID _reqid, Connection* _conn, unique_ptr<google::protobuf::Message> _request,
+  Processor(REQID _reqid, Connection* _conn, pool_unique_ptr<google::protobuf::Message> _request,
             TMaster* _tmaster,
             Server* _server);
   virtual ~Processor();
@@ -41,7 +41,7 @@
   void SendResponse(const google::protobuf::Message& _response);
   Connection* GetConnection() { return conn_; }
   void CloseConnection();
-  unique_ptr<google::protobuf::Message> request_;
+  pool_unique_ptr<google::protobuf::Message> request_;
   TMaster* tmaster_;
   Server* server_;
 
diff --git a/heron/tmaster/tests/cpp/server/dummystmgr.cpp b/heron/tmaster/tests/cpp/server/dummystmgr.cpp
index c19b33e..13cf22e 100644
--- a/heron/tmaster/tests/cpp/server/dummystmgr.cpp
+++ b/heron/tmaster/tests/cpp/server/dummystmgr.cpp
@@ -76,9 +76,9 @@
 }
 
 void DummyStMgr::HandleRegisterResponse(
-                                        void*,
-                                        unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
-                                        NetworkErrorCode status) {
+                                    void*,
+                                    pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
+                                    NetworkErrorCode status) {
   if (status != OK) {
     LOG(ERROR) << "NonOK response message for Register Response";
     Stop();
@@ -106,9 +106,9 @@
 }
 
 void DummyStMgr::HandleHeartbeatResponse(
-                                        void*,
-                                        unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
-                                        NetworkErrorCode status) {
+                                  void*,
+                                  pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
+                                  NetworkErrorCode status) {
   if (status != OK) {
     LOG(ERROR) << "NonOK response message for Register Response";
     Stop();
@@ -126,7 +126,7 @@
 }
 
 void DummyStMgr::HandleNewAssignmentMessage(
-                                        unique_ptr<proto::stmgr::NewPhysicalPlanMessage> message) {
+                                    pool_unique_ptr<proto::stmgr::NewPhysicalPlanMessage> message) {
   LOG(INFO) << "Got a new assignment";
   HandleNewPhysicalPlan(message->new_pplan());
 }
@@ -167,12 +167,12 @@
 }
 
 void DummyStMgr::HandleRestoreTopologyStateRequest(
-                                    unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _m) {
+                                  pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _m) {
   got_restore_message_ = true;
 }
 
 void DummyStMgr::HandleStartProcessingMessage(
-        unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _m) {
+        pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _m) {
   got_start_message_ = true;
 }
 
diff --git a/heron/tmaster/tests/cpp/server/dummystmgr.h b/heron/tmaster/tests/cpp/server/dummystmgr.h
index d02a0e9..06df950 100644
--- a/heron/tmaster/tests/cpp/server/dummystmgr.h
+++ b/heron/tmaster/tests/cpp/server/dummystmgr.h
@@ -52,16 +52,18 @@
   virtual void HandleClose(NetworkErrorCode status);
 
  private:
-  void HandleRegisterResponse(void*, unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
+  void HandleRegisterResponse(void*,
+                              pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
                               NetworkErrorCode);
-  void HandleHeartbeatResponse(void*, unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
+  void HandleHeartbeatResponse(void*,
+                               pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
                                NetworkErrorCode);
-  void HandleNewAssignmentMessage(unique_ptr<proto::stmgr::NewPhysicalPlanMessage> message);
+  void HandleNewAssignmentMessage(pool_unique_ptr<proto::stmgr::NewPhysicalPlanMessage> message);
   void HandleNewPhysicalPlan(const proto::system::PhysicalPlan& pplan);
   void HandleRestoreTopologyStateRequest(
-                                   unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> message);
+          pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> message);
   void HandleStartProcessingMessage(
-                                  unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> message);
+          pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> message);
 
   void OnReConnectTimer();
   void OnHeartbeatTimer();
diff --git a/heron/tmaster/tests/cpp/server/dummytmaster.cpp b/heron/tmaster/tests/cpp/server/dummytmaster.cpp
index d166d8c..a554998 100644
--- a/heron/tmaster/tests/cpp/server/dummytmaster.cpp
+++ b/heron/tmaster/tests/cpp/server/dummytmaster.cpp
@@ -48,7 +48,7 @@
 }
 
 void DummyTMaster::HandleRegisterRequest(REQID _id, Connection* _conn,
-                                        unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
+                                   pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
   std::vector<std::shared_ptr<proto::system::Instance>> instances;
   stmgrs_[_request->stmgr().id()] =
           std::make_shared<tmaster::StMgrState>(_conn, _request->stmgr(), instances, *this);
diff --git a/heron/tmaster/tests/cpp/server/dummytmaster.h b/heron/tmaster/tests/cpp/server/dummytmaster.h
index 7047a19..c00ade5 100644
--- a/heron/tmaster/tests/cpp/server/dummytmaster.h
+++ b/heron/tmaster/tests/cpp/server/dummytmaster.h
@@ -47,7 +47,7 @@
 
  private:
   void HandleRegisterRequest(REQID _id, Connection* _conn,
-                             unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
+                             pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
   void HandleHeartbeatRequest(REQID _id, Connection* _conn,
                               proto::tmaster::StMgrHeartbeatRequest* _request);
   tmaster::StMgrMap stmgrs_;