When create a smart pointer we now allocate the most hot/frequent messages in the memory pool instead of the heap (#3316)
diff --git a/heron/common/src/cpp/basics/basics.h b/heron/common/src/cpp/basics/basics.h
index 766a64f..dc5be56 100644
--- a/heron/common/src/cpp/basics/basics.h
+++ b/heron/common/src/cpp/basics/basics.h
@@ -46,6 +46,9 @@
#include "basics/sptest.h"
#include "basics/mempool.h"
+template <typename T>
+using pool_unique_ptr = std::unique_ptr<T, std::function<void(google::protobuf::Message*)>>;
+
// The standard std::make_unique(...) was introduced starting from C++ 14. Heron uses C++ 11.
// Unfortunatelly we can not bump a compiler version used in Heron since we are tight coupled with
// CentOS 7 whih comes with very old version of GCC (4.8.5). That's why we introduced
@@ -55,4 +58,20 @@
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
+
+// This is a special version to get unique pointer to protobuf message.
+// It doesn't allocate anythong on heap. Instead it just acquires the existing (preallocated)
+// event from the memory pool. Resulted unique pointer also has a deleter callback which
+// returns the event back to the memory pool when that unique pointer destructed.
+template<typename T>
+pool_unique_ptr<T> make_unique_from_protobuf_pool() {
+ T* event = nullptr;
+ event = __global_protobuf_pool_acquire__(event);
+
+ return pool_unique_ptr<T>(event, [](google::protobuf::Message* ptr) {
+ T* event_to_dispose = static_cast<T*>(ptr);
+ __global_protobuf_pool_release__(event_to_dispose);
+ });
+}
+
#endif // HERON_BASICS_H_
diff --git a/heron/common/src/cpp/metrics/metricsmgr-client.cpp b/heron/common/src/cpp/metrics/metricsmgr-client.cpp
index 297f932..bcd9687 100644
--- a/heron/common/src/cpp/metrics/metricsmgr-client.cpp
+++ b/heron/common/src/cpp/metrics/metricsmgr-client.cpp
@@ -84,7 +84,7 @@
}
void MetricsMgrClient::HandleRegisterResponse(
- void*, unique_ptr<proto::system::MetricPublisherRegisterResponse> _response,
+ void*, pool_unique_ptr<proto::system::MetricPublisherRegisterResponse> _response,
NetworkErrorCode _status) {
if (_status == OK && _response->status().status() != proto::system::OK) {
// What the heck we explicitly got a non ok response
diff --git a/heron/common/src/cpp/metrics/metricsmgr-client.h b/heron/common/src/cpp/metrics/metricsmgr-client.h
index 4bf9844..28cd422 100644
--- a/heron/common/src/cpp/metrics/metricsmgr-client.h
+++ b/heron/common/src/cpp/metrics/metricsmgr-client.h
@@ -58,8 +58,8 @@
void ReConnect();
void SendRegisterRequest();
void HandleRegisterResponse(void* _ctx,
- unique_ptr<proto::system::MetricPublisherRegisterResponse> _respose,
- NetworkErrorCode _status);
+ pool_unique_ptr<proto::system::MetricPublisherRegisterResponse> _respose,
+ NetworkErrorCode _status);
sp_string hostname_;
sp_int32 port_;
diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h
index 976edeb..a9709b7 100644
--- a/heron/common/src/cpp/network/client.h
+++ b/heron/common/src/cpp/network/client.h
@@ -132,7 +132,7 @@
// Register a handler for a particular response type
template <typename S, typename T, typename M>
void InstallResponseHandler(unique_ptr<S> _request,
- void (T::*method)(void* _ctx, unique_ptr<M>,
+ void (T::*method)(void* _ctx, pool_unique_ptr<M>,
NetworkErrorCode status)) {
auto m = make_unique<M>();
T* t = static_cast<T*>(this);
@@ -143,7 +143,7 @@
// Register a handler for a particular message type
template <typename T, typename M>
- void InstallMessageHandler(void (T::*method)(unique_ptr<M> _message)) {
+ void InstallMessageHandler(void (T::*method)(pool_unique_ptr<M> _message)) {
google::protobuf::Message* m = new M();
T* t = static_cast<T*>(this);
messageHandlers[m->GetTypeName()] =
@@ -212,10 +212,10 @@
void OnPacketTimer(REQID _id, EventLoop::Status status);
template <typename T, typename M>
- void dispatchResponse(T* _t, void (T::*method)(void* _ctx, unique_ptr<M>, NetworkErrorCode),
+ void dispatchResponse(T* _t, void (T::*method)(void* _ctx, pool_unique_ptr<M>, NetworkErrorCode),
IncomingPacket* _ipkt, NetworkErrorCode _code) {
void* ctx = nullptr;
- unique_ptr<M> m = nullptr;
+ pool_unique_ptr<M> m = nullptr;
NetworkErrorCode status = _code;
if (status == OK && _ipkt) {
REQID rid;
@@ -223,7 +223,7 @@
if (context_map_.find(rid) != context_map_.end()) {
// indeed
ctx = context_map_[rid].second;
- m = make_unique<M>();
+ m = make_unique_from_protobuf_pool<M>();
context_map_.erase(rid);
_ipkt->UnPackProtocolBuffer(m.get());
} else {
@@ -241,8 +241,8 @@
}
template <typename T, typename M>
- void dispatchMessage(T* _t, void (T::*method)(unique_ptr<M>), IncomingPacket* _ipkt) {
- unique_ptr<M> m = make_unique<M>();
+ void dispatchMessage(T* _t, void (T::*method)(pool_unique_ptr<M>), IncomingPacket* _ipkt) {
+ pool_unique_ptr<M> m = make_unique_from_protobuf_pool<M>();
if (_ipkt->UnPackProtocolBuffer(m.get()) != 0) {
// We could not decode the pb properly
diff --git a/heron/common/src/cpp/network/server.h b/heron/common/src/cpp/network/server.h
index 096263d..cf578a4 100644
--- a/heron/common/src/cpp/network/server.h
+++ b/heron/common/src/cpp/network/server.h
@@ -151,7 +151,7 @@
// Register a handler for a particular request type
template <typename T, typename M>
- void InstallRequestHandler(void (T::*method)(REQID id, Connection* conn, unique_ptr<M>)) {
+ void InstallRequestHandler(void (T::*method)(REQID id, Connection* conn, pool_unique_ptr<M>)) {
unique_ptr<google::protobuf::Message> m = make_unique<M>();
T* t = static_cast<T*>(this);
requestHandlers[m->GetTypeName()] = std::bind(&Server::dispatchRequest<T, M>, this, t, method,
@@ -160,7 +160,7 @@
// Register a handler for a particular message type
template <typename T, typename M>
- void InstallMessageHandler(void (T::*method)(Connection* conn, unique_ptr<M>)) {
+ void InstallMessageHandler(void (T::*method)(Connection* conn, pool_unique_ptr<M>)) {
unique_ptr<google::protobuf::Message> m = make_unique<M>();
T* t = static_cast<T*>(this);
messageHandlers[m->GetTypeName()] = std::bind(&Server::dispatchMessage<T, M>, this, t, method,
@@ -214,13 +214,13 @@
void InternalSendResponse(Connection* _connection, OutgoingPacket* _packet);
template <typename T, typename M>
- void dispatchRequest(T* _t, void (T::*method)(REQID id, Connection* conn, unique_ptr<M>),
+ void dispatchRequest(T* _t, void (T::*method)(REQID id, Connection* conn, pool_unique_ptr<M>),
Connection* _conn,
IncomingPacket* _ipkt) {
REQID rid;
CHECK(_ipkt->UnPackREQID(&rid) == 0) << "REQID unpacking failed";
- auto m = make_unique<M>();
+ auto m = make_unique_from_protobuf_pool<M>();
if (_ipkt->UnPackProtocolBuffer(m.get()) != 0) {
// We could not decode the pb properly
@@ -237,13 +237,13 @@
}
template <typename T, typename M>
- void dispatchMessage(T* _t, void (T::*method)(Connection* conn, unique_ptr<M>),
+ void dispatchMessage(T* _t, void (T::*method)(Connection* conn, pool_unique_ptr<M>),
Connection* _conn,
IncomingPacket* _ipkt) {
REQID rid;
CHECK(_ipkt->UnPackREQID(&rid) == 0) << "REQID unpacking failed";
- auto m = make_unique<M>();
+ auto m = make_unique_from_protobuf_pool<M>();
if (_ipkt->UnPackProtocolBuffer(m.get()) != 0) {
// We could not decode the pb properly
diff --git a/heron/common/tests/cpp/network/client_unittest.cpp b/heron/common/tests/cpp/network/client_unittest.cpp
index 4cfa101..b129fe0 100644
--- a/heron/common/tests/cpp/network/client_unittest.cpp
+++ b/heron/common/tests/cpp/network/client_unittest.cpp
@@ -62,7 +62,7 @@
void TestClient::HandleClose(NetworkErrorCode) {}
-void TestClient::HandleTestMessage(unique_ptr<TestMessage> _message) {
+void TestClient::HandleTestMessage(pool_unique_ptr<TestMessage> _message) {
++nrecv_;
if (nrecv_ >= ntotal_) {
diff --git a/heron/common/tests/cpp/network/client_unittest.h b/heron/common/tests/cpp/network/client_unittest.h
index 30d8fd6..666085e 100644
--- a/heron/common/tests/cpp/network/client_unittest.h
+++ b/heron/common/tests/cpp/network/client_unittest.h
@@ -44,7 +44,7 @@
private:
// Handle incoming message
- void HandleTestMessage(unique_ptr<TestMessage> _message);
+ void HandleTestMessage(pool_unique_ptr<TestMessage> _message);
void SendMessages();
void CreateAndSendMessage();
diff --git a/heron/common/tests/cpp/network/oclient_unittest.cpp b/heron/common/tests/cpp/network/oclient_unittest.cpp
index 8d6b3a5..4d56203 100644
--- a/heron/common/tests/cpp/network/oclient_unittest.cpp
+++ b/heron/common/tests/cpp/network/oclient_unittest.cpp
@@ -60,7 +60,7 @@
void OrderClient::HandleClose(NetworkErrorCode) {}
-void OrderClient::HandleOrderMessage(unique_ptr<OrderMessage> _message) {
+void OrderClient::HandleOrderMessage(pool_unique_ptr<OrderMessage> _message) {
++nrecv_;
EXPECT_EQ(msgidr_++, _message->id());
diff --git a/heron/common/tests/cpp/network/oclient_unittest.h b/heron/common/tests/cpp/network/oclient_unittest.h
index fa68bfc..d75e870 100644
--- a/heron/common/tests/cpp/network/oclient_unittest.h
+++ b/heron/common/tests/cpp/network/oclient_unittest.h
@@ -43,7 +43,7 @@
private:
// Handle incoming message
- void HandleOrderMessage(unique_ptr<OrderMessage> _message);
+ void HandleOrderMessage(pool_unique_ptr<OrderMessage> _message);
void SendMessages();
void CreateAndSendMessage();
diff --git a/heron/common/tests/cpp/network/oserver_unittest.cpp b/heron/common/tests/cpp/network/oserver_unittest.cpp
index 52ddcb1..bac150e 100644
--- a/heron/common/tests/cpp/network/oserver_unittest.cpp
+++ b/heron/common/tests/cpp/network/oserver_unittest.cpp
@@ -56,7 +56,7 @@
delete ids;
}
-void OrderServer::HandleOrderMessage(Connection* _conn, unique_ptr<OrderMessage> _message) {
+void OrderServer::HandleOrderMessage(Connection* _conn, pool_unique_ptr<OrderMessage> _message) {
if (clients_.find(_conn) == clients_.end()) return;
nrecv_++;
@@ -77,6 +77,6 @@
}
void OrderServer::HandleTerminateMessage(Connection* _connection __attribute__((unused)),
- unique_ptr<TerminateMessage> _message __attribute__((unused))) {
+ pool_unique_ptr<TerminateMessage> _message __attribute__((unused))) {
AddTimer([this]() { std::cout << "OrderServer:Terminate"; this->Terminate(); }, 1);
}
diff --git a/heron/common/tests/cpp/network/oserver_unittest.h b/heron/common/tests/cpp/network/oserver_unittest.h
index 1607cfe..ab34440 100644
--- a/heron/common/tests/cpp/network/oserver_unittest.h
+++ b/heron/common/tests/cpp/network/oserver_unittest.h
@@ -46,10 +46,11 @@
virtual void HandleConnectionClose(Connection* connection, NetworkErrorCode status);
// handle the test message
- virtual void HandleOrderMessage(Connection* connection, unique_ptr<OrderMessage> message);
+ virtual void HandleOrderMessage(Connection* connection, pool_unique_ptr<OrderMessage> message);
// handle the terminate message
- virtual void HandleTerminateMessage(Connection* connection, unique_ptr<TerminateMessage> message);
+ virtual void HandleTerminateMessage(Connection* connection,
+ pool_unique_ptr<TerminateMessage> message);
private:
void Terminate();
diff --git a/heron/common/tests/cpp/network/server_unittest.cpp b/heron/common/tests/cpp/network/server_unittest.cpp
index e150943..01b570b 100644
--- a/heron/common/tests/cpp/network/server_unittest.cpp
+++ b/heron/common/tests/cpp/network/server_unittest.cpp
@@ -55,7 +55,7 @@
}
void TestServer::HandleTestMessage(Connection* _connection __attribute__((unused)),
- unique_ptr<TestMessage> _message) {
+ pool_unique_ptr<TestMessage> _message) {
nrecv_++;
// find a random client to send the message to
@@ -73,6 +73,6 @@
}
void TestServer::HandleTerminateMessage(Connection* _connection __attribute__((unused)),
- unique_ptr<TerminateMessage> _message __attribute__((unused))) {
+ pool_unique_ptr<TerminateMessage> _message __attribute__((unused))) {
AddTimer([this]() { this->Terminate(); }, 1);
}
diff --git a/heron/common/tests/cpp/network/server_unittest.h b/heron/common/tests/cpp/network/server_unittest.h
index a629292..f8d4586 100644
--- a/heron/common/tests/cpp/network/server_unittest.h
+++ b/heron/common/tests/cpp/network/server_unittest.h
@@ -47,10 +47,11 @@
virtual void HandleConnectionClose(Connection* connection, NetworkErrorCode status);
// handle the test message
- virtual void HandleTestMessage(Connection* connection, unique_ptr<TestMessage> message);
+ virtual void HandleTestMessage(Connection* connection, pool_unique_ptr<TestMessage> message);
// handle the terminate message
- virtual void HandleTerminateMessage(Connection* connection, unique_ptr<TerminateMessage> message);
+ virtual void HandleTerminateMessage(Connection* connection,
+ pool_unique_ptr<TerminateMessage> message);
private:
void Terminate();
diff --git a/heron/instance/src/cpp/boltimpl/bolt-instance.cpp b/heron/instance/src/cpp/boltimpl/bolt-instance.cpp
index b0c96c2..37cba54 100644
--- a/heron/instance/src/cpp/boltimpl/bolt-instance.cpp
+++ b/heron/instance/src/cpp/boltimpl/bolt-instance.cpp
@@ -37,7 +37,7 @@
BoltInstance::BoltInstance(std::shared_ptr<EventLoop> eventLoop,
std::shared_ptr<TaskContextImpl> taskContext,
- NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
+ NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
void* dllHandle)
: taskContext_(taskContext), dataToSlave_(dataToSlave),
@@ -114,7 +114,7 @@
metrics_->executeTuple(stream.id(), stream.component_name(), endTime - startTime);
}
-void BoltInstance::HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
+void BoltInstance::HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
if (tupleSet->has_control()) {
LOG(FATAL) << "Bolt cannot get incoming control tuples from other components";
}
diff --git a/heron/instance/src/cpp/boltimpl/bolt-instance.h b/heron/instance/src/cpp/boltimpl/bolt-instance.h
index 77f406c..31b949d 100644
--- a/heron/instance/src/cpp/boltimpl/bolt-instance.h
+++ b/heron/instance/src/cpp/boltimpl/bolt-instance.h
@@ -41,7 +41,7 @@
class BoltInstance : public InstanceBase {
public:
BoltInstance(std::shared_ptr<EventLoop> eventLoop, std::shared_ptr<TaskContextImpl> taskContext,
- NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
+ NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
void* dllHandle);
virtual ~BoltInstance();
@@ -52,7 +52,7 @@
virtual void Deactivate();
virtual bool IsRunning() { return active_; }
virtual void DoWork();
- virtual void HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet);
+ virtual void HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet);
private:
void executeTuple(const proto::api::StreamId& stream,
@@ -62,7 +62,7 @@
const proto::system::HeronDataTuple& tup);
std::shared_ptr<TaskContextImpl> taskContext_;
- NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave_;
+ NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave_;
NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave_;
std::shared_ptr<EventLoop> eventLoop_;
api::bolt::IBolt* bolt_;
diff --git a/heron/instance/src/cpp/gateway/gateway.cpp b/heron/instance/src/cpp/gateway/gateway.cpp
index 8c58c11..883cb6b 100644
--- a/heron/instance/src/cpp/gateway/gateway.cpp
+++ b/heron/instance/src/cpp/gateway/gateway.cpp
@@ -91,7 +91,7 @@
eventLoop_->loop();
}
-void Gateway::HandleNewPhysicalPlan(unique_ptr<proto::system::PhysicalPlan> pplan) {
+void Gateway::HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan> pplan) {
LOG(INFO) << "Received a new physical plan from Stmgr";
if (config::TopologyConfigHelper::IsComponentSpout(pplan->topology(),
instanceProto_.info().component_name())) {
@@ -109,7 +109,7 @@
dataToSlave_->enqueue(std::move(pplan));
}
-void Gateway::HandleStMgrTuples(unique_ptr<proto::system::HeronTupleSet2> msg) {
+void Gateway::HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2> msg) {
dataToSlave_->enqueue(std::move(msg));
if (dataToSlave_->size() > maxReadBufferSize_) {
stmgrClient_->putBackPressure();
diff --git a/heron/instance/src/cpp/gateway/gateway.h b/heron/instance/src/cpp/gateway/gateway.h
index b0c7c08..cfd4120 100644
--- a/heron/instance/src/cpp/gateway/gateway.h
+++ b/heron/instance/src/cpp/gateway/gateway.h
@@ -56,17 +56,18 @@
void HandleSlaveMetrics(google::protobuf::Message* msg);
std::shared_ptr<EventLoop> eventLoop() { return eventLoop_; }
- void setCommunicators(NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
- NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
- NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave) {
+ void setCommunicators(
+ NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
+ NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
+ NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave) {
dataToSlave_ = dataToSlave;
dataFromSlave_ = dataFromSlave;
metricsFromSlave_ = metricsFromSlave;
}
private:
- void HandleNewPhysicalPlan(unique_ptr<proto::system::PhysicalPlan> pplan);
- void HandleStMgrTuples(unique_ptr<proto::system::HeronTupleSet2> tuples);
+ void HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan> pplan);
+ void HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tuples);
void ResumeConsumingFromSlaveTimer();
std::string topologyName_;
std::string topologyId_;
@@ -76,7 +77,7 @@
std::shared_ptr<StMgrClient> stmgrClient_;
std::shared_ptr<common::MetricsMgrClient> metricsMgrClient_;
std::shared_ptr<GatewayMetrics> gatewayMetrics_;
- NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave_;
+ NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave_;
NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave_;
NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave_;
std::shared_ptr<EventLoop> eventLoop_;
diff --git a/heron/instance/src/cpp/gateway/stmgr-client.cpp b/heron/instance/src/cpp/gateway/stmgr-client.cpp
index 08d0259..5fe7adc 100644
--- a/heron/instance/src/cpp/gateway/stmgr-client.cpp
+++ b/heron/instance/src/cpp/gateway/stmgr-client.cpp
@@ -37,8 +37,8 @@
const std::string& topologyName, const std::string& topologyId,
const proto::system::Instance& instanceProto,
std::shared_ptr<GatewayMetrics> gatewayMetrics,
- std::function<void(std::unique_ptr<proto::system::PhysicalPlan>)> pplanWatcher,
- std::function<void(std::unique_ptr<proto::system::HeronTupleSet2>)> tupleWatcher)
+ std::function<void(pool_unique_ptr<proto::system::PhysicalPlan>)> pplanWatcher,
+ std::function<void(pool_unique_ptr<proto::system::HeronTupleSet2>)> tupleWatcher)
: Client(eventLoop, options),
topologyName_(topologyName),
topologyId_(topologyId),
@@ -100,7 +100,7 @@
void StMgrClient::HandleRegisterResponse(
void*,
- unique_ptr<proto::stmgr::RegisterInstanceResponse> response,
+ pool_unique_ptr<proto::stmgr::RegisterInstanceResponse> response,
NetworkErrorCode status) {
if (status != OK) {
LOG(ERROR) << "NonOK network code " << status << " for register response from stmgr "
@@ -125,7 +125,8 @@
if (response->has_pplan()) {
LOG(INFO) << "Registration response had a pplan";
- pplanWatcher_(std::move(unique_ptr<proto::system::PhysicalPlan>(response->release_pplan())));
+ using std::move;
+ pplanWatcher_(move(pool_unique_ptr<proto::system::PhysicalPlan>(response->release_pplan())));
}
}
@@ -140,13 +141,14 @@
return;
}
-void StMgrClient::HandlePhysicalPlan(unique_ptr<proto::stmgr::NewInstanceAssignmentMessage> msg) {
+void StMgrClient::HandlePhysicalPlan(
+ pool_unique_ptr<proto::stmgr::NewInstanceAssignmentMessage> msg) {
LOG(INFO) << "Got a Physical Plan from our stmgr " << instanceProto_.stmgr_id() << " running at "
<< get_clientoptions().get_host() << ":" << get_clientoptions().get_port();
- pplanWatcher_(std::move(unique_ptr<proto::system::PhysicalPlan>(msg->release_pplan())));
+ pplanWatcher_(std::move(pool_unique_ptr<proto::system::PhysicalPlan>(msg->release_pplan())));
}
-void StMgrClient::HandleTupleMessage(unique_ptr<proto::system::HeronTupleSet2> msg) {
+void StMgrClient::HandleTupleMessage(pool_unique_ptr<proto::system::HeronTupleSet2> msg) {
gatewayMetrics_->updateReceivedPacketsCount(1);
gatewayMetrics_->updateReceivedPacketsSize(msg->ByteSize());
tupleWatcher_(std::move(msg));
diff --git a/heron/instance/src/cpp/gateway/stmgr-client.h b/heron/instance/src/cpp/gateway/stmgr-client.h
index 85c5a65..fac1169 100644
--- a/heron/instance/src/cpp/gateway/stmgr-client.h
+++ b/heron/instance/src/cpp/gateway/stmgr-client.h
@@ -38,8 +38,8 @@
const std::string& topologyName,
const std::string& topologyId, const proto::system::Instance& instance,
std::shared_ptr<GatewayMetrics> gatewayMetrics,
- std::function<void(std::unique_ptr<proto::system::PhysicalPlan>)> pplan_watcher,
- std::function<void(std::unique_ptr<proto::system::HeronTupleSet2>)> tuple_watcher);
+ std::function<void(pool_unique_ptr<proto::system::PhysicalPlan>)> pplan_watcher,
+ std::function<void(pool_unique_ptr<proto::system::HeronTupleSet2>)> tuple_watcher);
virtual ~StMgrClient();
void SendTupleMessage(const proto::system::HeronTupleSet& msg);
@@ -51,10 +51,11 @@
virtual void HandleClose(NetworkErrorCode status);
private:
- void HandleRegisterResponse(void*, unique_ptr<proto::stmgr::RegisterInstanceResponse> response,
+ void HandleRegisterResponse(void*,
+ pool_unique_ptr<proto::stmgr::RegisterInstanceResponse> response,
NetworkErrorCode status);
- void HandlePhysicalPlan(unique_ptr<proto::stmgr::NewInstanceAssignmentMessage> msg);
- void HandleTupleMessage(unique_ptr<proto::system::HeronTupleSet2> tupleMessage);
+ void HandlePhysicalPlan(pool_unique_ptr<proto::stmgr::NewInstanceAssignmentMessage> msg);
+ void HandleTupleMessage(pool_unique_ptr<proto::system::HeronTupleSet2> tupleMessage);
void OnReconnectTimer();
void SendRegisterRequest();
@@ -63,8 +64,8 @@
std::string topologyId_;
const proto::system::Instance& instanceProto_;
std::shared_ptr<GatewayMetrics> gatewayMetrics_;
- std::function<void(std::unique_ptr<proto::system::PhysicalPlan>)> pplanWatcher_;
- std::function<void(std::unique_ptr<proto::system::HeronTupleSet2>)> tupleWatcher_;
+ std::function<void(pool_unique_ptr<proto::system::PhysicalPlan>)> pplanWatcher_;
+ std::function<void(pool_unique_ptr<proto::system::HeronTupleSet2>)> tupleWatcher_;
int64_t ndropped_messages_;
int reconnect_interval_;
int max_reconnect_times_;
diff --git a/heron/instance/src/cpp/instance-main.cpp b/heron/instance/src/cpp/instance-main.cpp
index ac73723..d81897f 100644
--- a/heron/instance/src/cpp/instance-main.cpp
+++ b/heron/instance/src/cpp/instance-main.cpp
@@ -64,7 +64,7 @@
auto slave = new heron::instance::Slave(FLAGS_task_id, FLAGS_topology_binary);
auto dataToSlave =
- new heron::instance::NotifyingCommunicator<unique_ptr<google::protobuf::Message>>(
+ new heron::instance::NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>(
slave->eventLoop(),
std::bind(&heron::instance::Slave::HandleGatewayData,
slave, std::placeholders::_1),
diff --git a/heron/instance/src/cpp/slave/instance-base.h b/heron/instance/src/cpp/slave/instance-base.h
index 45dc29c..a10398f 100644
--- a/heron/instance/src/cpp/slave/instance-base.h
+++ b/heron/instance/src/cpp/slave/instance-base.h
@@ -36,7 +36,7 @@
virtual void Deactivate() = 0;
virtual bool IsRunning() = 0;
virtual void DoWork() = 0;
- virtual void HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet) = 0;
+ virtual void HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) = 0;
};
} // namespace instance
diff --git a/heron/instance/src/cpp/slave/slave.cpp b/heron/instance/src/cpp/slave/slave.cpp
index 46312a4..f50e201 100644
--- a/heron/instance/src/cpp/slave/slave.cpp
+++ b/heron/instance/src/cpp/slave/slave.cpp
@@ -58,9 +58,9 @@
}
void Slave::setCommunicators(
- NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
- NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
- NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave) {
+ NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
+ NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
+ NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave) {
dataToSlave_ = dataToSlave;
dataFromSlave_ = dataFromSlave;
metricsFromSlave_ = metricsFromSlave;
@@ -80,20 +80,20 @@
eventLoop_->loop();
}
-void Slave::HandleGatewayData(unique_ptr<google::protobuf::Message> msg) {
+void Slave::HandleGatewayData(pool_unique_ptr<google::protobuf::Message> msg) {
if (msg->GetTypeName() == pplan_typename_) {
LOG(INFO) << "Slave Received a new pplan message from Gateway";
- auto pplan = unique_ptr<proto::system::PhysicalPlan>(
+ auto pplan = pool_unique_ptr<proto::system::PhysicalPlan>(
static_cast<proto::system::PhysicalPlan*>(msg.release()));
HandleNewPhysicalPlan(std::move(pplan));
} else {
- auto tupleSet = unique_ptr<proto::system::HeronTupleSet2>(
+ auto tupleSet = pool_unique_ptr<proto::system::HeronTupleSet2>(
static_cast<proto::system::HeronTupleSet2*>(msg.release()));
HandleStMgrTuples(std::move(tupleSet));
}
}
-void Slave::HandleNewPhysicalPlan(unique_ptr<proto::system::PhysicalPlan> pplan) {
+void Slave::HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan> pplan) {
std::shared_ptr<proto::system::PhysicalPlan> newPplan = std::move(pplan);
taskContext_->newPhysicalPlan(newPplan);
if (!instance_) {
@@ -125,7 +125,7 @@
}
}
-void Slave::HandleStMgrTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
+void Slave::HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
if (instance_) {
instance_->HandleGatewayTuples(std::move(tupleSet));
} else {
diff --git a/heron/instance/src/cpp/slave/slave.h b/heron/instance/src/cpp/slave/slave.h
index 377e0c9..3e5f946 100644
--- a/heron/instance/src/cpp/slave/slave.h
+++ b/heron/instance/src/cpp/slave/slave.h
@@ -42,12 +42,13 @@
void Start();
std::shared_ptr<EventLoop> eventLoop() { return eventLoop_; }
- void setCommunicators(NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave,
- NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
- NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave);
+ void setCommunicators(
+ NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave,
+ NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave,
+ NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave);
// Handles data from gateway thread
- void HandleGatewayData(unique_ptr<google::protobuf::Message> msg);
+ void HandleGatewayData(pool_unique_ptr<google::protobuf::Message> msg);
// This is the notification that gateway thread consumed something that we wrote
void HandleGatewayDataConsumed();
@@ -59,13 +60,13 @@
// This is the one thats running in the slave thread
void InternalStart();
// Called when a new phyiscal plan is received
- void HandleNewPhysicalPlan(unique_ptr<proto::system::PhysicalPlan> pplan);
+ void HandleNewPhysicalPlan(pool_unique_ptr<proto::system::PhysicalPlan> pplan);
// Called when we receive new tuple messages from gateway
- void HandleStMgrTuples(unique_ptr<proto::system::HeronTupleSet2> msg);
+ void HandleStMgrTuples(pool_unique_ptr<proto::system::HeronTupleSet2> msg);
int myTaskId_;
std::shared_ptr<TaskContextImpl> taskContext_;
- NotifyingCommunicator<unique_ptr<google::protobuf::Message>>* dataToSlave_;
+ NotifyingCommunicator<pool_unique_ptr<google::protobuf::Message>>* dataToSlave_;
NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave_;
NotifyingCommunicator<google::protobuf::Message*>* metricsFromSlave_;
InstanceBase* instance_;
diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
index 7ad7167..0374794 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
+++ b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
@@ -178,7 +178,7 @@
collector_->numInFlight() < maxSpoutPending));
}
-void SpoutInstance::HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
+void SpoutInstance::HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet) {
if (tupleSet->has_data()) {
LOG(FATAL) << "Spout cannot get incoming data tuples from other components";
}
diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.h b/heron/instance/src/cpp/spoutimpl/spout-instance.h
index 36d328f..a93ec14 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-instance.h
+++ b/heron/instance/src/cpp/spoutimpl/spout-instance.h
@@ -51,7 +51,7 @@
virtual void Deactivate();
virtual bool IsRunning() { return active_; }
virtual void DoWork();
- virtual void HandleGatewayTuples(unique_ptr<proto::system::HeronTupleSet2> tupleSet);
+ virtual void HandleGatewayTuples(pool_unique_ptr<proto::system::HeronTupleSet2> tupleSet);
private:
void lookForTimeouts();
diff --git a/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp b/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
index 821a671..106abc3 100644
--- a/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
+++ b/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
@@ -42,7 +42,7 @@
shared_ptr<common::MetricsMgrSt> const& _metrics_manager_client,
std::function<void(sp_int32, proto::system::HeronTupleSet2*)> _tupleset_drainer,
std::function<void(proto::stmgr::TupleStreamMessage*)> _tuplestream_drainer,
- std::function<void(sp_int32, std::unique_ptr<InitiateStatefulCheckpoint>)> _ckpt_drainer)
+ std::function<void(sp_int32, pool_unique_ptr<InitiateStatefulCheckpoint>)> _ckpt_drainer)
: drain_threshold_(_drain_threshold), current_size_(0),
neighbour_calculator_(_neighbour_calculator),
metrics_manager_client_(_metrics_manager_client), tupleset_drainer_(_tupleset_drainer),
@@ -72,7 +72,7 @@
size_metric_->SetValue(current_size_);
}
-void CheckpointGateway::SendToInstance(unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+void CheckpointGateway::SendToInstance(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
if (current_size_ > drain_threshold_) {
ForceDrain();
}
@@ -176,8 +176,8 @@
return _tuple;
} else {
auto tp = std::make_tuple(_tuple,
- (proto::stmgr::TupleStreamMessage*)nullptr,
- (unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>)nullptr);
+ (proto::stmgr::TupleStreamMessage*)nullptr,
+ (pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>)nullptr);
add(tp, _size);
return nullptr;
}
@@ -196,8 +196,8 @@
return _tuple;
} else {
auto tp = std::make_tuple((proto::system::HeronTupleSet2*)nullptr,
- _tuple,
- (unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>)nullptr);
+ _tuple,
+ (pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>)nullptr);
add(tp, _size);
return nullptr;
}
@@ -235,7 +235,7 @@
<< " All checkpoint markers received for checkpoint "
<< _checkpoint_id;
// We need to add Initiate Checkpoint message before the current set
- auto message = make_unique<proto::ckptmgr::InitiateStatefulCheckpoint>();
+ auto message = make_unique_from_protobuf_pool<proto::ckptmgr::InitiateStatefulCheckpoint>();
message->set_checkpoint_id(_checkpoint_id);
int cache_size = message->GetCachedSize();
auto new_tuple = std::make_tuple(
diff --git a/heron/stmgr/src/cpp/manager/checkpoint-gateway.h b/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
index 0d17d15..2129832 100644
--- a/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
+++ b/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
@@ -57,10 +57,10 @@
shared_ptr<common::MetricsMgrSt> const& _metrics_manager_client,
std::function<void(sp_int32, proto::system::HeronTupleSet2*)> tupleset_drainer,
std::function<void(proto::stmgr::TupleStreamMessage*)> tuplestream_drainer,
- std::function<void(sp_int32, unique_ptr<InitiateStatefulCheckpoint>)> ckpt_drainer);
+ std::function<void(sp_int32, pool_unique_ptr<InitiateStatefulCheckpoint>)> ckpt_drainer);
virtual ~CheckpointGateway();
void SendToInstance(sp_int32 _task_id, proto::system::HeronTupleSet2* _message);
- void SendToInstance(unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+ void SendToInstance(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
void HandleUpstreamMarker(sp_int32 _src_task_id, sp_int32 _destination_task_id,
const sp_string& _checkpoint_id);
@@ -70,7 +70,7 @@
private:
typedef std::tuple<proto::system::HeronTupleSet2*,
proto::stmgr::TupleStreamMessage*,
- unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>>
+ pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint>>
Tuple;
// This helper class defines the current state of affairs
@@ -111,7 +111,7 @@
std::unordered_map<sp_int32, unique_ptr<CheckpointInfo>> pending_tuples_;
std::function<void(sp_int32, proto::system::HeronTupleSet2*)> tupleset_drainer_;
std::function<void(proto::stmgr::TupleStreamMessage*)> tuplestream_drainer_;
- std::function<void(sp_int32, unique_ptr<InitiateStatefulCheckpoint>)> ckpt_drainer_;
+ std::function<void(sp_int32, pool_unique_ptr<InitiateStatefulCheckpoint>)> ckpt_drainer_;
};
} // namespace stmgr
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
index a22905e..332bc88 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
@@ -116,9 +116,9 @@
}
void CkptMgrClient::HandleRegisterStMgrResponse(
- void*,
- unique_ptr<proto::ckptmgr::RegisterStMgrResponse> _response,
- NetworkErrorCode _status) {
+ void*,
+ pool_unique_ptr<proto::ckptmgr::RegisterStMgrResponse> _response,
+ NetworkErrorCode _status) {
if (_status != OK) {
LOG(ERROR) << "NonOK network code " << _status << " for register response from ckptmgr "
<< ckptmgr_id_ << "running at " << get_clientoptions().get_host() << ":"
@@ -182,7 +182,7 @@
void CkptMgrClient::HandleSaveInstanceStateResponse(
void*,
- unique_ptr<proto::ckptmgr::SaveInstanceStateResponse> _response,
+ pool_unique_ptr<proto::ckptmgr::SaveInstanceStateResponse> _response,
NetworkErrorCode _status) {
if (_status != OK) {
LOG(ERROR) << "NonOK response message for SaveInstanceStateResponse";
@@ -200,7 +200,7 @@
void CkptMgrClient::HandleGetInstanceStateResponse(
void* _ctx,
- unique_ptr<proto::ckptmgr::GetInstanceStateResponse> _response,
+ pool_unique_ptr<proto::ckptmgr::GetInstanceStateResponse> _response,
NetworkErrorCode _status) {
int32_t* nattempts = static_cast<int32_t*>(_ctx);
if (_status != OK) {
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.h b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
index ad3871a..6046221 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
@@ -55,10 +55,10 @@
void GetInstanceState(const proto::system::Instance& _instance,
const std::string& _checkpoint_id, int32_t* _nattempts);
virtual void HandleSaveInstanceStateResponse(void*,
- unique_ptr<proto::ckptmgr::SaveInstanceStateResponse> _response,
+ pool_unique_ptr<proto::ckptmgr::SaveInstanceStateResponse> _response,
NetworkErrorCode status);
virtual void HandleGetInstanceStateResponse(void*,
- unique_ptr<proto::ckptmgr::GetInstanceStateResponse> _response,
+ pool_unique_ptr<proto::ckptmgr::GetInstanceStateResponse> _response,
NetworkErrorCode status);
virtual void HandleConnect(NetworkErrorCode status);
virtual void HandleClose(NetworkErrorCode status);
@@ -66,7 +66,7 @@
private:
void HandleRegisterStMgrResponse(
void *,
- unique_ptr<proto::ckptmgr::RegisterStMgrResponse >_response,
+ pool_unique_ptr<proto::ckptmgr::RegisterStMgrResponse >_response,
NetworkErrorCode);
void SendRegisterRequest();
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index b2ef29b..6c0f973 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -287,7 +287,7 @@
}
void InstanceServer::HandleRegisterInstanceRequest(REQID _reqid, Connection* _conn,
- unique_ptr<proto::stmgr::RegisterInstanceRequest> _request) {
+ pool_unique_ptr<proto::stmgr::RegisterInstanceRequest> _request) {
LOG(INFO) << "Got HandleRegisterInstanceRequest from connection " << _conn << " and instance "
<< _request->instance().instance_id();
// Do some basic checks
@@ -377,7 +377,7 @@
}
void InstanceServer::HandleTupleSetMessage(Connection* _conn,
- unique_ptr<proto::system::HeronTupleSet> _message) {
+ pool_unique_ptr<proto::system::HeronTupleSet> _message) {
auto iter = active_instances_.find(_conn);
if (iter == active_instances_.end()) {
LOG(ERROR) << "Received TupleSet from unknown instance connection. Dropping..";
@@ -398,7 +398,7 @@
std::move(_message));
}
-void InstanceServer::SendToInstance2(unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+void InstanceServer::SendToInstance2(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
sp_string instance_id = task_id_to_name[_message->task_id()];
ConnectionBufferLengthMetricMap::const_iterator it =
connection_buffer_length_metric_map_.find(instance_id);
@@ -469,7 +469,7 @@
}
void InstanceServer::DrainCheckpoint(sp_int32 _task_id,
- unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint> _message) {
+ pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint> _message) {
TaskIdInstanceDataMap::iterator iter = instance_info_.find(_task_id);
if (iter == instance_info_.end() || iter->second->conn_ == NULL) {
LOG(ERROR) << "task_id " << _task_id << " has not yet connected to us. Dropping...";
@@ -647,7 +647,7 @@
}
void InstanceServer::HandleStoreInstanceStateCheckpointMessage(Connection* _conn,
- unique_ptr<proto::ckptmgr::StoreInstanceStateCheckpoint> _message) {
+ pool_unique_ptr<proto::ckptmgr::StoreInstanceStateCheckpoint> _message) {
ConnectionTaskIdMap::iterator iter = active_instances_.find(_conn);
if (iter == active_instances_.end()) {
LOG(ERROR) << "Hmm.. Got InstaceStateCheckpoint Message from an unknown connection";
@@ -667,7 +667,7 @@
}
void InstanceServer::HandleRestoreInstanceStateResponse(Connection* _conn,
- unique_ptr<proto::ckptmgr::RestoreInstanceStateResponse> _message) {
+ pool_unique_ptr<proto::ckptmgr::RestoreInstanceStateResponse> _message) {
ConnectionTaskIdMap::iterator iter = active_instances_.find(_conn);
if (iter == active_instances_.end()) {
LOG(ERROR) << "Hmm.. Got RestoreInstanceStateResponse Message from an unknown connection";
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index e7037fb..df4b5e9 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -63,7 +63,7 @@
// We own the message
void SendToInstance2(sp_int32 _task_id, proto::system::HeronTupleSet2* _message);
- void SendToInstance2(unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+ void SendToInstance2(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
// When we get a checkpoint marker from _src_task_id destined for _destination_task_id
// this function in invoked, so that we might register it in gateway
@@ -107,7 +107,7 @@
void DrainTupleSet(sp_int32 _task_id, proto::system::HeronTupleSet2* _message);
void DrainTupleStream(proto::stmgr::TupleStreamMessage* _message);
void DrainCheckpoint(sp_int32 _task_id,
- unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint> _message);
+ pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint> _message);
sp_string MakeBackPressureCompIdMetricName(const sp_string& instanceid);
sp_string MakeQueueSizeCompIdMetricName(const sp_string& instanceid);
sp_string MakeQueueLengthCompIdMetricName(const sp_string& instanceid);
@@ -118,12 +118,13 @@
// Next from local instances
void HandleRegisterInstanceRequest(REQID _id, Connection* _conn,
- unique_ptr<proto::stmgr::RegisterInstanceRequest> _request);
- void HandleTupleSetMessage(Connection* _conn, unique_ptr<proto::system::HeronTupleSet> _message);
+ pool_unique_ptr<proto::stmgr::RegisterInstanceRequest> _request);
+ void HandleTupleSetMessage(Connection* _conn,
+ pool_unique_ptr<proto::system::HeronTupleSet> _message);
void HandleStoreInstanceStateCheckpointMessage(Connection* _conn,
- unique_ptr<proto::ckptmgr::StoreInstanceStateCheckpoint> _message);
+ pool_unique_ptr<proto::ckptmgr::StoreInstanceStateCheckpoint> _message);
void HandleRestoreInstanceStateResponse(Connection* _conn,
- unique_ptr<proto::ckptmgr::RestoreInstanceStateResponse> _message);
+ pool_unique_ptr<proto::ckptmgr::RestoreInstanceStateResponse> _message);
// Back pressure related connection callbacks
// Do back pressure
diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
index 9186dc3..0772bb9 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
@@ -143,9 +143,9 @@
}
void StMgrClient::HandleHelloResponse(
- void*,
- unique_ptr<proto::stmgr::StrMgrHelloResponse> _response,
- NetworkErrorCode _status) {
+ void*,
+ pool_unique_ptr<proto::stmgr::StrMgrHelloResponse> _response,
+ NetworkErrorCode _status) {
if (_status != OK) {
LOG(ERROR) << "NonOK network code " << _status << " for register response from stmgr "
<< other_stmgr_id_ << " running at " << get_clientoptions().get_host() << ":"
@@ -239,7 +239,8 @@
return retval;
}
-void StMgrClient::HandleTupleStreamMessage(unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+void StMgrClient::HandleTupleStreamMessage(
+ pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
LOG(FATAL) << "We should not receive tuple messages in the client" << std::endl;
}
diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.h b/heron/stmgr/src/cpp/manager/stmgr-client.h
index acaa64b..d3cba96 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.h
@@ -65,9 +65,9 @@
private:
void HandleHelloResponse(
void*,
- unique_ptr<proto::stmgr::StrMgrHelloResponse> _response,
+ pool_unique_ptr<proto::stmgr::StrMgrHelloResponse> _response,
NetworkErrorCode);
- void HandleTupleStreamMessage(unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+ void HandleTupleStreamMessage(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
void OnReConnectTimer();
void SendHelloRequest();
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 4b52c89..12e573e 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -125,7 +125,7 @@
}
void StMgrServer::HandleStMgrHelloRequest(REQID _id, Connection* _conn,
- unique_ptr<proto::stmgr::StrMgrHelloRequest> _request) {
+ pool_unique_ptr<proto::stmgr::StrMgrHelloRequest> _request) {
LOG(INFO) << "Got a hello message from stmgr " << _request->stmgr() << " on connection " << _conn;
proto::stmgr::StrMgrHelloResponse response;
// Some basic checks
@@ -155,7 +155,7 @@
}
void StMgrServer::HandleTupleStreamMessage(Connection* _conn,
- unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+ pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
auto iter = rstmgrs_.find(_conn);
if (iter == rstmgrs_.end()) {
LOG(INFO) << "Recieved Tuple messages from unknown streammanager connection";
@@ -204,7 +204,7 @@
}
void StMgrServer::HandleStartBackPressureMessage(Connection* _conn,
- unique_ptr<proto::stmgr::StartBackPressureMessage> _message) {
+ pool_unique_ptr<proto::stmgr::StartBackPressureMessage> _message) {
// Close spouts
LOG(INFO) << "Received start back pressure from str mgr " << _message->stmgr();
if (_message->topology_name() != topology_name_ || _message->topology_id() != topology_id_) {
@@ -222,7 +222,7 @@
}
void StMgrServer::HandleStopBackPressureMessage(Connection* _conn,
- unique_ptr<proto::stmgr::StopBackPressureMessage> _message) {
+ pool_unique_ptr<proto::stmgr::StopBackPressureMessage> _message) {
LOG(INFO) << "Received stop back pressure from str mgr " << _message->stmgr();
if (_message->topology_name() != topology_name_ || _message->topology_id() != topology_id_) {
LOG(ERROR) << "Received stop back pressure message from unknown stream manager "
@@ -246,7 +246,7 @@
}
void StMgrServer::HandleDownstreamStatefulCheckpointMessage(Connection* _conn,
- unique_ptr<proto::ckptmgr::DownstreamStatefulCheckpoint> _message) {
+ pool_unique_ptr<proto::ckptmgr::DownstreamStatefulCheckpoint> _message) {
stmgr_->HandleDownStreamStatefulCheckpoint(*_message);
}
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 13b934e..325f0c1 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -70,19 +70,19 @@
// First from other stream managers
void HandleStMgrHelloRequest(REQID _id, Connection* _conn,
- unique_ptr<proto::stmgr::StrMgrHelloRequest> _request);
+ pool_unique_ptr<proto::stmgr::StrMgrHelloRequest> _request);
void HandleTupleStreamMessage(Connection* _conn,
- unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+ pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
// Handler for DownstreamStatefulCheckpoint from a peer stmgr
void HandleDownstreamStatefulCheckpointMessage(Connection* _conn,
- unique_ptr<proto::ckptmgr::DownstreamStatefulCheckpoint> _message);
+ pool_unique_ptr<proto::ckptmgr::DownstreamStatefulCheckpoint> _message);
// Backpressure message from and to other stream managers
void HandleStartBackPressureMessage(Connection* _conn,
- unique_ptr<proto::stmgr::StartBackPressureMessage> _message);
+ pool_unique_ptr<proto::stmgr::StartBackPressureMessage> _message);
void HandleStopBackPressureMessage(Connection* _conn,
- unique_ptr<proto::stmgr::StopBackPressureMessage> _message);
+ pool_unique_ptr<proto::stmgr::StopBackPressureMessage> _message);
// map from stmgr_id to their connection
typedef std::unordered_map<sp_string, Connection*> StreamManagerConnectionMap;
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 056c4ec..6510271 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -675,7 +675,7 @@
const shared_ptr<proto::system::PhysicalPlan> StMgr::GetPhysicalPlan() const { return pplan_; }
void StMgr::HandleStreamManagerData(const sp_string&,
- unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
+ pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
if (stateful_restorer_ && stateful_restorer_->InProgress()) {
LOG(INFO) << "Dropping data received from stmgr because we are in Restore";
dropped_during_restore_metrics_->scope(RESTORE_DROPPED_STMGR_BYTES)
@@ -769,7 +769,7 @@
// Called when local tasks generate data
void StMgr::HandleInstanceData(const sp_int32 _src_task_id, bool _local_spout,
- unique_ptr<proto::system::HeronTupleSet> _message) {
+ pool_unique_ptr<proto::system::HeronTupleSet> _message) {
instance_bytes_received_metrics_->scope(std::to_string(_src_task_id))
->incr_by(_message->ByteSize());
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index c158792..decdc39 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -79,9 +79,9 @@
// Called by tmaster client when a new physical plan is available
void NewPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> pplan);
void HandleStreamManagerData(const sp_string& _stmgr_id,
- unique_ptr<proto::stmgr::TupleStreamMessage> _message);
+ pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message);
void HandleInstanceData(sp_int32 _task_id, bool _local_spout,
- unique_ptr<proto::system::HeronTupleSet> _message);
+ pool_unique_ptr<proto::system::HeronTupleSet> _message);
// Called when an instance does checkpoint and sends its checkpoint
// to the stmgr to save it
void HandleStoreInstanceStateCheckpoint(
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.cpp b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
index bec20d7..f1e27d8 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.cpp
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
@@ -146,9 +146,9 @@
}
void TMasterClient::HandleRegisterResponse(
- void*,
- unique_ptr<proto::tmaster::StMgrRegisterResponse> _response,
- NetworkErrorCode _status) {
+ void*,
+ pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> _response,
+ NetworkErrorCode _status) {
if (_status != OK) {
LOG(ERROR) << "non ok network stack code for Register Response from Tmaster" << std::endl;
Stop();
@@ -173,9 +173,9 @@
}
void TMasterClient::HandleHeartbeatResponse(
- void*,
- unique_ptr<proto::tmaster::StMgrHeartbeatResponse> _response,
- NetworkErrorCode _status) {
+ void*,
+ pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> _response,
+ NetworkErrorCode _status) {
if (_status != OK) {
LOG(ERROR) << "NonOK response message for heartbeat Response" << std::endl;
Stop();
@@ -196,13 +196,13 @@
}
void TMasterClient::HandleNewAssignmentMessage(
- unique_ptr<proto::stmgr::NewPhysicalPlanMessage> _message) {
+ pool_unique_ptr<proto::stmgr::NewPhysicalPlanMessage> _message) {
LOG(INFO) << "Got a new assignment" << std::endl;
pplan_watch_(shared_ptr<proto::system::PhysicalPlan>(_message->release_new_pplan()));
}
void TMasterClient::HandleStatefulCheckpointMessage(
- unique_ptr<proto::ckptmgr::StartStatefulCheckpoint> _message) {
+ pool_unique_ptr<proto::ckptmgr::StartStatefulCheckpoint> _message) {
LOG(INFO) << "Got a new start stateful checkpoint message from tmaster with id "
<< _message->checkpoint_id();
stateful_checkpoint_watch_(_message->checkpoint_id());
@@ -294,12 +294,12 @@
}
void TMasterClient::HandleRestoreTopologyStateRequest(
- unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _message) {
+ pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _message) {
restore_topology_watch_(_message->checkpoint_id(), _message->restore_txid());
}
void TMasterClient::HandleStartStmgrStatefulProcessing(
- unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _message) {
+ pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _message) {
start_stateful_watch_(_message->checkpoint_id());
}
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.h b/heron/stmgr/src/cpp/manager/tmaster-client.h
index 3d25395..ec83406 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.h
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.h
@@ -73,18 +73,20 @@
virtual void HandleClose(NetworkErrorCode status);
private:
- void HandleRegisterResponse(void*, unique_ptr<proto::tmaster::StMgrRegisterResponse> _response,
+ void HandleRegisterResponse(void*,
+ pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> _response,
NetworkErrorCode);
- void HandleHeartbeatResponse(void*, unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
+ void HandleHeartbeatResponse(void*,
+ pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
NetworkErrorCode);
- void HandleNewAssignmentMessage(unique_ptr<proto::stmgr::NewPhysicalPlanMessage> _message);
+ void HandleNewAssignmentMessage(pool_unique_ptr<proto::stmgr::NewPhysicalPlanMessage> _message);
void HandleStatefulCheckpointMessage(
- unique_ptr<proto::ckptmgr::StartStatefulCheckpoint> _message);
+ pool_unique_ptr<proto::ckptmgr::StartStatefulCheckpoint> _message);
void HandleRestoreTopologyStateRequest(
- unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _message);
+ pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _message);
void HandleStartStmgrStatefulProcessing(
- unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _msg);
+ pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _msg);
void OnReConnectTimer();
void OnHeartbeatTimer();
diff --git a/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp b/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
index 9f058fb..51d8efd 100644
--- a/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
@@ -212,7 +212,7 @@
}
void drainer3(sp_int32 _task_id,
- std::unique_ptr<heron::proto::ckptmgr::InitiateStatefulCheckpoint> _ckpt) {
+ pool_unique_ptr<heron::proto::ckptmgr::InitiateStatefulCheckpoint> _ckpt) {
drainer3_markers.push_back(_task_id);
}
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.cpp b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
index 75bf608..5a61900 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
@@ -76,9 +76,9 @@
}
void DummyInstance::HandleInstanceResponse(
- void*,
- unique_ptr<heron::proto::stmgr::RegisterInstanceResponse> _message,
- NetworkErrorCode status) {
+ void*,
+ pool_unique_ptr<heron::proto::stmgr::RegisterInstanceResponse> _message,
+ NetworkErrorCode status) {
CHECK_EQ(status, OK);
if (_message->has_pplan()) {
if (recvd_stmgr_pplan_) {
@@ -92,10 +92,10 @@
register_response_status = _message->status().status();
}
-void DummyInstance::HandleTupleMessage(unique_ptr<heron::proto::system::HeronTupleSet2>) {}
+void DummyInstance::HandleTupleMessage(pool_unique_ptr<heron::proto::system::HeronTupleSet2>) {}
void DummyInstance::HandleNewInstanceAssignmentMsg(
- std::unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage>) {}
+ pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage>) {}
void DummyInstance::CreateAndSendInstanceRequest() {
auto request = make_unique<heron::proto::stmgr::RegisterInstanceRequest>();
@@ -130,8 +130,7 @@
under_backpressure_(false) {}
void DummySpoutInstance::HandleNewInstanceAssignmentMsg(
- unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg) {
-
+ pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg) {
const heron::proto::system::PhysicalPlan pplan = _msg->pplan();
DummyInstance::HandleNewInstanceAssignmentMsg(std::move(_msg));
@@ -188,13 +187,14 @@
expected_msgs_to_recv_(_expected_msgs_to_recv),
msgs_recvd_(0) {}
-void DummyBoltInstance::HandleTupleMessage(unique_ptr<heron::proto::system::HeronTupleSet2> msg) {
+void DummyBoltInstance::HandleTupleMessage(
+ pool_unique_ptr<heron::proto::system::HeronTupleSet2> msg) {
if (msg->has_data()) msgs_recvd_ += msg->mutable_data()->tuples_size();
if (msgs_recvd_ >= expected_msgs_to_recv_) getEventLoop()->loopExit();
}
void DummyBoltInstance::HandleNewInstanceAssignmentMsg(
- unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg) {
+ pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg) {
DummyInstance::HandleNewInstanceAssignmentMsg(std::move(_msg));
if (expected_msgs_to_recv_ == 0) {
getEventLoop()->loopExit();
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.h b/heron/stmgr/tests/cpp/server/dummy_instance.h
index e603da6..f06a1b4 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.h
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.h
@@ -40,14 +40,14 @@
// Handle incoming message
virtual void HandleInstanceResponse(
- void* ctx,
- unique_ptr<heron::proto::stmgr::RegisterInstanceResponse> _message,
- NetworkErrorCode status);
+ void* ctx,
+ pool_unique_ptr<heron::proto::stmgr::RegisterInstanceResponse> _message,
+ NetworkErrorCode status);
// Handle incoming tuples
- virtual void HandleTupleMessage(unique_ptr<heron::proto::system::HeronTupleSet2> _message);
+ virtual void HandleTupleMessage(pool_unique_ptr<heron::proto::system::HeronTupleSet2> _message);
// Handle the instance assignment message
virtual void HandleNewInstanceAssignmentMsg(
- std::unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage>);
+ pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage>);
sp_string topology_name_;
sp_string topology_id_;
@@ -82,7 +82,7 @@
protected:
// Handle incoming message
virtual void HandleNewInstanceAssignmentMsg(
- unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg);
+ pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg);
void CreateAndSendTupleMessages();
virtual void StartBackPressureConnectionCb(Connection* connection) {
under_backpressure_ = true;
@@ -115,9 +115,9 @@
protected:
// Handle incoming message
// Handle incoming tuples
- virtual void HandleTupleMessage(unique_ptr<heron::proto::system::HeronTupleSet2> _message);
+ virtual void HandleTupleMessage(pool_unique_ptr<heron::proto::system::HeronTupleSet2> _message);
virtual void HandleNewInstanceAssignmentMsg(
- unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg);
+ pool_unique_ptr<heron::proto::stmgr::NewInstanceAssignmentMessage> _msg);
private:
sp_int32 expected_msgs_to_recv_;
diff --git a/heron/stmgr/tests/cpp/server/dummy_metricsmgr.cpp b/heron/stmgr/tests/cpp/server/dummy_metricsmgr.cpp
index 29ccc75..4bd2b50 100644
--- a/heron/stmgr/tests/cpp/server/dummy_metricsmgr.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_metricsmgr.cpp
@@ -57,7 +57,7 @@
}
void DummyMtrMgr::HandleMetricPublisherRegisterRequest(REQID id, Connection* conn,
- unique_ptr<heron::proto::system::MetricPublisherRegisterRequest> request) {
+ pool_unique_ptr<heron::proto::system::MetricPublisherRegisterRequest> request) {
LOG(INFO) << "Got a register request ";
heron::proto::system::MetricPublisherRegisterResponse response;
response.mutable_status()->set_status(heron::proto::system::OK);
@@ -65,10 +65,10 @@
}
void DummyMtrMgr::HandleMetricPublisherPublishMessage(
- Connection*, unique_ptr<heron::proto::system::MetricPublisherPublishMessage> message) {}
+ Connection*, pool_unique_ptr<heron::proto::system::MetricPublisherPublishMessage> message) {}
void DummyMtrMgr::HandleTMasterLocationMessage(
- Connection*, unique_ptr<heron::proto::system::TMasterLocationRefreshMessage> message) {
+ Connection*, pool_unique_ptr<heron::proto::system::TMasterLocationRefreshMessage> message) {
location_ = message->release_tmaster();
LOG(INFO) << "Got tmaster location: " << location_->host() << ":" << location_->master_port();
diff --git a/heron/stmgr/tests/cpp/server/dummy_metricsmgr.h b/heron/stmgr/tests/cpp/server/dummy_metricsmgr.h
index 3b773f6..17710ef 100644
--- a/heron/stmgr/tests/cpp/server/dummy_metricsmgr.h
+++ b/heron/stmgr/tests/cpp/server/dummy_metricsmgr.h
@@ -49,11 +49,12 @@
// Handle metrics publisher request
virtual void HandleMetricPublisherRegisterRequest(REQID _id, Connection* _conn,
- unique_ptr<heron::proto::system::MetricPublisherRegisterRequest> _request);
+ pool_unique_ptr<heron::proto::system::MetricPublisherRegisterRequest> _request);
virtual void HandleMetricPublisherPublishMessage(
- Connection* _conn, unique_ptr<heron::proto::system::MetricPublisherPublishMessage> _message);
+ Connection* _conn,
+ pool_unique_ptr<heron::proto::system::MetricPublisherPublishMessage> _message);
virtual void HandleTMasterLocationMessage(
- Connection*, unique_ptr<heron::proto::system::TMasterLocationRefreshMessage> _message);
+ Connection*, pool_unique_ptr<heron::proto::system::TMasterLocationRefreshMessage> _message);
private:
sp_string stmgr_id_expected_;
diff --git a/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp b/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
index d4072a4..5f94fa4 100644
--- a/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
@@ -48,7 +48,9 @@
DummyTMasterClient::~DummyTMasterClient() {}
void DummyTMasterClient::HandleRegisterResponse(
- void*, unique_ptr<heron::proto::tmaster::StMgrRegisterResponse> response, NetworkErrorCode) {
+ void*,
+ pool_unique_ptr<heron::proto::tmaster::StMgrRegisterResponse> response,
+ NetworkErrorCode) {
}
void DummyTMasterClient::HandleConnect(NetworkErrorCode _status) {
@@ -119,7 +121,7 @@
void DummyStMgr::HandleConnectionClose(Connection*, NetworkErrorCode) {}
void DummyStMgr::HandleStMgrHelloRequest(REQID _id, Connection* _conn,
- unique_ptr<heron::proto::stmgr::StrMgrHelloRequest> _request) {
+ pool_unique_ptr<heron::proto::stmgr::StrMgrHelloRequest> _request) {
other_stmgrs_ids_.push_back(_request->stmgr());
heron::proto::stmgr::StrMgrHelloResponse response;
response.mutable_status()->set_status(heron::proto::system::OK);
@@ -127,11 +129,11 @@
}
void DummyStMgr::HandleStartBackPressureMessage(Connection*,
- unique_ptr<heron::proto::stmgr::StartBackPressureMessage>) {
+ pool_unique_ptr<heron::proto::stmgr::StartBackPressureMessage>) {
++num_start_bp_;
}
void DummyStMgr::HandleStopBackPressureMessage(Connection*,
- unique_ptr<heron::proto::stmgr::StopBackPressureMessage>) {
+ pool_unique_ptr<heron::proto::stmgr::StopBackPressureMessage>) {
++num_stop_bp_;
}
diff --git a/heron/stmgr/tests/cpp/server/dummy_stmgr.h b/heron/stmgr/tests/cpp/server/dummy_stmgr.h
index 5170139..bd0342b 100644
--- a/heron/stmgr/tests/cpp/server/dummy_stmgr.h
+++ b/heron/stmgr/tests/cpp/server/dummy_stmgr.h
@@ -42,9 +42,9 @@
// Handle connection close
virtual void HandleClose(NetworkErrorCode _status);
virtual void HandleRegisterResponse(
- void*,
- unique_ptr<heron::proto::tmaster::StMgrRegisterResponse> response,
- NetworkErrorCode);
+ void*,
+ pool_unique_ptr<heron::proto::tmaster::StMgrRegisterResponse> response,
+ NetworkErrorCode);
// Send worker request
void CreateAndSendRegisterRequest();
@@ -80,11 +80,11 @@
// Handle st mgr hello message
virtual void HandleStMgrHelloRequest(REQID _id, Connection* _conn,
- unique_ptr<heron::proto::stmgr::StrMgrHelloRequest> _request);
+ pool_unique_ptr<heron::proto::stmgr::StrMgrHelloRequest> _request);
virtual void HandleStartBackPressureMessage(Connection*,
- unique_ptr<heron::proto::stmgr::StartBackPressureMessage>);
+ pool_unique_ptr<heron::proto::stmgr::StartBackPressureMessage>);
virtual void HandleStopBackPressureMessage(Connection*,
- unique_ptr<heron::proto::stmgr::StopBackPressureMessage>);
+ pool_unique_ptr<heron::proto::stmgr::StopBackPressureMessage>);
private:
std::vector<sp_string> other_stmgrs_ids_;
diff --git a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp b/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
index e365cb6..eb4ac40 100644
--- a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
@@ -99,9 +99,9 @@
}
void CkptMgrClient::HandleTMasterRegisterResponse(
- void*,
- unique_ptr<proto::ckptmgr::RegisterTMasterResponse> _response,
- NetworkErrorCode _status) {
+ void*,
+ pool_unique_ptr<proto::ckptmgr::RegisterTMasterResponse> _response,
+ NetworkErrorCode _status) {
if (_status != OK) {
LOG(ERROR) << "NonOK network code" << _status << " for register response from ckptmgr "
<< "running at " << get_clientoptions().get_host() << ":"
@@ -145,9 +145,9 @@
}
void CkptMgrClient::HandleCleanStatefulCheckpointResponse(
- void*,
- unique_ptr<proto::ckptmgr::CleanStatefulCheckpointResponse> _response,
- NetworkErrorCode status) {
+ void*,
+ pool_unique_ptr<proto::ckptmgr::CleanStatefulCheckpointResponse> _response,
+ NetworkErrorCode status) {
LOG(INFO) << "Got CleanStatefulCheckpoint response from ckptmgr" << std::endl;
proto::system::StatusCode code = proto::system::OK;
diff --git a/heron/tmaster/src/cpp/manager/ckptmgr-client.h b/heron/tmaster/src/cpp/manager/ckptmgr-client.h
index 7902f43..8c187aa 100644
--- a/heron/tmaster/src/cpp/manager/ckptmgr-client.h
+++ b/heron/tmaster/src/cpp/manager/ckptmgr-client.h
@@ -42,17 +42,17 @@
protected:
virtual void HandleCleanStatefulCheckpointResponse(
- void*,
- unique_ptr<proto::ckptmgr::CleanStatefulCheckpointResponse> _response,
- NetworkErrorCode status);
+ void*,
+ pool_unique_ptr<proto::ckptmgr::CleanStatefulCheckpointResponse> _response,
+ NetworkErrorCode status);
virtual void HandleConnect(NetworkErrorCode status);
virtual void HandleClose(NetworkErrorCode status);
private:
void HandleTMasterRegisterResponse(
- void*,
- unique_ptr<proto::ckptmgr::RegisterTMasterResponse>_response,
- NetworkErrorCode _status);
+ void*,
+ pool_unique_ptr<proto::ckptmgr::RegisterTMasterResponse>_response,
+ NetworkErrorCode _status);
void SendRegisterRequest();
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.cpp b/heron/tmaster/src/cpp/manager/tmasterserver.cpp
index f35be46..82096dd 100644
--- a/heron/tmaster/src/cpp/manager/tmasterserver.cpp
+++ b/heron/tmaster/src/cpp/manager/tmasterserver.cpp
@@ -67,38 +67,38 @@
}
void TMasterServer::HandleStMgrRegisterRequest(REQID _reqid, Connection* _conn,
- unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
+ pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
unique_ptr<StMgrRegisterProcessor> processor =
make_unique<StMgrRegisterProcessor>(_reqid, _conn, std::move(_request), tmaster_, this);
processor->Start();
}
void TMasterServer::HandleStMgrHeartbeatRequest(REQID _reqid, Connection* _conn,
- unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request) {
+ pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request) {
unique_ptr<StMgrHeartbeatProcessor> processor =
make_unique<StMgrHeartbeatProcessor>(_reqid, _conn, std::move(_request), tmaster_, this);
processor->Start();
}
void TMasterServer::HandleMetricsMgrStats(Connection*,
- unique_ptr<proto::tmaster::PublishMetrics> _request) {
+ pool_unique_ptr<proto::tmaster::PublishMetrics> _request) {
collector_->AddMetric(*_request);
}
void TMasterServer::HandleInstanceStateStored(Connection*,
- unique_ptr<proto::ckptmgr::InstanceStateStored> _message) {
+ pool_unique_ptr<proto::ckptmgr::InstanceStateStored> _message) {
tmaster_->HandleInstanceStateStored(_message->checkpoint_id(), _message->instance());
}
void TMasterServer::HandleRestoreTopologyStateResponse(Connection* _conn,
- unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message) {
+ pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message) {
tmaster_->HandleRestoreTopologyStateResponse(_conn, _message->checkpoint_id(),
_message->restore_txid(),
_message->status().status());
}
void TMasterServer::HandleResetTopologyStateMessage(Connection* _conn,
- unique_ptr<proto::ckptmgr::ResetTopologyState> _message) {
+ pool_unique_ptr<proto::ckptmgr::ResetTopologyState> _message) {
tmaster_->ResetTopologyState(_conn, _message->dead_stmgr(),
_message->dead_taskid(), _message->reason());
}
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.h b/heron/tmaster/src/cpp/manager/tmasterserver.h
index eb20c07..0524b61 100644
--- a/heron/tmaster/src/cpp/manager/tmasterserver.h
+++ b/heron/tmaster/src/cpp/manager/tmasterserver.h
@@ -47,22 +47,22 @@
private:
// Various handlers for different requests
void HandleStMgrRegisterRequest(REQID _id, Connection* _conn,
- unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
+ pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
void HandleStMgrHeartbeatRequest(REQID _id, Connection* _conn,
- unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request);
- void HandleMetricsMgrStats(Connection*, unique_ptr<proto::tmaster::PublishMetrics> _request);
+ pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request);
+ void HandleMetricsMgrStats(Connection*, pool_unique_ptr<proto::tmaster::PublishMetrics> _request);
// Message sent by stmgr to tell tmaster that a particular checkpoint message
// was saved. This way the tmaster can keep track of which all instances have saved their
// state for any given checkpoint id.
void HandleInstanceStateStored(Connection*,
- unique_ptr<proto::ckptmgr::InstanceStateStored> _message);
+ pool_unique_ptr<proto::ckptmgr::InstanceStateStored> _message);
// Handle response from stmgr for the RestoreTopologyStateRequest
void HandleRestoreTopologyStateResponse(Connection*,
- unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message);
+ pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message);
// Stmgr can request tmaster to reset the state of the topology in case it finds any errors.
void HandleResetTopologyStateMessage(Connection*,
- unique_ptr<proto::ckptmgr::ResetTopologyState> _message);
+ pool_unique_ptr<proto::ckptmgr::ResetTopologyState> _message);
// our tmaster
shared_ptr<TMetricsCollector> collector_;
diff --git a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
index 222f9e4..1c98d70 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
@@ -31,8 +31,8 @@
namespace tmaster {
StMgrHeartbeatProcessor::StMgrHeartbeatProcessor(REQID reqid, Connection* conn,
- unique_ptr<proto::tmaster::StMgrHeartbeatRequest> request,
- TMaster* tmaster, Server* server)
+ pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> request,
+ TMaster* tmaster, Server* server)
: Processor(reqid, conn, std::move(request), tmaster, server) {}
StMgrHeartbeatProcessor::~StMgrHeartbeatProcessor() {
diff --git a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h
index 6775263..6240bda 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h
+++ b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h
@@ -31,7 +31,7 @@
class StMgrHeartbeatProcessor : public Processor {
public:
StMgrHeartbeatProcessor(REQID _reqid, Connection* _conn,
- unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request,
+ pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request,
TMaster* _tmaster,
Server* _server);
virtual ~StMgrHeartbeatProcessor();
diff --git a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp b/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
index 3139703..c58dda8 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
@@ -32,8 +32,8 @@
namespace tmaster {
StMgrRegisterProcessor::StMgrRegisterProcessor(REQID _reqid, Connection* _conn,
- unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
- TMaster* _tmaster, Server* _server)
+ pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
+ TMaster* _tmaster, Server* _server)
: Processor(_reqid, _conn, std::move(_request), _tmaster, _server) {}
StMgrRegisterProcessor::~StMgrRegisterProcessor() {
diff --git a/heron/tmaster/src/cpp/processor/stmgr-register-processor.h b/heron/tmaster/src/cpp/processor/stmgr-register-processor.h
index f197377..208c09f 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-register-processor.h
+++ b/heron/tmaster/src/cpp/processor/stmgr-register-processor.h
@@ -31,7 +31,7 @@
class StMgrRegisterProcessor : public Processor {
public:
StMgrRegisterProcessor(REQID _reqid, Connection* _conn,
- unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
+ pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
TMaster* _tmaster,
Server* _server);
virtual ~StMgrRegisterProcessor();
diff --git a/heron/tmaster/src/cpp/processor/tmaster-processor.cpp b/heron/tmaster/src/cpp/processor/tmaster-processor.cpp
index 1df63ae..0caaa8b 100644
--- a/heron/tmaster/src/cpp/processor/tmaster-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/tmaster-processor.cpp
@@ -29,7 +29,7 @@
namespace tmaster {
Processor::Processor(REQID _reqid, Connection* _conn,
- unique_ptr<google::protobuf::Message> _request,
+ pool_unique_ptr<google::protobuf::Message> _request,
TMaster* _tmaster, Server* _server)
: request_(std::move(_request)), tmaster_(_tmaster), server_(_server),
reqid_(_reqid), conn_(_conn) {}
diff --git a/heron/tmaster/src/cpp/processor/tmaster-processor.h b/heron/tmaster/src/cpp/processor/tmaster-processor.h
index 5ba2da9..70c7c20 100644
--- a/heron/tmaster/src/cpp/processor/tmaster-processor.h
+++ b/heron/tmaster/src/cpp/processor/tmaster-processor.h
@@ -31,7 +31,7 @@
class Processor {
public:
- Processor(REQID _reqid, Connection* _conn, unique_ptr<google::protobuf::Message> _request,
+ Processor(REQID _reqid, Connection* _conn, pool_unique_ptr<google::protobuf::Message> _request,
TMaster* _tmaster,
Server* _server);
virtual ~Processor();
@@ -41,7 +41,7 @@
void SendResponse(const google::protobuf::Message& _response);
Connection* GetConnection() { return conn_; }
void CloseConnection();
- unique_ptr<google::protobuf::Message> request_;
+ pool_unique_ptr<google::protobuf::Message> request_;
TMaster* tmaster_;
Server* server_;
diff --git a/heron/tmaster/tests/cpp/server/dummystmgr.cpp b/heron/tmaster/tests/cpp/server/dummystmgr.cpp
index c19b33e..13cf22e 100644
--- a/heron/tmaster/tests/cpp/server/dummystmgr.cpp
+++ b/heron/tmaster/tests/cpp/server/dummystmgr.cpp
@@ -76,9 +76,9 @@
}
void DummyStMgr::HandleRegisterResponse(
- void*,
- unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
- NetworkErrorCode status) {
+ void*,
+ pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
+ NetworkErrorCode status) {
if (status != OK) {
LOG(ERROR) << "NonOK response message for Register Response";
Stop();
@@ -106,9 +106,9 @@
}
void DummyStMgr::HandleHeartbeatResponse(
- void*,
- unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
- NetworkErrorCode status) {
+ void*,
+ pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
+ NetworkErrorCode status) {
if (status != OK) {
LOG(ERROR) << "NonOK response message for Register Response";
Stop();
@@ -126,7 +126,7 @@
}
void DummyStMgr::HandleNewAssignmentMessage(
- unique_ptr<proto::stmgr::NewPhysicalPlanMessage> message) {
+ pool_unique_ptr<proto::stmgr::NewPhysicalPlanMessage> message) {
LOG(INFO) << "Got a new assignment";
HandleNewPhysicalPlan(message->new_pplan());
}
@@ -167,12 +167,12 @@
}
void DummyStMgr::HandleRestoreTopologyStateRequest(
- unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _m) {
+ pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> _m) {
got_restore_message_ = true;
}
void DummyStMgr::HandleStartProcessingMessage(
- unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _m) {
+ pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _m) {
got_start_message_ = true;
}
diff --git a/heron/tmaster/tests/cpp/server/dummystmgr.h b/heron/tmaster/tests/cpp/server/dummystmgr.h
index d02a0e9..06df950 100644
--- a/heron/tmaster/tests/cpp/server/dummystmgr.h
+++ b/heron/tmaster/tests/cpp/server/dummystmgr.h
@@ -52,16 +52,18 @@
virtual void HandleClose(NetworkErrorCode status);
private:
- void HandleRegisterResponse(void*, unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
+ void HandleRegisterResponse(void*,
+ pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
NetworkErrorCode);
- void HandleHeartbeatResponse(void*, unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
+ void HandleHeartbeatResponse(void*,
+ pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
NetworkErrorCode);
- void HandleNewAssignmentMessage(unique_ptr<proto::stmgr::NewPhysicalPlanMessage> message);
+ void HandleNewAssignmentMessage(pool_unique_ptr<proto::stmgr::NewPhysicalPlanMessage> message);
void HandleNewPhysicalPlan(const proto::system::PhysicalPlan& pplan);
void HandleRestoreTopologyStateRequest(
- unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> message);
+ pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateRequest> message);
void HandleStartProcessingMessage(
- unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> message);
+ pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> message);
void OnReConnectTimer();
void OnHeartbeatTimer();
diff --git a/heron/tmaster/tests/cpp/server/dummytmaster.cpp b/heron/tmaster/tests/cpp/server/dummytmaster.cpp
index d166d8c..a554998 100644
--- a/heron/tmaster/tests/cpp/server/dummytmaster.cpp
+++ b/heron/tmaster/tests/cpp/server/dummytmaster.cpp
@@ -48,7 +48,7 @@
}
void DummyTMaster::HandleRegisterRequest(REQID _id, Connection* _conn,
- unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
+ pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
std::vector<std::shared_ptr<proto::system::Instance>> instances;
stmgrs_[_request->stmgr().id()] =
std::make_shared<tmaster::StMgrState>(_conn, _request->stmgr(), instances, *this);
diff --git a/heron/tmaster/tests/cpp/server/dummytmaster.h b/heron/tmaster/tests/cpp/server/dummytmaster.h
index 7047a19..c00ade5 100644
--- a/heron/tmaster/tests/cpp/server/dummytmaster.h
+++ b/heron/tmaster/tests/cpp/server/dummytmaster.h
@@ -47,7 +47,7 @@
private:
void HandleRegisterRequest(REQID _id, Connection* _conn,
- unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
+ pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
void HandleHeartbeatRequest(REQID _id, Connection* _conn,
proto::tmaster::StMgrHeartbeatRequest* _request);
tmaster::StMgrMap stmgrs_;