STREAMCOMP-2724: TMaster migrated to mostly use smart pointers instead of manual memory management (#3248)
diff --git a/heron/common/src/cpp/basics/basics.h b/heron/common/src/cpp/basics/basics.h
index bc00b00..766a64f 100644
--- a/heron/common/src/cpp/basics/basics.h
+++ b/heron/common/src/cpp/basics/basics.h
@@ -46,4 +46,13 @@
#include "basics/sptest.h"
#include "basics/mempool.h"
+// The standard std::make_unique(...) was introduced starting from C++ 14. Heron uses C++ 11.
+// Unfortunatelly we can not bump a compiler version used in Heron since we are tight coupled with
+// CentOS 7 whih comes with very old version of GCC (4.8.5). That's why we introduced
+// make_unique(...) manually here.
+template<typename T, typename... Args>
+std::unique_ptr<T> make_unique(Args&&... args) {
+ return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
+}
+
#endif // HERON_BASICS_H_
diff --git a/heron/common/src/cpp/metrics/metrics-mgr-st.cpp b/heron/common/src/cpp/metrics/metrics-mgr-st.cpp
index e79c21f..5da37f1 100644
--- a/heron/common/src/cpp/metrics/metrics-mgr-st.cpp
+++ b/heron/common/src/cpp/metrics/metrics-mgr-st.cpp
@@ -36,6 +36,8 @@
namespace heron {
namespace common {
+using std::shared_ptr;
+
MetricsMgrSt::MetricsMgrSt(sp_int32 _metricsmgr_port, sp_int32 _interval, EventLoop* eventLoop) {
options_.set_host("127.0.0.1");
options_.set_port(_metricsmgr_port);
@@ -52,9 +54,6 @@
MetricsMgrSt::~MetricsMgrSt() {
CHECK_EQ(client_->getEventLoop()->unRegisterTimer(timerid_), 0);
delete client_;
- for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
- delete iter->second;
- }
}
void MetricsMgrSt::Start(const sp_string& _my_hostname, sp_int32 _my_port,
@@ -74,7 +73,7 @@
client_->SendMetricsCacheLocation(location);
}
-void MetricsMgrSt::register_metric(const sp_string& _metric_name, IMetric* _metric) {
+void MetricsMgrSt::register_metric(const sp_string& _metric_name, shared_ptr<IMetric> _metric) {
metrics_[_metric_name] = _metric;
}
diff --git a/heron/common/src/cpp/metrics/metrics-mgr-st.h b/heron/common/src/cpp/metrics/metrics-mgr-st.h
index 4cec4b7..ef15710 100644
--- a/heron/common/src/cpp/metrics/metrics-mgr-st.h
+++ b/heron/common/src/cpp/metrics/metrics-mgr-st.h
@@ -44,6 +44,8 @@
namespace heron {
namespace common {
+using std::shared_ptr;
+
class MetricsMgrClient;
class IMetric;
@@ -52,7 +54,7 @@
MetricsMgrSt(sp_int32 _metricsmgr_port, sp_int32 _interval, EventLoop* eventLoop);
virtual ~MetricsMgrSt();
- void register_metric(const sp_string& _metric_name, IMetric* _metric);
+ void register_metric(const sp_string& _metric_name, shared_ptr<IMetric> _metric);
void unregister_metric(const sp_string& _metric_name);
void RefreshTMasterLocation(const proto::tmaster::TMasterLocation& location);
void RefreshMetricsCacheLocation(const proto::tmaster::MetricsCacheLocation& location);
@@ -72,7 +74,7 @@
void gather_metrics(EventLoop::Status);
VCallback<EventLoop::Status> timer_cb_;
- std::map<sp_string, IMetric*> metrics_;
+ std::map<sp_string, shared_ptr<IMetric>> metrics_;
MetricsMgrClient* client_;
NetworkOptions options_;
sp_int64 timerid_;
diff --git a/heron/common/src/cpp/metrics/metricsmgr-client.cpp b/heron/common/src/cpp/metrics/metricsmgr-client.cpp
index 59e06b2..998c607 100644
--- a/heron/common/src/cpp/metrics/metricsmgr-client.cpp
+++ b/heron/common/src/cpp/metrics/metricsmgr-client.cpp
@@ -43,7 +43,7 @@
tmaster_location_(NULL),
metricscache_location_(NULL),
registered_(false) {
- InstallResponseHandler(new proto::system::MetricPublisherRegisterRequest(),
+ InstallResponseHandler(make_unique<proto::system::MetricPublisherRegisterRequest>(),
&MetricsMgrClient::HandleRegisterResponse);
Start();
}
@@ -69,8 +69,7 @@
void MetricsMgrClient::ReConnect() { Start(); }
void MetricsMgrClient::SendRegisterRequest() {
- proto::system::MetricPublisherRegisterRequest* request;
- request = new proto::system::MetricPublisherRegisterRequest();
+ auto request = make_unique<proto::system::MetricPublisherRegisterRequest>();
proto::system::MetricPublisher* publisher = request->mutable_publisher();
publisher->set_hostname(hostname_);
@@ -79,7 +78,7 @@
publisher->set_instance_id(instance_id_);
publisher->set_instance_index(instance_index_);
- SendRequest(request, NULL);
+ SendRequest(std::move(request), NULL);
}
void MetricsMgrClient::HandleRegisterResponse(
diff --git a/heron/common/src/cpp/network/client.cpp b/heron/common/src/cpp/network/client.cpp
index 1c075f8..c9f88aa 100644
--- a/heron/common/src/cpp/network/client.cpp
+++ b/heron/common/src/cpp/network/client.cpp
@@ -37,12 +37,13 @@
void Client::Stop() { Stop_Base(); }
-void Client::SendRequest(google::protobuf::Message* _request, void* _ctx) {
- SendRequest(_request, _ctx, -1);
+void Client::SendRequest(std::unique_ptr<google::protobuf::Message> _request, void* _ctx) {
+ SendRequest(std::move(_request), _ctx, -1);
}
-void Client::SendRequest(google::protobuf::Message* _request, void* _ctx, sp_int64 _msecs) {
- InternalSendRequest(_request, _ctx, _msecs);
+void Client::SendRequest(std::unique_ptr<google::protobuf::Message> _request, void* _ctx,
+ sp_int64 _msecs) {
+ InternalSendRequest(std::move(_request), _ctx, _msecs);
}
void Client::SendResponse(REQID _id, const google::protobuf::Message& _response) {
@@ -93,12 +94,12 @@
void Client::Init() { message_rid_gen_ = new REQID_Generator(); }
-void Client::InternalSendRequest(google::protobuf::Message* _request, void* _ctx, sp_int64 _msecs) {
+void Client::InternalSendRequest(std::unique_ptr<google::protobuf::Message> _request, void* _ctx,
+ sp_int64 _msecs) {
auto iter = requestResponseMap_.find(_request->GetTypeName());
CHECK(iter != requestResponseMap_.end());
const sp_string& _expected_response_type = iter->second;
if (state_ != CONNECTED) {
- delete _request;
responseHandlers[_expected_response_type](NULL, WRITE_ERROR);
return;
}
@@ -118,9 +119,6 @@
CHECK_EQ(opkt->PackREQID(rid), 0);
CHECK_EQ(opkt->PackProtocolBuffer(*_request, byte_size), 0);
- // delete the request
- delete _request;
-
Connection* conn = static_cast<Connection*>(conn_);
if (conn->sendPacket(opkt) != 0) {
context_map_.erase(rid);
diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h
index 34f4583..593751b 100644
--- a/heron/common/src/cpp/network/client.h
+++ b/heron/common/src/cpp/network/client.h
@@ -104,10 +104,10 @@
// a user owned piece of context that is not interpreted by the
// client which is passed on to the HandleResponse
// A negative value of the msecs means no timeout.
- void SendRequest(google::protobuf::Message* _request, void* ctx, sp_int64 msecs);
+ void SendRequest(std::unique_ptr<google::protobuf::Message> _request, void* ctx, sp_int64 msecs);
// Convinience method of the above function with no timeout
- void SendRequest(google::protobuf::Message* _request, void* ctx);
+ void SendRequest(std::unique_ptr<google::protobuf::Message> _request, void* ctx);
// This interface is used if you want to communicate with the other end
// on a non-request-response based communication.
@@ -129,15 +129,13 @@
// Register a handler for a particular response type
template <typename S, typename T, typename M>
- void InstallResponseHandler(S* _request,
+ void InstallResponseHandler(std::unique_ptr<S> _request,
void (T::*method)(void* _ctx, M*, NetworkErrorCode status)) {
- google::protobuf::Message* m = new M();
+ auto m = make_unique<M>();
T* t = static_cast<T*>(this);
responseHandlers[m->GetTypeName()] = std::bind(&Client::dispatchResponse<T, M>, this, t, method,
std::placeholders::_1, std::placeholders::_2);
requestResponseMap_[_request->GetTypeName()] = m->GetTypeName();
- delete m;
- delete _request;
}
// Register a handler for a particular message type
@@ -197,7 +195,8 @@
//! Handle most of the init stuff
void Init();
- void InternalSendRequest(google::protobuf::Message* _request, void* _ctx, sp_int64 _msecs);
+ void InternalSendRequest(std::unique_ptr<google::protobuf::Message> _request, void* _ctx,
+ sp_int64 _msecs);
void InternalSendMessage(const google::protobuf::Message& _message);
void InternalSendResponse(OutgoingPacket* _packet);
diff --git a/heron/common/src/cpp/network/httpserver.cpp b/heron/common/src/cpp/network/httpserver.cpp
index 7816b38..5f8793b 100644
--- a/heron/common/src/cpp/network/httpserver.cpp
+++ b/heron/common/src/cpp/network/httpserver.cpp
@@ -89,10 +89,9 @@
}
void HTTPServer::SendReply(IncomingHTTPRequest* _request, sp_int32 _code,
- OutgoingHTTPResponse* _response) {
+ unique_ptr<OutgoingHTTPResponse> _response) {
CHECK(_request->underlying_request() == _response->underlying_response());
evhttp_send_reply(_request->underlying_request(), _code, "", NULL);
- delete _response;
}
void HTTPServer::SendErrorReply(IncomingHTTPRequest* _request, sp_int32 _code) {
diff --git a/heron/common/src/cpp/network/httpserver.h b/heron/common/src/cpp/network/httpserver.h
index f310236..5ea4b54 100644
--- a/heron/common/src/cpp/network/httpserver.h
+++ b/heron/common/src/cpp/network/httpserver.h
@@ -37,6 +37,8 @@
class IncomingHTTPRequest;
class OutgoingHTTPResponse;
+using std::unique_ptr;
+
class HTTPServer {
public:
// Constructor
@@ -56,7 +58,7 @@
void InstallGenericCallBack(VCallback<IncomingHTTPRequest*> cb);
void SendReply(IncomingHTTPRequest* _request, sp_int32 status_code,
- OutgoingHTTPResponse* _response);
+ unique_ptr<OutgoingHTTPResponse> _response);
void SendErrorReply(IncomingHTTPRequest* _request, sp_int32 status_code);
void SendErrorReply(IncomingHTTPRequest* _request, sp_int32 status_code,
const sp_string& _reason);
diff --git a/heron/common/tests/cpp/network/http_server_unittest.cpp b/heron/common/tests/cpp/network/http_server_unittest.cpp
index df68a61..cd07712 100644
--- a/heron/common/tests/cpp/network/http_server_unittest.cpp
+++ b/heron/common/tests/cpp/network/http_server_unittest.cpp
@@ -65,9 +65,9 @@
EXPECT_EQ(value.str(), keyvalues[i].second);
}
- OutgoingHTTPResponse* response = new OutgoingHTTPResponse(_request);
+ auto response = make_unique<OutgoingHTTPResponse>(_request);
response->AddResponse("This is response for meta object\r\n");
- server_->SendReply(_request, 200, response);
+ server_->SendReply(_request, 200, std::move(response));
}
void TestHttpServer::HandleGenericRequest(IncomingHTTPRequest* _request) {
diff --git a/heron/instance/src/cpp/gateway/stmgr-client.cpp b/heron/instance/src/cpp/gateway/stmgr-client.cpp
index 9397ea3..9650ebc 100644
--- a/heron/instance/src/cpp/gateway/stmgr-client.cpp
+++ b/heron/instance/src/cpp/gateway/stmgr-client.cpp
@@ -51,7 +51,7 @@
max_reconnect_times_ = config::HeronInternalsConfigReader::Instance()
->GetHeronInstanceReconnectStreammgrTimes();
reconnect_attempts_ = 0;
- InstallResponseHandler(new proto::stmgr::RegisterInstanceRequest(),
+ InstallResponseHandler(make_unique<proto::stmgr::RegisterInstanceRequest>(),
&StMgrClient::HandleRegisterResponse);
InstallMessageHandler(&StMgrClient::HandlePhysicalPlan);
InstallMessageHandler(&StMgrClient::HandleTupleMessage);
@@ -128,11 +128,11 @@
void StMgrClient::OnReconnectTimer() { Start(); }
void StMgrClient::SendRegisterRequest() {
- auto request = new proto::stmgr::RegisterInstanceRequest();
+ auto request = make_unique<proto::stmgr::RegisterInstanceRequest>();
request->set_topology_name(topologyName_);
request->set_topology_id(topologyId_);
request->mutable_instance()->CopyFrom(instanceProto_);
- SendRequest(request, NULL);
+ SendRequest(std::move(request), NULL);
return;
}
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
index 61853c1..906ff59 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.cpp
@@ -200,12 +200,12 @@
}
void HeronLocalFileStateMgr::GetTopology(const std::string& _topology_name,
- proto::api::Topology* _return,
+ proto::api::Topology& _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status = ReadAllFileContents(GetTopologyPath(_topology_name), contents);
if (status == proto::system::OK) {
- if (!_return->ParseFromString(contents)) {
+ if (!_return.ParseFromString(contents)) {
status = proto::system::STATE_CORRUPTED;
}
}
@@ -248,7 +248,7 @@
}
void HeronLocalFileStateMgr::GetPhysicalPlan(const std::string& _topology_name,
- proto::system::PhysicalPlan* _return,
+ shared_ptr<proto::system::PhysicalPlan> _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status =
@@ -273,7 +273,7 @@
}
void HeronLocalFileStateMgr::GetPackingPlan(const std::string& _topology_name,
- proto::system::PackingPlan* _return,
+ shared_ptr<proto::system::PackingPlan> _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status =
@@ -337,7 +337,7 @@
}
void HeronLocalFileStateMgr::CreateStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
VCallback<proto::system::StatusCode> cb) {
std::string fname = GetStatefulCheckpointsPath(_topology_name);
// First check to see if location exists.
@@ -348,7 +348,7 @@
}
std::string contents;
- _ckpt.SerializeToString(&contents);
+ _ckpt->SerializeToString(&contents);
proto::system::StatusCode status = WriteToFile(fname, contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
CHECK_GT(eventLoop_->registerTimer(std::move(wCb), false, 0), 0);
@@ -362,10 +362,10 @@
}
void HeronLocalFileStateMgr::SetStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
- _ckpt.SerializeToString(&contents);
+ _ckpt->SerializeToString(&contents);
proto::system::StatusCode status =
WriteToFile(GetStatefulCheckpointsPath(_topology_name), contents);
auto wCb = [cb, status](EventLoop::Status) { cb(status); };
@@ -373,7 +373,7 @@
}
void HeronLocalFileStateMgr::GetStatefulCheckpoints(const std::string& _topology_name,
- proto::ckptmgr::StatefulConsistentCheckpoints* _return,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
VCallback<proto::system::StatusCode> cb) {
std::string contents;
proto::system::StatusCode status =
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
index 0b8a090..f6bd655 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-localfilestatemgr.h
@@ -67,7 +67,7 @@
void CreateTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
void DeleteTopology(const std::string& _topology_name, VCallback<proto::system::StatusCode> _cb);
void SetTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
- void GetTopology(const std::string& _topology_name, proto::api::Topology* _return,
+ void GetTopology(const std::string& _topology_name, proto::api::Topology& _return,
VCallback<proto::system::StatusCode> _cb);
void CreatePhysicalPlan(const proto::system::PhysicalPlan& _pplan,
@@ -76,13 +76,15 @@
VCallback<proto::system::StatusCode> _cb);
void SetPhysicalPlan(const proto::system::PhysicalPlan& _pplan,
VCallback<proto::system::StatusCode> _cb);
- void GetPhysicalPlan(const std::string& _topology_name, proto::system::PhysicalPlan* _return,
+ void GetPhysicalPlan(const std::string& _topology_name,
+ shared_ptr<proto::system::PhysicalPlan> _return,
VCallback<proto::system::StatusCode> _cb);
void CreatePackingPlan(const std::string& _topology_name,
const proto::system::PackingPlan& _packingPlan,
VCallback<proto::system::StatusCode> _cb);
- void GetPackingPlan(const std::string& _topology_name, proto::system::PackingPlan* _return,
+ void GetPackingPlan(const std::string& _topology_name,
+ shared_ptr<proto::system::PackingPlan> _return,
VCallback<proto::system::StatusCode> _cb);
void CreateExecutionState(const proto::system::ExecutionState& _pplan,
@@ -95,15 +97,15 @@
VCallback<proto::system::StatusCode> _cb);
void CreateStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
- VCallback<proto::system::StatusCode> _cb);
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+ VCallback<proto::system::StatusCode> _cb);
void DeleteStatefulCheckpoints(const std::string& _topology_name,
VCallback<proto::system::StatusCode> _cb);
void GetStatefulCheckpoints(const std::string& _topology_name,
- proto::ckptmgr::StatefulConsistentCheckpoints* _return,
- VCallback<proto::system::StatusCode> _cb);
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
+ VCallback<proto::system::StatusCode> _cb);
void SetStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _state,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _state,
VCallback<proto::system::StatusCode> _cb);
void ListTopologies(std::vector<sp_string>* _return, VCallback<proto::system::StatusCode> _cb);
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
index 016e87e..5201f6c 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.cpp
@@ -32,13 +32,14 @@
namespace heron {
namespace common {
-HeronStateMgr* HeronStateMgr::MakeStateMgr(const std::string& _zk_hostport,
+shared_ptr<HeronStateMgr> HeronStateMgr::MakeStateMgr(const std::string& _zk_hostport,
const std::string& _topleveldir, EventLoop* eventLoop,
bool exitOnSessionExpiry) {
if (_zk_hostport.empty()) {
- return new HeronLocalFileStateMgr(_topleveldir, eventLoop);
+ return std::make_shared<HeronLocalFileStateMgr>(_topleveldir, eventLoop);
} else {
- return new HeronZKStateMgr(_zk_hostport, _topleveldir, eventLoop, exitOnSessionExpiry);
+ return std::make_shared<HeronZKStateMgr>(_zk_hostport, _topleveldir, eventLoop,
+ exitOnSessionExpiry);
}
}
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
index ee05f32..6cad372 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-statemgr.h
@@ -55,15 +55,17 @@
namespace heron {
namespace common {
+using std::shared_ptr;
+
class HeronStateMgr {
public:
explicit HeronStateMgr(const std::string& _topleveldir);
virtual ~HeronStateMgr();
// Factory method to create
- static HeronStateMgr* MakeStateMgr(const std::string& _zk_hostport,
- const std::string& _topleveldir, EventLoop* eventLoop,
- bool exitOnSessionExpiry = true);
+ static shared_ptr<HeronStateMgr> MakeStateMgr(const std::string& _zk_hostport,
+ const std::string& _topleveldir, EventLoop* eventLoop,
+ bool exitOnSessionExpiry = true);
//
// Interface methods
@@ -100,7 +102,7 @@
VCallback<proto::system::StatusCode> _cb) = 0;
virtual void SetTopology(const proto::api::Topology& _top,
VCallback<proto::system::StatusCode> _cb) = 0;
- virtual void GetTopology(const std::string& _topology_name, proto::api::Topology* _return,
+ virtual void GetTopology(const std::string& _topology_name, proto::api::Topology& _return,
VCallback<proto::system::StatusCode> _cb) = 0;
// Gets/Sets PhysicalPlan
@@ -111,12 +113,12 @@
virtual void SetPhysicalPlan(const proto::system::PhysicalPlan& _plan,
VCallback<proto::system::StatusCode> _cb) = 0;
virtual void GetPhysicalPlan(const std::string& _topology_name,
- proto::system::PhysicalPlan* _return,
+ shared_ptr<proto::system::PhysicalPlan> _return,
VCallback<proto::system::StatusCode> _cb) = 0;
// Gets PackingPlan
virtual void GetPackingPlan(const std::string& _topology_name,
- proto::system::PackingPlan* _return,
+ shared_ptr<proto::system::PackingPlan> _return,
VCallback<proto::system::StatusCode> _cb) = 0;
// Gets/Sets ExecutionState
@@ -135,15 +137,15 @@
// Gets/Sets Stateful Checkpoint
virtual void CreateStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
VCallback<proto::system::StatusCode> _cb) = 0;
virtual void DeleteStatefulCheckpoints(const std::string& _topology_name,
VCallback<proto::system::StatusCode> _cb) = 0;
virtual void SetStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
VCallback<proto::system::StatusCode> _cb) = 0;
virtual void GetStatefulCheckpoints(const std::string& _topology_name,
- proto::ckptmgr::StatefulConsistentCheckpoints* _return,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
VCallback<proto::system::StatusCode> _cb) = 0;
// Calls to list the topologies and physical plans
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
index 7c6c967..f4f9497 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
+++ b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.cpp
@@ -180,12 +180,12 @@
zkclient_->Set(path, value, std::move(wCb));
}
-void HeronZKStateMgr::GetTopology(const std::string& _topology_name, proto::api::Topology* _return,
+void HeronZKStateMgr::GetTopology(const std::string& _topology_name, proto::api::Topology& _return,
VCallback<proto::system::StatusCode> cb) {
std::string path = GetTopologyPath(_topology_name);
std::string* contents = new std::string();
- auto wCb = [contents, _return, cb, this](sp_int32 rc) {
+ auto wCb = [contents, &_return, cb, this](sp_int32 rc) {
this->GetTopologyDone(contents, _return, std::move(cb), rc);
};
@@ -221,7 +221,7 @@
}
void HeronZKStateMgr::GetPhysicalPlan(const std::string& _topology_name,
- proto::system::PhysicalPlan* _return,
+ shared_ptr<proto::system::PhysicalPlan> _return,
VCallback<proto::system::StatusCode> cb) {
std::string path = GetPhysicalPlanPath(_topology_name);
std::string* contents = new std::string();
@@ -233,7 +233,7 @@
}
void HeronZKStateMgr::GetPackingPlan(const std::string& _topology_name,
- proto::system::PackingPlan* _return,
+ shared_ptr<proto::system::PackingPlan> _return,
VCallback<proto::system::StatusCode> cb) {
std::string path = GetPackingPlanPath(_topology_name);
std::string* contents = new std::string();
@@ -285,11 +285,11 @@
}
void HeronZKStateMgr::CreateStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
VCallback<proto::system::StatusCode> cb) {
std::string path = GetStatefulCheckpointsPath(_topology_name);
std::string contents;
- _ckpt.SerializeToString(&contents);
+ _ckpt->SerializeToString(&contents);
auto wCb = [cb, this](sp_int32 rc) { this->CreateStatefulCheckpointsDone(std::move(cb), rc); };
zkclient_->CreateNode(path, contents, false, std::move(wCb));
@@ -304,19 +304,19 @@
}
void HeronZKStateMgr::SetStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
VCallback<proto::system::StatusCode> cb) {
std::string path = GetStatefulCheckpointsPath(_topology_name);
std::string contents;
- _ckpt.SerializeToString(&contents);
+ _ckpt->SerializeToString(&contents);
auto wCb = [cb, this](sp_int32 rc) { this->SetStatefulCheckpointsDone(std::move(cb), rc); };
zkclient_->Set(path, contents, std::move(wCb));
}
void HeronZKStateMgr::GetStatefulCheckpoints(const std::string& _topology_name,
- proto::ckptmgr::StatefulConsistentCheckpoints* _return,
- VCallback<proto::system::StatusCode> cb) {
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
+ VCallback<proto::system::StatusCode> cb) {
std::string path = GetStatefulCheckpointsPath(_topology_name);
std::string* contents = new std::string();
auto wCb = [contents, _return, cb, this](sp_int32 rc) {
@@ -483,11 +483,11 @@
cb(code);
}
-void HeronZKStateMgr::GetTopologyDone(std::string* _contents, proto::api::Topology* _return,
+void HeronZKStateMgr::GetTopologyDone(std::string* _contents, proto::api::Topology& _return,
VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
proto::system::StatusCode code = proto::system::OK;
if (_rc == ZOK) {
- if (!_return->ParseFromString(*_contents)) {
+ if (!_return.ParseFromString(*_contents)) {
LOG(ERROR) << "topology parsing failed; zk corruption?" << std::endl;
code = proto::system::STATE_CORRUPTED;
}
@@ -541,7 +541,7 @@
}
void HeronZKStateMgr::GetPhysicalPlanDone(std::string* _contents,
- proto::system::PhysicalPlan* _return,
+ shared_ptr<proto::system::PhysicalPlan> _return,
VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
proto::system::StatusCode code = proto::system::OK;
if (_rc == ZOK) {
@@ -559,7 +559,7 @@
}
void HeronZKStateMgr::GetPackingPlanDone(std::string* _contents,
- proto::system::PackingPlan* _return,
+ shared_ptr<proto::system::PackingPlan> _return,
VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
proto::system::StatusCode code = proto::system::OK;
if (_rc == ZOK) {
@@ -678,8 +678,8 @@
}
void HeronZKStateMgr::GetStatefulCheckpointsDone(std::string* _contents,
- proto::ckptmgr::StatefulConsistentCheckpoints* _return,
- VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
+ VCallback<proto::system::StatusCode> cb, sp_int32 _rc) {
proto::system::StatusCode code = proto::system::OK;
if (_rc == ZOK) {
if (!_return->ParseFromString(*_contents)) {
diff --git a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
index 4c9b9b7..202e746 100644
--- a/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
+++ b/heron/statemgrs/src/cpp/statemgr/heron-zkstatemgr.h
@@ -87,7 +87,7 @@
void CreateTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
void DeleteTopology(const std::string& _topology_name, VCallback<proto::system::StatusCode> _cb);
void SetTopology(const proto::api::Topology& _top, VCallback<proto::system::StatusCode> _cb);
- void GetTopology(const std::string& _topology_name, proto::api::Topology* _return,
+ void GetTopology(const std::string& _topology_name, proto::api::Topology& _return,
VCallback<proto::system::StatusCode> _cb);
// Gets/Sets physical plan
@@ -97,10 +97,12 @@
VCallback<proto::system::StatusCode> _cb);
void SetPhysicalPlan(const proto::system::PhysicalPlan& _pplan,
VCallback<proto::system::StatusCode> _cb);
- void GetPhysicalPlan(const std::string& _topology_name, proto::system::PhysicalPlan* _return,
+ void GetPhysicalPlan(const std::string& _topology_name,
+ shared_ptr<proto::system::PhysicalPlan> _return,
VCallback<proto::system::StatusCode> _cb);
- void GetPackingPlan(const std::string& _topology_name, proto::system::PackingPlan* _return,
+ void GetPackingPlan(const std::string& _topology_name,
+ shared_ptr<proto::system::PackingPlan> _return,
VCallback<proto::system::StatusCode> _cb);
// Gets/Sets execution state
@@ -115,16 +117,16 @@
// Gets/Sets the Stateful Checkpoints
void CreateStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
- VCallback<proto::system::StatusCode> _cb);
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+ VCallback<proto::system::StatusCode> _cb);
void DeleteStatefulCheckpoints(const std::string& _topology_name,
VCallback<proto::system::StatusCode> _cb);
void SetStatefulCheckpoints(const std::string& _topology_name,
- const proto::ckptmgr::StatefulConsistentCheckpoints& _ckpt,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
VCallback<proto::system::StatusCode> _cb);
void GetStatefulCheckpoints(const std::string& _topology_name,
- proto::ckptmgr::StatefulConsistentCheckpoints* _return,
- VCallback<proto::system::StatusCode> _cb);
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
+ VCallback<proto::system::StatusCode> _cb);
void ListTopologies(std::vector<sp_string>* _return, VCallback<proto::system::StatusCode> _cb);
void ListExecutionStateTopologies(std::vector<sp_string>* _return,
@@ -153,15 +155,15 @@
void CreateTopologyDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void DeleteTopologyDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void SetTopologyDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
- void GetTopologyDone(std::string* _contents, proto::api::Topology* _return,
+ void GetTopologyDone(std::string* _contents, proto::api::Topology& _return,
VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void CreatePhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void DeletePhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void SetPhysicalPlanDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
- void GetPhysicalPlanDone(std::string* _contents, proto::system::PhysicalPlan* _return,
+ void GetPhysicalPlanDone(std::string* _contents, shared_ptr<proto::system::PhysicalPlan> _return,
VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
- void GetPackingPlanDone(std::string* _contents, proto::system::PackingPlan* _return,
+ void GetPackingPlanDone(std::string* _contents, shared_ptr<proto::system::PackingPlan> _return,
VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void CreateExecutionStateDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
@@ -174,7 +176,7 @@
void DeleteStatefulCheckpointsDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void SetStatefulCheckpointsDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void GetStatefulCheckpointsDone(std::string* _contents,
- proto::ckptmgr::StatefulConsistentCheckpoints* _return,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _return,
VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
void ListTopologiesDone(VCallback<proto::system::StatusCode> _cb, sp_int32 _rc);
diff --git a/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp b/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
index 251725c..7dcfe6a 100644
--- a/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
+++ b/heron/stmgr/src/cpp/manager/checkpoint-gateway.cpp
@@ -45,7 +45,7 @@
neighbour_calculator_(_neighbour_calculator),
metrics_manager_client_(_metrics_manager_client), tupleset_drainer_(_tupleset_drainer),
tuplestream_drainer_(_tuplestream_drainer), ckpt_drainer_(_ckpt_drainer) {
- size_metric_ = new common::AssignableMetric(current_size_);
+ size_metric_ = std::make_shared<common::AssignableMetric>(current_size_);
metrics_manager_client_->register_metric("__stateful_gateway_size", size_metric_);
}
@@ -53,8 +53,8 @@
for (auto kv : pending_tuples_) {
delete kv.second;
}
+
metrics_manager_client_->unregister_metric("__stateful_gateway_size");
- delete size_metric_;
}
void CheckpointGateway::SendToInstance(sp_int32 _task_id,
diff --git a/heron/stmgr/src/cpp/manager/checkpoint-gateway.h b/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
index 1040523..0233e4e 100644
--- a/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
+++ b/heron/stmgr/src/cpp/manager/checkpoint-gateway.h
@@ -102,7 +102,7 @@
sp_uint64 current_size_;
NeighbourCalculator* neighbour_calculator_;
common::MetricsMgrSt* metrics_manager_client_;
- common::AssignableMetric* size_metric_;
+ std::shared_ptr<common::AssignableMetric> size_metric_;
std::unordered_map<sp_int32, CheckpointInfo*> pending_tuples_;
std::function<void(sp_int32, proto::system::HeronTupleSet2*)> tupleset_drainer_;
std::function<void(proto::stmgr::TupleStreamMessage*)> tuplestream_drainer_;
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
index a41bf9b..a36527f 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.cpp
@@ -27,6 +27,9 @@
namespace heron {
namespace stmgr {
+using std::unique_ptr;
+using proto::ckptmgr::SaveInstanceStateRequest;
+
CkptMgrClient::CkptMgrClient(EventLoop* eventloop, const NetworkOptions& _options,
const sp_string& _topology_name, const sp_string& _topology_id,
const sp_string& _ckptmgr_id, const sp_string& _stmgr_id,
@@ -50,11 +53,11 @@
// TODO(nlu): take the value from config
reconnect_cpktmgr_interval_sec_ = 10;
- InstallResponseHandler(new proto::ckptmgr::RegisterStMgrRequest(),
+ InstallResponseHandler(make_unique<proto::ckptmgr::RegisterStMgrRequest>(),
&CkptMgrClient::HandleRegisterStMgrResponse);
- InstallResponseHandler(new proto::ckptmgr::SaveInstanceStateRequest(),
+ InstallResponseHandler(make_unique<proto::ckptmgr::SaveInstanceStateRequest>(),
&CkptMgrClient::HandleSaveInstanceStateResponse);
- InstallResponseHandler(new proto::ckptmgr::GetInstanceStateRequest(),
+ InstallResponseHandler(make_unique<proto::ckptmgr::GetInstanceStateRequest>(),
&CkptMgrClient::HandleGetInstanceStateResponse);
}
@@ -141,17 +144,17 @@
void CkptMgrClient::OnReconnectTimer() { Start(); }
void CkptMgrClient::SendRegisterRequest() {
- auto request = new proto::ckptmgr::RegisterStMgrRequest();
+ auto request = make_unique<proto::ckptmgr::RegisterStMgrRequest>();
request->set_topology_name(topology_name_);
request->set_topology_id(topology_id_);
request->set_stmgr_id(stmgr_id_);
request->mutable_physical_plan()->CopyFrom(*pplan_);
- SendRequest(request, NULL);
+ SendRequest(std::move(request), NULL);
}
-void CkptMgrClient::SaveInstanceState(proto::ckptmgr::SaveInstanceStateRequest* _request) {
+void CkptMgrClient::SaveInstanceState(unique_ptr<SaveInstanceStateRequest> _request) {
LOG(INFO) << "Sending SaveInstanceState to ckptmgr" << std::endl;
- SendRequest(_request, NULL);
+ SendRequest(std::move(_request), NULL);
}
void CkptMgrClient::SetPhysicalPlan(proto::system::PhysicalPlan& _pplan) {
@@ -170,10 +173,10 @@
void CkptMgrClient::GetInstanceState(const proto::system::Instance& _instance,
const std::string& _checkpoint_id,
int32_t* _nattempts) {
- auto request = new proto::ckptmgr::GetInstanceStateRequest();
+ auto request = make_unique<proto::ckptmgr::GetInstanceStateRequest>();
request->mutable_instance()->CopyFrom(_instance);
request->set_checkpoint_id(_checkpoint_id);
- SendRequest(request, _nattempts);
+ SendRequest(std::move(request), _nattempts);
}
void CkptMgrClient::HandleSaveInstanceStateResponse(void*,
diff --git a/heron/stmgr/src/cpp/manager/ckptmgr-client.h b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
index 4d34004..6a58888 100644
--- a/heron/stmgr/src/cpp/manager/ckptmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/ckptmgr-client.h
@@ -29,6 +29,8 @@
namespace heron {
namespace stmgr {
+using std::unique_ptr;
+
class CkptMgrClient : public Client {
public:
CkptMgrClient(EventLoop* eventLoop, const NetworkOptions& _options,
@@ -44,7 +46,7 @@
void Quit();
- virtual void SaveInstanceState(proto::ckptmgr::SaveInstanceStateRequest* _request);
+ virtual void SaveInstanceState(unique_ptr<proto::ckptmgr::SaveInstanceStateRequest> _request);
virtual void GetInstanceState(const proto::system::Instance& _instance,
const std::string& _checkpoint_id);
virtual void SetPhysicalPlan(proto::system::PhysicalPlan& _pplan);
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 5f8d9a2..205f8d1 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -40,6 +40,8 @@
namespace heron {
namespace stmgr {
+using std::make_shared;
+
// Num data tuples sent to instances associated with this stream manager
const sp_string METRIC_DATA_TUPLES_TO_INSTANCES = "__tuples_to_workers";
// Num ack tuples sent to instances associated with this stream manager
@@ -107,8 +109,8 @@
InstallMessageHandler(&InstanceServer::HandleStoreInstanceStateCheckpointMessage);
InstallMessageHandler(&InstanceServer::HandleRestoreInstanceStateResponse);
- instance_server_metrics_ = new heron::common::MultiCountMetric();
- back_pressure_metric_aggr_ = new heron::common::TimeSpentMetric();
+ instance_server_metrics_ = make_shared<heron::common::MultiCountMetric>();
+ back_pressure_metric_aggr_ = make_shared<heron::common::TimeSpentMetric>();
metrics_manager_client_->register_metric("__server", instance_server_metrics_);
metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR,
back_pressure_metric_aggr_);
@@ -133,8 +135,10 @@
Stop();
// Unregister and delete the metrics.
for (auto immIter = instance_metric_map_.begin();
- immIter != instance_metric_map_.end(); ++immIter) {
- sp_string instance_id = immIter->first;
+ immIter != instance_metric_map_.end();
+ immIter = instance_metric_map_.erase(immIter)) {
+ const sp_string& instance_id = immIter->first;
+
for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
if (iter->second->instance_->instance_id() != instance_id) continue;
InstanceData* data = iter->second;
@@ -142,14 +146,14 @@
if (!iConn) break;
sp_string metric_name = MakeBackPressureCompIdMetricName(instance_id);
metrics_manager_client_->unregister_metric(metric_name);
- delete immIter->second;
}
}
// Clean the connection_buffer_metric_map
for (auto qmmIter = connection_buffer_metric_map_.begin();
- qmmIter != connection_buffer_metric_map_.end(); ++qmmIter) {
+ qmmIter != connection_buffer_metric_map_.end();) {
const sp_string& instance_id = qmmIter->first;
+
for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
if (iter->second->instance_->instance_id() != instance_id) continue;
InstanceData* data = iter->second;
@@ -157,25 +161,23 @@
if (!iConn) break;
sp_string metric_name = MakeQueueSizeCompIdMetricName(instance_id);
metrics_manager_client_->unregister_metric(metric_name);
- delete qmmIter->second;
}
+
+ qmmIter = connection_buffer_metric_map_.erase(qmmIter);
}
// Clean the connection_buffer_length_metric_map
for (auto qlmIter = connection_buffer_length_metric_map_.begin();
- qlmIter != connection_buffer_length_metric_map_.end(); ++qlmIter) {
+ qlmIter != connection_buffer_length_metric_map_.end();) {
const sp_string& instance_id = qlmIter->first;
sp_string metric_name = MakeQueueLengthCompIdMetricName(instance_id);
metrics_manager_client_->unregister_metric(metric_name);
- delete qlmIter->second;
+ qlmIter = connection_buffer_length_metric_map_.erase(qlmIter);
}
metrics_manager_client_->unregister_metric("__server");
metrics_manager_client_->unregister_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR);
- delete instance_server_metrics_;
- delete back_pressure_metric_aggr_;
-
// cleanup the instance info
for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
delete iter->second;
@@ -235,7 +237,7 @@
remote_ends_who_caused_back_pressure_.end()) {
_conn->unsetCausedBackPressure();
remote_ends_who_caused_back_pressure_.erase(GetInstanceName(_conn));
- heron::common::TimeSpentMetric* instance_metric = instance_metric_map_[GetInstanceName(_conn)];
+ auto instance_metric = instance_metric_map_[GetInstanceName(_conn)];
instance_metric->Stop();
if (!stmgr_->DidAnnounceBackPressure()) {
stmgr_->SendStopBackPressureToOtherStMgrs();
@@ -262,7 +264,6 @@
auto immiter = instance_metric_map_.find(instance_id);
if (immiter != instance_metric_map_.end()) {
metrics_manager_client_->unregister_metric(MakeBackPressureCompIdMetricName(instance_id));
- delete instance_metric_map_[instance_id];
instance_metric_map_.erase(instance_id);
}
@@ -270,7 +271,6 @@
auto qmmiter = connection_buffer_metric_map_.find(instance_id);
if (qmmiter != connection_buffer_metric_map_.end()) {
metrics_manager_client_->unregister_metric(MakeQueueSizeCompIdMetricName(instance_id));
- delete connection_buffer_metric_map_[instance_id];
connection_buffer_metric_map_.erase(instance_id);
}
@@ -278,7 +278,6 @@
auto qlmiter = connection_buffer_length_metric_map_.find(instance_id);
if (qlmiter != connection_buffer_length_metric_map_.end()) {
metrics_manager_client_->unregister_metric(MakeQueueLengthCompIdMetricName(instance_id));
- delete connection_buffer_length_metric_map_[instance_id];
connection_buffer_length_metric_map_.erase(instance_id);
}
@@ -333,13 +332,13 @@
}
// Create a metric for this instance
if (instance_metric_map_.find(instance_id) == instance_metric_map_.end()) {
- auto instance_metric = new heron::common::TimeSpentMetric();
+ auto instance_metric = make_shared<heron::common::TimeSpentMetric>();
metrics_manager_client_->register_metric(MakeBackPressureCompIdMetricName(instance_id),
instance_metric);
instance_metric_map_[instance_id] = instance_metric;
}
if (connection_buffer_metric_map_.find(instance_id) == connection_buffer_metric_map_.end()) {
- auto queue_metric = new heron::common::MultiMeanMetric();
+ auto queue_metric = make_shared<heron::common::MultiMeanMetric>();
metrics_manager_client_->register_metric(MakeQueueSizeCompIdMetricName(instance_id),
queue_metric);
connection_buffer_metric_map_[instance_id] = queue_metric;
@@ -348,7 +347,7 @@
if (connection_buffer_length_metric_map_.find(instance_id) ==
connection_buffer_length_metric_map_.end()) {
task_id_to_name[task_id] = instance_id;
- auto queue_metric = new heron::common::MultiCountMetric();
+ auto queue_metric = make_shared<heron::common::MultiCountMetric>();
metrics_manager_client_->register_metric(MakeQueueLengthCompIdMetricName(instance_id),
queue_metric);
connection_buffer_length_metric_map_[instance_id] = queue_metric;
@@ -563,7 +562,7 @@
}
// Indicate which instance component had back pressure
- heron::common::TimeSpentMetric* instance_metric = instance_metric_map_[instance_name];
+ auto instance_metric = instance_metric_map_[instance_name];
instance_metric->Start();
remote_ends_who_caused_back_pressure_.insert(instance_name);
@@ -592,7 +591,7 @@
remote_ends_who_caused_back_pressure_.erase(instance_name);
// Indicate which instance component stopped back pressure
- heron::common::TimeSpentMetric* instance_metric = instance_metric_map_[instance_name];
+ auto instance_metric = instance_metric_map_[instance_name];
instance_metric->Stop();
if (!stmgr_->DidAnnounceBackPressure()) {
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index 5ad504a..f4fb1e9 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -42,6 +42,9 @@
namespace heron {
namespace stmgr {
+using std::shared_ptr;
+using std::unordered_map;
+
class StMgr;
class NeighbourCalculator;
class CheckpointGateway;
@@ -157,15 +160,16 @@
// map of Instance_id to metric
// Used for back pressure metrics
- typedef std::unordered_map<sp_string, heron::common::TimeSpentMetric*> InstanceMetricMap;
+ typedef unordered_map<sp_string, shared_ptr<heron::common::TimeSpentMetric>> InstanceMetricMap;
InstanceMetricMap instance_metric_map_;
// map of Instance_id to queue metric
- typedef std::unordered_map<sp_string, heron::common::MultiMeanMetric*> ConnectionBufferMetricMap;
+ typedef unordered_map<sp_string,
+ shared_ptr<heron::common::MultiMeanMetric>> ConnectionBufferMetricMap;
ConnectionBufferMetricMap connection_buffer_metric_map_;
// map of Instance_id to queue length metric
- typedef std::unordered_map<sp_string, heron::common::MultiCountMetric*>
+ typedef std::unordered_map<sp_string, shared_ptr<heron::common::MultiCountMetric>>
ConnectionBufferLengthMetricMap;
ConnectionBufferLengthMetricMap connection_buffer_length_metric_map_;
@@ -183,8 +187,8 @@
// Metrics
heron::common::MetricsMgrSt* metrics_manager_client_;
- heron::common::MultiCountMetric* instance_server_metrics_;
- heron::common::TimeSpentMetric* back_pressure_metric_aggr_;
+ shared_ptr<heron::common::MultiCountMetric> instance_server_metrics_;
+ shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_aggr_;
bool spouts_under_back_pressure_;
diff --git a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
index 538c71d..726c341 100644
--- a/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
+++ b/heron/stmgr/src/cpp/manager/stateful-restorer.cpp
@@ -39,6 +39,8 @@
namespace heron {
namespace stmgr {
+using std::make_shared;
+
// Stats for restore
const sp_string METRIC_START_RESTORE = "__start_restore";
const sp_string METRIC_START_RESTORE_IN_PROGRESS = "__start_restore_in_progress";
@@ -65,8 +67,8 @@
in_progress_ = false;
restore_done_watcher_ = _restore_done_watcher;
metrics_manager_client_ = _metrics_manager_client;
- multi_count_metric_ = new common::MultiCountMetric();
- time_spent_metric_ = new common::TimeSpentMetric();
+ multi_count_metric_ = make_shared<common::MultiCountMetric>();
+ time_spent_metric_ = make_shared<common::TimeSpentMetric>();
metrics_manager_client_->register_metric("__stateful_restore_count", multi_count_metric_);
metrics_manager_client_->register_metric("__stateful_restore_time", time_spent_metric_);
}
@@ -74,8 +76,6 @@
StatefulRestorer::~StatefulRestorer() {
metrics_manager_client_->unregister_metric("__stateful_restore_count");
metrics_manager_client_->unregister_metric("__stateful_restore_time");
- delete multi_count_metric_;
- delete time_spent_metric_;
}
void StatefulRestorer::StartRestore(const std::string& _checkpoint_id, sp_int64 _restore_txid,
diff --git a/heron/stmgr/src/cpp/manager/stateful-restorer.h b/heron/stmgr/src/cpp/manager/stateful-restorer.h
index a3e82d0..cf35fb3 100644
--- a/heron/stmgr/src/cpp/manager/stateful-restorer.h
+++ b/heron/stmgr/src/cpp/manager/stateful-restorer.h
@@ -47,6 +47,8 @@
namespace heron {
namespace stmgr {
+using std::shared_ptr;
+
class InstanceServer;
class TupleCache;
class StMgrClientMgr;
@@ -156,8 +158,8 @@
std::function<void(proto::system::StatusCode, std::string, sp_int64)> restore_done_watcher_;
// Different metrics
- common::MultiCountMetric* multi_count_metric_;
- common::TimeSpentMetric* time_spent_metric_;
+ shared_ptr<common::MultiCountMetric> multi_count_metric_;
+ shared_ptr<common::TimeSpentMetric> time_spent_metric_;
};
} // namespace stmgr
} // namespace heron
diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.cpp b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
index 68b002f..572a794 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.cpp
@@ -34,6 +34,8 @@
namespace heron {
namespace stmgr {
+using std::make_shared;
+
// Num data tuples sent to other stream managers
const sp_string METRIC_DATA_TUPLES_TO_STMGRS = "__tuples_to_stmgrs";
// Num ack tuples sent to other stream managers
@@ -74,17 +76,17 @@
reconnect_other_streammgrs_interval_sec_ =
config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrClientReconnectIntervalSec();
- InstallResponseHandler(new proto::stmgr::StrMgrHelloRequest(), &StMgrClient::HandleHelloResponse);
+ InstallResponseHandler(make_unique<proto::stmgr::StrMgrHelloRequest>(),
+ &StMgrClient::HandleHelloResponse);
InstallMessageHandler(&StMgrClient::HandleTupleStreamMessage);
- stmgr_client_metrics_ = new heron::common::MultiCountMetric();
+ stmgr_client_metrics_ = make_shared<heron::common::MultiCountMetric>();
metrics_manager_client_->register_metric("__client_" + other_stmgr_id_, stmgr_client_metrics_);
}
StMgrClient::~StMgrClient() {
Stop();
metrics_manager_client_->unregister_metric("__client_" + other_stmgr_id_);
- delete stmgr_client_metrics_;
}
void StMgrClient::Quit() {
@@ -178,11 +180,11 @@
}
void StMgrClient::SendHelloRequest() {
- auto request = new proto::stmgr::StrMgrHelloRequest();
+ auto request = make_unique<proto::stmgr::StrMgrHelloRequest>();
request->set_topology_name(topology_name_);
request->set_topology_id(topology_id_);
request->set_stmgr(our_stmgr_id_);
- SendRequest(request, NULL);
+ SendRequest(std::move(request), NULL);
stmgr_client_metrics_->scope(METRIC_HELLO_MESSAGES_TO_STMGRS)->incr_by(1);
return;
}
diff --git a/heron/stmgr/src/cpp/manager/stmgr-client.h b/heron/stmgr/src/cpp/manager/stmgr-client.h
index 92494dc..2aed880 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-client.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-client.h
@@ -34,6 +34,9 @@
namespace heron {
namespace stmgr {
+
+using std::shared_ptr;
+
class StMgrClientMgr;
class StMgrClient : public Client {
@@ -78,7 +81,7 @@
StMgrClientMgr* client_manager_;
// Metrics
heron::common::MetricsMgrSt* metrics_manager_client_;
- heron::common::MultiCountMetric* stmgr_client_metrics_;
+ shared_ptr<heron::common::MultiCountMetric> stmgr_client_metrics_;
// Configs to be read
sp_int32 reconnect_other_streammgrs_interval_sec_;
diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
index 0a3ecf1..d76e9b4 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
@@ -34,6 +34,8 @@
namespace heron {
namespace stmgr {
+using std::make_shared;
+
// New connections made with other stream managers.
const sp_string METRIC_STMGR_NEW_CONNECTIONS = "__stmgr_new_connections";
@@ -52,14 +54,13 @@
high_watermark_(_high_watermark),
low_watermark_(_low_watermark),
droptuples_upon_backpressure_(_droptuples_upon_backpressure) {
- stmgr_clientmgr_metrics_ = new heron::common::MultiCountMetric();
+ stmgr_clientmgr_metrics_ = make_shared<heron::common::MultiCountMetric>();
metrics_manager_client_->register_metric("__clientmgr", stmgr_clientmgr_metrics_);
}
StMgrClientMgr::~StMgrClientMgr() {
// This should not be called
metrics_manager_client_->unregister_metric("__clientmgr");
- delete stmgr_clientmgr_metrics_;
}
void StMgrClientMgr::StartConnections(const proto::system::PhysicalPlan* _pplan) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h
index 13dd7d3..f04515e 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.h
@@ -34,6 +34,9 @@
namespace heron {
namespace stmgr {
+
+using std::shared_ptr;
+
class StMgr;
class StMgrClient;
@@ -89,7 +92,7 @@
StMgr* stream_manager_;
// Metrics
heron::common::MetricsMgrSt* metrics_manager_client_;
- heron::common::MultiCountMetric* stmgr_clientmgr_metrics_;
+ shared_ptr<heron::common::MultiCountMetric> stmgr_clientmgr_metrics_;
sp_int64 high_watermark_;
sp_int64 low_watermark_;
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index a6ac8bd..c1cf2bc 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -35,6 +35,8 @@
namespace heron {
namespace stmgr {
+using std::make_shared;
+
// The scope the metrics in this file are under
const sp_string SERVER_SCOPE = "__server/";
// Num data tuples received from other stream managers
@@ -66,16 +68,16 @@
// The metrics need to be registered one by one here because the "__server" scope
// is already registered in heron::stmgr::InstanceServer. Duplicated registrations
// will only have one successfully registered.
- tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+ tuples_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS,
tuples_from_stmgrs_metrics_);
- ack_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+ ack_tuples_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS,
ack_tuples_from_stmgrs_metrics_);
- fail_tuples_from_stmgrs_metrics_ = new heron::common::CountMetric();
+ fail_tuples_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS,
fail_tuples_from_stmgrs_metrics_);
- bytes_from_stmgrs_metrics_ = new heron::common::CountMetric();
+ bytes_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS,
bytes_from_stmgrs_metrics_);
}
@@ -83,13 +85,9 @@
StMgrServer::~StMgrServer() {
Stop();
metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_DATA_TUPLES_FROM_STMGRS);
- delete tuples_from_stmgrs_metrics_;
metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS);
- delete ack_tuples_from_stmgrs_metrics_;
metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS);
- delete fail_tuples_from_stmgrs_metrics_;
metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS);
- delete bytes_from_stmgrs_metrics_;
}
void StMgrServer::HandleNewConnection(Connection* _conn) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 4ba28f4..2ae0be6 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -39,6 +39,8 @@
namespace heron {
namespace stmgr {
+using std::shared_ptr;
+
class StMgr;
class StMgrServer : public Server {
@@ -99,10 +101,10 @@
// Metrics
heron::common::MetricsMgrSt* metrics_manager_client_;
- heron::common::CountMetric* tuples_from_stmgrs_metrics_;
- heron::common::CountMetric* ack_tuples_from_stmgrs_metrics_;
- heron::common::CountMetric* fail_tuples_from_stmgrs_metrics_;
- heron::common::CountMetric* bytes_from_stmgrs_metrics_;
+ shared_ptr<heron::common::CountMetric> tuples_from_stmgrs_metrics_;
+ shared_ptr<heron::common::CountMetric> ack_tuples_from_stmgrs_metrics_;
+ shared_ptr<heron::common::CountMetric> fail_tuples_from_stmgrs_metrics_;
+ shared_ptr<heron::common::CountMetric> bytes_from_stmgrs_metrics_;
};
} // namespace stmgr
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 3d8fa28..56cf760 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -52,6 +52,8 @@
namespace heron {
namespace stmgr {
+using std::make_shared;
+
// Stats for the process
const sp_string METRIC_CPU_USER = "__cpu_user_usec";
const sp_string METRIC_CPU_SYSTEM = "__cpu_system_usec";
@@ -118,17 +120,17 @@
state_mgr_ = heron::common::HeronStateMgr::MakeStateMgr(zkhostport_, zkroot_, eventLoop_, false);
metrics_manager_client_ = new heron::common::MetricsMgrSt(
metricsmgr_port_, metrics_export_interval_sec, eventLoop_);
- stmgr_process_metrics_ = new heron::common::MultiAssignableMetric();
+ stmgr_process_metrics_ = make_shared<heron::common::MultiAssignableMetric>();
metrics_manager_client_->register_metric(METRIC_PROCESS, stmgr_process_metrics_);
- restore_initiated_metrics_ = new heron::common::CountMetric();
+ restore_initiated_metrics_ = make_shared<heron::common::CountMetric>();
metrics_manager_client_->register_metric(METRIC_RESTORE_INITIALIZED, restore_initiated_metrics_);
- dropped_during_restore_metrics_ = new heron::common::MultiCountMetric();
+ dropped_during_restore_metrics_ = make_shared<heron::common::MultiCountMetric>();
metrics_manager_client_->register_metric(METRIC_DROPPED_DURING_RESTORE,
dropped_during_restore_metrics_);
- instance_bytes_received_metrics_ = new heron::common::MultiCountMetric();
+ instance_bytes_received_metrics_ = make_shared<heron::common::MultiCountMetric>();
metrics_manager_client_->register_metric(METRIC_INSTANCE_BYTES_RECEIVED,
instance_bytes_received_metrics_);
- back_pressure_metric_initiated_ = new heron::common::TimeSpentMetric();
+ back_pressure_metric_initiated_ = make_shared<heron::common::TimeSpentMetric>();
metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT,
back_pressure_metric_initiated_);
state_mgr_->SetTMasterLocationWatch(topology_name_, [this]() { this->FetchTMasterLocation(); });
@@ -205,13 +207,7 @@
metrics_manager_client_->unregister_metric(METRIC_DROPPED_DURING_RESTORE);
metrics_manager_client_->unregister_metric(METRIC_INSTANCE_BYTES_RECEIVED);
metrics_manager_client_->unregister_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT);
- delete stmgr_process_metrics_;
- delete restore_initiated_metrics_;
- delete dropped_during_restore_metrics_;
- delete instance_bytes_received_metrics_;
- delete back_pressure_metric_initiated_;
delete tuple_cache_;
- delete state_mgr_;
delete pplan_;
delete stmgr_server_;
delete instance_server_;
@@ -1050,11 +1046,10 @@
}
// save the checkpoint
- proto::ckptmgr::SaveInstanceStateRequest* message =
- new proto::ckptmgr::SaveInstanceStateRequest();
+ auto message = make_unique<proto::ckptmgr::SaveInstanceStateRequest>();
message->mutable_instance()->CopyFrom(_instance);
message->mutable_checkpoint()->CopyFrom(_message);
- ckptmgr_client_->SaveInstanceState(message);
+ ckptmgr_client_->SaveInstanceState(std::move(message));
}
// Invoked by CheckpointMgr Client when it finds out that the ckptmgr
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index abf84a6..885c379 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -47,6 +47,8 @@
namespace heron {
namespace stmgr {
+using std::shared_ptr;
+
class StMgrServer;
class InstanceServer;
class StMgrClientMgr;
@@ -199,7 +201,7 @@
static void PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _pplan,
proto::api::Topology* _topology);
- heron::common::HeronStateMgr* state_mgr_;
+ shared_ptr<heron::common::HeronStateMgr> state_mgr_;
proto::system::PhysicalPlan* pplan_;
sp_string topology_name_;
sp_string topology_id_;
@@ -241,17 +243,17 @@
CkptMgrClient* ckptmgr_client_;
// Process related metrics
- heron::common::MultiAssignableMetric* stmgr_process_metrics_;
+ shared_ptr<heron::common::MultiAssignableMetric> stmgr_process_metrics_;
// Stateful Restore metric
- heron::common::CountMetric* restore_initiated_metrics_;
- heron::common::MultiCountMetric* dropped_during_restore_metrics_;
+ shared_ptr<heron::common::CountMetric> restore_initiated_metrics_;
+ shared_ptr<heron::common::MultiCountMetric> dropped_during_restore_metrics_;
// Instance related metrics
- heron::common::MultiCountMetric* instance_bytes_received_metrics_;
+ shared_ptr<heron::common::MultiCountMetric> instance_bytes_received_metrics_;
// Backpressure relarted metrics
- heron::common::TimeSpentMetric* back_pressure_metric_initiated_;
+ shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_initiated_;
// The time at which the stmgr was started up
std::chrono::high_resolution_clock::time_point start_time_;
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.cpp b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
index 3046ba5..fab8cd8 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.cpp
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
@@ -66,9 +66,9 @@
reconnect_timer_cb = [this]() { this->OnReConnectTimer(); };
heartbeat_timer_cb = [this]() { this->OnHeartbeatTimer(); };
- InstallResponseHandler(new proto::tmaster::StMgrRegisterRequest(),
+ InstallResponseHandler(make_unique<proto::tmaster::StMgrRegisterRequest>(),
&TMasterClient::HandleRegisterResponse);
- InstallResponseHandler(new proto::tmaster::StMgrHeartbeatRequest(),
+ InstallResponseHandler(make_unique<proto::tmaster::StMgrHeartbeatRequest>(),
&TMasterClient::HandleHeartbeatResponse);
InstallMessageHandler(&TMasterClient::HandleNewAssignmentMessage);
InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointMessage);
@@ -234,7 +234,7 @@
}
void TMasterClient::SendRegisterRequest() {
- auto request = new proto::tmaster::StMgrRegisterRequest();
+ auto request = make_unique<proto::tmaster::StMgrRegisterRequest>();
sp_string cwd;
FileUtils::getCwd(cwd);
@@ -251,7 +251,7 @@
request->add_instances()->CopyFrom(*(*iter));
}
- SendRequest(request, nullptr);
+ SendRequest(std::move(request), nullptr);
return;
}
@@ -268,11 +268,11 @@
}
void TMasterClient::SendHeartbeatRequest() {
- auto request = new proto::tmaster::StMgrHeartbeatRequest();
+ auto request = make_unique<proto::tmaster::StMgrHeartbeatRequest>();
request->set_heartbeat_time(time(nullptr));
// TODO(vikasr) Send actual stats
request->mutable_stats();
- SendRequest(request, nullptr);
+ SendRequest(std::move(request), nullptr);
return;
}
diff --git a/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp b/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
index 3acf5fc..ffbaf59 100644
--- a/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/checkpoint-gateway_unittest.cpp
@@ -137,10 +137,10 @@
return ostr.str();
}
-heron::proto::system::Instance* CreateInstance(int32_t _comp, int32_t _comp_instance,
+unique_ptr<heron::proto::system::Instance> CreateInstance(int32_t _comp, int32_t _comp_instance,
int32_t _stmgr_id,
int32_t _global_index, bool _is_spout) {
- heron::proto::system::Instance* imap = new heron::proto::system::Instance();
+ auto imap = make_unique<heron::proto::system::Instance>();
imap->set_instance_id(CreateInstanceId(_global_index));
imap->set_stmgr_id(GenerateStMgrId(_stmgr_id));
heron::proto::system::InstanceInfo* inst = imap->mutable_info();
@@ -178,24 +178,20 @@
int32_t global_index = 1;
for (int spout = 0; spout < nSpouts; ++spout) {
for (int spout_instance = 0; spout_instance < nSpoutInstances; ++spout_instance) {
- heron::proto::system::Instance* instance =
- CreateInstance(spout, spout_instance, stmgr_assignment, global_index++, true);
+ auto instance = CreateInstance(spout, spout_instance, stmgr_assignment, global_index++, true);
if (++stmgr_assignment >= nContainers) {
stmgr_assignment = 0;
}
pplan->add_instances()->CopyFrom(*instance);
- delete instance;
}
}
for (int bolt = 0; bolt < nBolts; ++bolt) {
for (int bolt_instance = 0; bolt_instance < nBoltInstances; ++bolt_instance) {
- heron::proto::system::Instance* instance =
- CreateInstance(bolt, bolt_instance, stmgr_assignment, global_index++, false);
+ auto instance = CreateInstance(bolt, bolt_instance, stmgr_assignment, global_index++, false);
if (++stmgr_assignment >= nContainers) {
stmgr_assignment = 0;
}
pplan->add_instances()->CopyFrom(*instance);
- delete instance;
}
}
return pplan;
diff --git a/heron/stmgr/tests/cpp/server/dummy_instance.cpp b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
index 926c693..31fbec6 100644
--- a/heron/stmgr/tests/cpp/server/dummy_instance.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_instance.cpp
@@ -43,7 +43,7 @@
stmgr_id_(_stmgr_id),
recvd_stmgr_pplan_(NULL),
register_response_status(heron::proto::system::STMGR_DIDNT_REGISTER) {
- InstallResponseHandler(new heron::proto::stmgr::RegisterInstanceRequest(),
+ InstallResponseHandler(make_unique<heron::proto::stmgr::RegisterInstanceRequest>(),
&DummyInstance::HandleInstanceResponse);
InstallMessageHandler(&DummyInstance::HandleTupleMessage);
InstallMessageHandler(&DummyInstance::HandleNewInstanceAssignmentMsg);
@@ -95,7 +95,7 @@
heron::proto::stmgr::NewInstanceAssignmentMessage*) {}
void DummyInstance::CreateAndSendInstanceRequest() {
- auto request = new heron::proto::stmgr::RegisterInstanceRequest();
+ auto request = make_unique<heron::proto::stmgr::RegisterInstanceRequest>();
heron::proto::system::Instance* instance = request->mutable_instance();
instance->set_instance_id(instance_id_);
instance->set_stmgr_id(stmgr_id_);
@@ -104,7 +104,7 @@
instance->mutable_info()->set_component_name(component_name_);
request->set_topology_name(topology_name_);
request->set_topology_id(topology_id_);
- SendRequest(request, nullptr);
+ SendRequest(std::move(request), nullptr);
return;
}
diff --git a/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp b/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
index 372e3de..2e18c57 100644
--- a/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
+++ b/heron/stmgr/tests/cpp/server/dummy_stmgr.cpp
@@ -26,18 +26,20 @@
#include "network/network.h"
#include "server/dummy_stmgr.h"
+using std::shared_ptr;
+
///////////////////////////// DummyTMasterClient ///////////////////////////////////////////
DummyTMasterClient::DummyTMasterClient(
EventLoopImpl* eventLoop, const NetworkOptions& _options, const sp_string& stmgr_id,
const sp_string& stmgr_host, sp_int32 stmgr_port, sp_int32 shell_port,
- const std::vector<heron::proto::system::Instance*>& _instances)
+ const std::vector<shared_ptr<heron::proto::system::Instance>>& _instances)
: Client(eventLoop, _options),
stmgr_id_(stmgr_id),
stmgr_host_(stmgr_host),
stmgr_port_(stmgr_port),
shell_port_(shell_port),
instances_(_instances) {
- InstallResponseHandler(new heron::proto::tmaster::StMgrRegisterRequest(),
+ InstallResponseHandler(make_unique<heron::proto::tmaster::StMgrRegisterRequest>(),
&DummyTMasterClient::HandleRegisterResponse);
// Setup the call back function to be invoked when retrying
retry_cb_ = [this]() { this->Retry(); };
@@ -62,8 +64,7 @@
void DummyTMasterClient::HandleClose(NetworkErrorCode) {}
void DummyTMasterClient::CreateAndSendRegisterRequest() {
- heron::proto::tmaster::StMgrRegisterRequest* request =
- new heron::proto::tmaster::StMgrRegisterRequest();
+ auto request = make_unique<heron::proto::tmaster::StMgrRegisterRequest>();
heron::proto::system::StMgr* stmgr = request->mutable_stmgr();
sp_string cwd;
stmgr->set_id(stmgr_id_);
@@ -76,14 +77,14 @@
for (auto iter = instances_.begin(); iter != instances_.end(); ++iter) {
request->add_instances()->CopyFrom(**iter);
}
- SendRequest(request, NULL);
+ SendRequest(std::move(request), NULL);
}
///////////////////////////// DummyStMgr /////////////////////////////////////////////////
DummyStMgr::DummyStMgr(EventLoopImpl* ss, const NetworkOptions& options, const sp_string& stmgr_id,
const sp_string& stmgr_host, sp_int32 stmgr_port,
const sp_string& tmaster_host, sp_int32 tmaster_port, sp_int32 shell_port,
- const std::vector<heron::proto::system::Instance*>& _instances)
+ const std::vector<shared_ptr<heron::proto::system::Instance>>& _instances)
: Server(ss, options), num_start_bp_(0), num_stop_bp_(0) {
NetworkOptions tmaster_options;
tmaster_options.set_host(tmaster_host);
diff --git a/heron/stmgr/tests/cpp/server/dummy_stmgr.h b/heron/stmgr/tests/cpp/server/dummy_stmgr.h
index 73a73d5..cf30ca3 100644
--- a/heron/stmgr/tests/cpp/server/dummy_stmgr.h
+++ b/heron/stmgr/tests/cpp/server/dummy_stmgr.h
@@ -28,7 +28,7 @@
DummyTMasterClient(EventLoopImpl* eventLoop, const NetworkOptions& _options,
const sp_string& stmgr_id, const sp_string& stmgr_host, sp_int32 stmgr_port,
sp_int32 shell_port,
- const std::vector<heron::proto::system::Instance*>& instances);
+ const std::vector<std::shared_ptr<heron::proto::system::Instance>>& instances);
virtual ~DummyTMasterClient();
void setStmgrPort(sp_int32 stmgrPort) {
@@ -52,7 +52,7 @@
sp_string stmgr_host_;
sp_int32 stmgr_port_;
sp_int32 shell_port_;
- std::vector<heron::proto::system::Instance*> instances_;
+ std::vector<std::shared_ptr<heron::proto::system::Instance>> instances_;
};
class DummyStMgr : public Server {
@@ -60,7 +60,7 @@
DummyStMgr(EventLoopImpl* ss, const NetworkOptions& options, const sp_string& stmgr_id,
const sp_string& stmgr_host, sp_int32 stmgr_port, const sp_string& tmaster_host,
sp_int32 tmaster_port, sp_int32 shell_port,
- const std::vector<heron::proto::system::Instance*>& instances);
+ const std::vector<std::shared_ptr<heron::proto::system::Instance>>& instances);
virtual ~DummyStMgr();
sp_int32 Start();
diff --git a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
index 0bb0c1a..f75f666 100644
--- a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
@@ -57,6 +57,8 @@
sp_string metrics_sinks_config_filename =
"heron/config/src/yaml/conf/test/test_metrics_sinks.yaml";
+using std::shared_ptr;
+
// Generate a dummy topology
static heron::proto::api::Topology* GenerateDummyTopology(
const sp_string& topology_name, const sp_string& topology_id, int num_spouts,
@@ -223,9 +225,9 @@
return instanceid_stream.str();
}
-heron::proto::system::Instance* CreateInstanceMap(sp_int8 type, sp_int8 instance, sp_int32 stmgr_id,
- sp_int32 global_index, bool spout) {
- heron::proto::system::Instance* imap = new heron::proto::system::Instance();
+std::shared_ptr<heron::proto::system::Instance> CreateInstanceMap(sp_int8 type, sp_int8 instance,
+ sp_int32 stmgr_id, sp_int32 global_index, bool spout) {
+ auto imap = std::make_shared<heron::proto::system::Instance>();
imap->set_instance_id(CreateInstanceId(type, instance, spout));
imap->set_stmgr_id(STMGR_NAME + "-" + std::to_string(stmgr_id));
@@ -299,7 +301,7 @@
void StartDummyStMgr(EventLoopImpl*& ss, DummyStMgr*& mgr, std::thread*& stmgr_thread,
sp_int32& stmgr_port, sp_int32 tmaster_port, sp_int32 shell_port,
const sp_string& stmgr_id,
- const std::vector<heron::proto::system::Instance*>& instances) {
+ const std::vector<shared_ptr<heron::proto::system::Instance>>& instances) {
// Create the select server for this stmgr to use
ss = new EventLoopImpl();
@@ -435,10 +437,10 @@
std::map<sp_int32, std::vector<sp_string> > stmgr_instance_id_list_;
// Stmgr to Instance
- std::map<sp_int32, std::vector<heron::proto::system::Instance*> > stmgr_instance_list_;
+ std::map<sp_int32, std::vector<shared_ptr<heron::proto::system::Instance>>> stmgr_instance_list_;
// Instanceid to instance
- std::map<sp_string, heron::proto::system::Instance*> instanceid_instance_;
+ std::map<sp_string, shared_ptr<heron::proto::system::Instance>> instanceid_instance_;
std::map<sp_string, sp_int32> instanceid_stmgr_;
@@ -509,7 +511,7 @@
for (size_t spout = 0; spout < common.num_spouts_; ++spout) {
for (size_t spout_instance = 0; spout_instance < common.num_spout_instances_;
++spout_instance) {
- heron::proto::system::Instance* imap =
+ auto imap =
CreateInstanceMap(spout, spout_instance, stmgr_assignment_round, global_index++, true);
common.stmgr_instance_id_list_[stmgr_assignment_round].push_back(imap->instance_id());
common.stmgr_instance_list_[stmgr_assignment_round].push_back(imap);
@@ -527,7 +529,7 @@
// Distribute the bolts
for (size_t bolt = 0; bolt < common.num_bolts_; ++bolt) {
for (size_t bolt_instance = 0; bolt_instance < common.num_bolt_instances_; ++bolt_instance) {
- heron::proto::system::Instance* imap =
+ auto imap =
CreateInstanceMap(bolt, bolt_instance, stmgr_assignment_round, global_index++, false);
// Have we completed a round of distribution of components
common.stmgr_instance_id_list_[stmgr_assignment_round].push_back(imap->instance_id());
@@ -679,7 +681,7 @@
for (auto itr = common.instanceid_instance_.begin();
itr != common.instanceid_instance_.end(); ++itr)
- delete itr->second;
+ common.instanceid_instance_.erase(itr->first);
// Clean up the local filesystem state
FileUtils::removeRecursive(common.dpath_, true);
diff --git a/heron/stmgr/tests/cpp/util/neighbour_calculator_unittest.cpp b/heron/stmgr/tests/cpp/util/neighbour_calculator_unittest.cpp
index 405b685..5b82aa2 100644
--- a/heron/stmgr/tests/cpp/util/neighbour_calculator_unittest.cpp
+++ b/heron/stmgr/tests/cpp/util/neighbour_calculator_unittest.cpp
@@ -47,6 +47,8 @@
const sp_string STMGR_NAME = "stmgr";
const sp_string LOCALHOST = "127.0.0.1";
+using std::unique_ptr;
+
// Generate a dummy topology
static heron::proto::api::Topology* GenerateDummyTopology(
const sp_string& topology_name, const sp_string& topology_id, int num_spouts,
@@ -134,10 +136,10 @@
return ostr.str();
}
-heron::proto::system::Instance* CreateInstance(int32_t _comp, int32_t _comp_instance,
- int32_t _stmgr_id,
- int32_t _global_index, bool _is_spout) {
- heron::proto::system::Instance* imap = new heron::proto::system::Instance();
+unique_ptr<heron::proto::system::Instance> CreateInstance(int32_t _comp, int32_t _comp_instance,
+ int32_t _stmgr_id,
+ int32_t _global_index, bool _is_spout) {
+ auto imap = make_unique<heron::proto::system::Instance>();
imap->set_instance_id(CreateInstanceId(_global_index));
imap->set_stmgr_id(GenerateStMgrId(_stmgr_id));
heron::proto::system::InstanceInfo* inst = imap->mutable_info();
@@ -175,24 +177,20 @@
int32_t global_index = 1;
for (int spout = 0; spout < nSpouts; ++spout) {
for (int spout_instance = 0; spout_instance < nSpoutInstances; ++spout_instance) {
- heron::proto::system::Instance* instance =
- CreateInstance(spout, spout_instance, stmgr_assignment, global_index++, true);
+ auto instance = CreateInstance(spout, spout_instance, stmgr_assignment, global_index++, true);
if (++stmgr_assignment >= nContainers) {
stmgr_assignment = 0;
}
pplan->add_instances()->CopyFrom(*instance);
- delete instance;
}
}
for (int bolt = 0; bolt < nBolts; ++bolt) {
for (int bolt_instance = 0; bolt_instance < nBoltInstances; ++bolt_instance) {
- heron::proto::system::Instance* instance =
- CreateInstance(bolt, bolt_instance, stmgr_assignment, global_index++, false);
+ auto instance = CreateInstance(bolt, bolt_instance, stmgr_assignment, global_index++, false);
if (++stmgr_assignment >= nContainers) {
stmgr_assignment = 0;
}
pplan->add_instances()->CopyFrom(*instance);
- delete instance;
}
}
return pplan;
diff --git a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp b/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
index b348833..767c649 100644
--- a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
@@ -38,9 +38,11 @@
// TODO(sanjeev): Take this from config
reconnect_ckptmgr_interval_sec_ = 10;
- InstallResponseHandler(new proto::ckptmgr::RegisterTMasterRequest(),
+
+
+ InstallResponseHandler(make_unique<proto::ckptmgr::RegisterTMasterRequest>(),
&CkptMgrClient::HandleTMasterRegisterResponse);
- InstallResponseHandler(new proto::ckptmgr::CleanStatefulCheckpointRequest(),
+ InstallResponseHandler(make_unique<proto::ckptmgr::CleanStatefulCheckpointRequest>(),
&CkptMgrClient::HandleCleanStatefulCheckpointResponse);
}
@@ -69,7 +71,6 @@
<< " due to: " << _status << std::endl;
if (quit_) {
LOG(ERROR) << "Quitting" << std::endl;
- delete this;
} else {
LOG(INFO) << "Retrying again..." << std::endl;
AddTimer([this]() { this->OnReconnectTimer(); },
@@ -89,7 +90,7 @@
<< " closed connection with code: " << _status << std::endl;
}
if (quit_) {
- delete this;
+ LOG(ERROR) << "Quitting" << std::endl;
} else {
LOG(INFO) << "Will try to reconnect again..." << std::endl;
AddTimer([this]() { this->OnReconnectTimer(); },
@@ -126,20 +127,20 @@
void CkptMgrClient::SendRegisterRequest() {
LOG(INFO) << "Sending RegisterTmasterRequest to ckptmgr" << std::endl;
- auto request = new proto::ckptmgr::RegisterTMasterRequest();
+ auto request = make_unique<proto::ckptmgr::RegisterTMasterRequest>();
request->set_topology_name(topology_name_);
request->set_topology_id(topology_id_);
- SendRequest(request, NULL);
+ SendRequest(std::move(request), NULL);
}
void CkptMgrClient::SendCleanStatefulCheckpointRequest(const std::string& _oldest_ckpt,
bool _clean_all) {
LOG(INFO) << "Sending CleanStatefulCheckpoint request to ckptmgr with oldest checkpoint "
<< _oldest_ckpt << " and clean_all " << _clean_all << std::endl;
- auto request = new proto::ckptmgr::CleanStatefulCheckpointRequest();
+ auto request = make_unique<proto::ckptmgr::CleanStatefulCheckpointRequest>();
request->set_oldest_checkpoint_preserved(_oldest_ckpt);
request->set_clean_all_checkpoints(_clean_all);
- SendRequest(request, NULL);
+ SendRequest(std::move(request), NULL);
}
void CkptMgrClient::HandleCleanStatefulCheckpointResponse(void*,
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.cpp b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
index 7cbc77d..31dcf7b 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.cpp
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
@@ -34,6 +34,8 @@
namespace heron {
namespace tmaster {
+using std::make_shared;
+
const sp_string METRIC_RESTORE_START = "__restore_start";
const sp_string METRIC_RESTORE_STMGR_RESPONSE = "__restore_stmgr_response";
const sp_string METRIC_RESTORE_STMGR_RESPONSE_IGNORED = "__restore_stmgr_response_ignored";
@@ -49,26 +51,22 @@
const int32_t MOST_CHECKPOINTS_NUMBER = 5;
StatefulController::StatefulController(const std::string& _topology_name,
- proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt,
- heron::common::HeronStateMgr* _state_mgr,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+ shared_ptr<heron::common::HeronStateMgr> _state_mgr,
std::chrono::high_resolution_clock::time_point _tmaster_start_time,
- common::MetricsMgrSt* _metrics_manager_client,
+ shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
std::function<void(std::string)> _ckpt_save_watcher)
- : topology_name_(_topology_name), ckpt_record_(_ckpt), state_mgr_(_state_mgr),
+ : topology_name_(_topology_name), ckpt_record_(std::move(_ckpt)), state_mgr_(_state_mgr),
metrics_manager_client_(_metrics_manager_client) {
- checkpointer_ = new StatefulCheckpointer(_tmaster_start_time);
- restorer_ = new StatefulRestorer();
- count_metrics_ = new common::MultiCountMetric();
+ checkpointer_ = make_unique<StatefulCheckpointer>(_tmaster_start_time);
+ restorer_ = make_unique<StatefulRestorer>();
+ count_metrics_ = make_shared<common::MultiCountMetric>();
metrics_manager_client_->register_metric("__stateful_controller", count_metrics_);
ckpt_save_watcher_ = _ckpt_save_watcher;
}
StatefulController::~StatefulController() {
- delete ckpt_record_;
- delete checkpointer_;
- delete restorer_;
metrics_manager_client_->unregister_metric("__stateful_controller");
- delete count_metrics_;
}
void StatefulController::StartRestore(const StMgrMap& _stmgrs, bool _ignore_prev_state) {
@@ -139,32 +137,34 @@
}
void StatefulController::HandleInstanceStateStored(const std::string& _checkpoint_id,
- const std::string& _packing_plan_id,
- const proto::system::Instance& _instance) {
+ const std::string& _packing_plan_id,
+ const proto::system::Instance& _instance) {
count_metrics_->scope(METRIC_INSTANCE_CKPT_SAVED)->incr();
+
if (restorer_->IsInProgress()) {
LOG(INFO) << "Ignoring the Instance State because we are in Restore";
count_metrics_->scope(METRIC_INSTANCE_CKPT_SAVED_IGNORED)->incr();
return;
}
+
if (checkpointer_->HandleInstanceStateStored(_checkpoint_id, _instance)) {
// This is now a globally consistent checkpoint
count_metrics_->scope(METRIC_GLOBAL_CONSISTENT_CKPT)->incr();
auto new_ckpt_record = AddNewConsistentCheckpoint(_checkpoint_id, _packing_plan_id);
- state_mgr_->SetStatefulCheckpoints(topology_name_, *new_ckpt_record,
- std::bind(&StatefulController::HandleCheckpointSave, this,
- new_ckpt_record, std::placeholders::_1));
+
+ state_mgr_->SetStatefulCheckpoints(topology_name_, new_ckpt_record,
+ std::bind(&StatefulController::HandleCheckpointSave, this, new_ckpt_record,
+ std::placeholders::_1));
}
}
void StatefulController::HandleCheckpointSave(
- proto::ckptmgr::StatefulConsistentCheckpoints* _new_ckpt,
- proto::system::StatusCode _status) {
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _new_ckpt,
+ proto::system::StatusCode _status) {
if (_status == proto::system::OK) {
LOG(INFO) << "Successfully saved " << _new_ckpt->consistent_checkpoints(0).checkpoint_id()
<< " as the new globally consistent checkpoint";
- delete ckpt_record_;
- ckpt_record_ = _new_ckpt;
+ ckpt_record_ = std::move(_new_ckpt);
std::string oldest_ckpt =
ckpt_record_->consistent_checkpoints(ckpt_record_->consistent_checkpoints_size() - 1)
.checkpoint_id();
@@ -173,7 +173,6 @@
LOG(ERROR) << "Error saving " << _new_ckpt->consistent_checkpoints(0).checkpoint_id()
<< " as the new globally consistent checkpoint "
<< _status;
- delete _new_ckpt;
}
}
@@ -194,10 +193,10 @@
return EMPTY_STRING;
}
-proto::ckptmgr::StatefulConsistentCheckpoints*
+shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints>
StatefulController::AddNewConsistentCheckpoint(const std::string& _new_checkpoint,
const std::string& _packing_plan) {
- auto new_record = new proto::ckptmgr::StatefulConsistentCheckpoints();
+ auto new_record = make_shared<proto::ckptmgr::StatefulConsistentCheckpoints>();
auto new_consistent_checkpoint = new_record->add_consistent_checkpoints();
new_consistent_checkpoint->set_checkpoint_id(_new_checkpoint);
new_consistent_checkpoint->set_packing_plan_id(_packing_plan);
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.h b/heron/tmaster/src/cpp/manager/stateful-controller.h
index 488bbbd..6ceda37 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.h
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.h
@@ -36,6 +36,9 @@
namespace heron {
namespace tmaster {
+using std::unique_ptr;
+using std::shared_ptr;
+
class StatefulRestorer;
class StatefulCheckpointer;
@@ -51,10 +54,10 @@
class StatefulController {
public:
explicit StatefulController(const std::string& _topology_name,
- proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt,
- heron::common::HeronStateMgr* _state_mgr,
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+ shared_ptr<heron::common::HeronStateMgr> _state_mgr,
std::chrono::high_resolution_clock::time_point _tmaster_start_time,
- common::MetricsMgrSt* _metrics_manager_client,
+ shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
std::function<void(std::string)> _ckpt_save_watcher);
virtual ~StatefulController();
// Start a new restore process
@@ -85,20 +88,20 @@
// Get the youngest ckpt id that is older than the given ckpt_id
const std::string& GetNextInLineCheckpointId(const std::string& _ckpt_id);
// Creates a new ckpt record adding the latest one
- proto::ckptmgr::StatefulConsistentCheckpoints*
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints>
AddNewConsistentCheckpoint(const std::string& _new_checkpoint,
const std::string& _packing_plan);
// Handler when statemgr saves the new checkpoint record
- void HandleCheckpointSave(proto::ckptmgr::StatefulConsistentCheckpoints* _new_ckpt,
+ void HandleCheckpointSave(shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _new_ckpt,
proto::system::StatusCode _status);
std::string topology_name_;
- proto::ckptmgr::StatefulConsistentCheckpoints* ckpt_record_;
- heron::common::HeronStateMgr* state_mgr_;
- StatefulCheckpointer* checkpointer_;
- StatefulRestorer* restorer_;
- common::MetricsMgrSt* metrics_manager_client_;
- common::MultiCountMetric* count_metrics_;
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> ckpt_record_;
+ shared_ptr<heron::common::HeronStateMgr> state_mgr_;
+ unique_ptr<StatefulCheckpointer> checkpointer_;
+ unique_ptr<StatefulRestorer> restorer_;
+ shared_ptr<common::MetricsMgrSt> metrics_manager_client_;
+ shared_ptr<common::MultiCountMetric> count_metrics_;
std::function<void(std::string)> ckpt_save_watcher_;
};
} // namespace tmaster
diff --git a/heron/tmaster/src/cpp/manager/stats-interface.cpp b/heron/tmaster/src/cpp/manager/stats-interface.cpp
index 4b2086d..e1f8aa9 100644
--- a/heron/tmaster/src/cpp/manager/stats-interface.cpp
+++ b/heron/tmaster/src/cpp/manager/stats-interface.cpp
@@ -33,9 +33,9 @@
namespace tmaster {
StatsInterface::StatsInterface(EventLoop* eventLoop, const NetworkOptions& _options,
- TMetricsCollector* _collector, TMaster* _tmaster)
+ shared_ptr<TMetricsCollector> _collector, TMaster* _tmaster)
: metrics_collector_(_collector), tmaster_(_tmaster) {
- http_server_ = new HTTPServer(eventLoop, _options);
+ http_server_ = make_unique<HTTPServer>(eventLoop, _options);
// Install the handlers
auto cbHandleStats = [this](IncomingHTTPRequest* request) { this->HandleStatsRequest(request); };
@@ -64,7 +64,7 @@
CHECK(http_server_->Start() == SP_OK);
}
-StatsInterface::~StatsInterface() { delete http_server_; }
+StatsInterface::~StatsInterface() {}
void StatsInterface::HandleStatsRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Got a stats request " << _request->GetQuery();
@@ -77,16 +77,14 @@
delete _request;
return;
}
- proto::tmaster::MetricResponse* res =
- metrics_collector_->GetMetrics(req, tmaster_->getInitialTopology());
+ auto res = metrics_collector_->GetMetrics(req, tmaster_->getInitialTopology());
sp_string response_string;
CHECK(res->SerializeToString(&response_string));
- OutgoingHTTPResponse* response = new OutgoingHTTPResponse(_request);
+ auto response = make_unique<OutgoingHTTPResponse>(_request);
response->AddHeader("Content-Type", "application/octet-stream");
response->AddHeader("Content-Length", std::to_string(response_string.size()));
response->AddResponse(response_string);
- http_server_->SendReply(_request, 200, response);
- delete res;
+ http_server_->SendReply(_request, 200, std::move(response));
delete _request;
LOG(INFO) << "Done with stats request ";
}
@@ -102,16 +100,15 @@
delete _request;
return;
}
- heron::proto::tmaster::ExceptionLogResponse* exception_response =
+ unique_ptr<heron::proto::tmaster::ExceptionLogResponse> exception_response =
metrics_collector_->GetExceptions(exception_request);
sp_string response_string;
CHECK(exception_response->SerializeToString(&response_string));
- OutgoingHTTPResponse* http_response = new OutgoingHTTPResponse(_request);
+ auto http_response = make_unique<OutgoingHTTPResponse>(_request);
http_response->AddHeader("Content-Type", "application/octet-stream");
http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
http_response->AddResponse(response_string);
- http_server_->SendReply(_request, 200, http_response);
- delete exception_response;
+ http_server_->SendReply(_request, 200, std::move(http_response));
delete _request;
LOG(INFO) << "Returned exceptions response";
}
@@ -126,16 +123,14 @@
delete _request;
return;
}
- heron::proto::tmaster::ExceptionLogResponse* exception_response =
- metrics_collector_->GetExceptionsSummary(exception_request);
+ auto exception_response = metrics_collector_->GetExceptionsSummary(exception_request);
sp_string response_string;
CHECK(exception_response->SerializeToString(&response_string));
- OutgoingHTTPResponse* http_response = new OutgoingHTTPResponse(_request);
+ auto http_response = make_unique<OutgoingHTTPResponse>(_request);
http_response->AddHeader("Content-Type", "application/octet-stream");
http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
http_response->AddResponse(response_string);
- http_server_->SendReply(_request, 200, http_response);
- delete exception_response;
+ http_server_->SendReply(_request, 200, std::move(http_response));
delete _request;
LOG(INFO) << "Returned exceptions response";
}
@@ -155,11 +150,11 @@
auto stmgrs_reg_summary_response = tmaster_->GetStmgrsRegSummary();
sp_string response_string;
CHECK(stmgrs_reg_summary_response->SerializeToString(&response_string));
- auto http_response = new OutgoingHTTPResponse(_request);
+ auto http_response = make_unique<OutgoingHTTPResponse>(_request);
http_response->AddHeader("Content-Type", "application/octet-stream");
http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
http_response->AddResponse(response_string);
- http_server_->SendReply(_request, 200, http_response);
+ http_server_->SendReply(_request, 200, std::move(http_response));
delete _request;
LOG(INFO) << "Returned stream managers registration summary response";
}
diff --git a/heron/tmaster/src/cpp/manager/stats-interface.h b/heron/tmaster/src/cpp/manager/stats-interface.h
index 58916df..a6fd190 100644
--- a/heron/tmaster/src/cpp/manager/stats-interface.h
+++ b/heron/tmaster/src/cpp/manager/stats-interface.h
@@ -27,13 +27,16 @@
namespace heron {
namespace tmaster {
+using std::unique_ptr;
+using std::shared_ptr;
+
class TMetricsCollector;
class TMaster;
class StatsInterface {
public:
StatsInterface(EventLoop* eventLoop, const NetworkOptions& options,
- TMetricsCollector* _collector, TMaster* tmaster);
+ shared_ptr<TMetricsCollector> _collector, TMaster* tmaster);
virtual ~StatsInterface();
private:
@@ -43,8 +46,8 @@
void HandleExceptionSummaryRequest(IncomingHTTPRequest* _request);
void HandleStmgrsRegistrationSummaryRequest(IncomingHTTPRequest* _request);
- HTTPServer* http_server_; // Our http server
- TMetricsCollector* metrics_collector_;
+ unique_ptr<HTTPServer> http_server_; // Our http server
+ shared_ptr<TMetricsCollector> metrics_collector_;
TMaster* tmaster_;
};
} // namespace tmaster
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.cpp b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
index fca3cfe..46be97d 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.cpp
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
@@ -33,34 +33,25 @@
namespace tmaster {
StMgrState::StMgrState(Connection* _conn, const proto::system::StMgr& _stmgr,
- const std::vector<proto::system::Instance*>& _instances,
- Server* _server) {
+ const std::vector<shared_ptr<proto::system::Instance>>& _instances,
+ Server& _server) : server_(_server) {
last_heartbeat_ = time(NULL);
last_stats_ = NULL;
instances_ = _instances;
- stmgr_ = new proto::system::StMgr(_stmgr);
+ stmgr_ = std::make_shared<proto::system::StMgr>(_stmgr);
connection_ = _conn;
- server_ = _server;
}
StMgrState::~StMgrState() {
- delete stmgr_;
- for (size_t i = 0; i < instances_.size(); ++i) {
- delete instances_[i];
- }
delete last_stats_;
}
void StMgrState::UpdateWithNewStMgr(const proto::system::StMgr& _stmgr,
- const std::vector<proto::system::Instance*>& _instances,
- Connection* _conn) {
+ const std::vector<shared_ptr<proto::system::Instance>>& _instances,
+ Connection* _conn) {
delete last_stats_;
last_stats_ = NULL;
- delete stmgr_;
- for (size_t i = 0; i < instances_.size(); ++i) {
- delete instances_[i];
- }
- stmgr_ = new proto::system::StMgr(_stmgr);
+ stmgr_ = std::make_shared<proto::system::StMgr>(_stmgr);
instances_ = _instances;
connection_ = _conn;
}
@@ -97,7 +88,7 @@
const proto::ckptmgr::RestoreTopologyStateRequest& _message) {
LOG(INFO) << "Sending restore topology state message to stmgr " << stmgr_->id()
<< " with checkpoint " << _message.checkpoint_id();
- server_->SendMessage(connection_, _message);
+ server_.SendMessage(connection_, _message);
}
void StMgrState::SendStartStatefulProcessingMessage(const std::string& _checkpoint_id) {
@@ -105,19 +96,19 @@
<< " with checkpoint " << _checkpoint_id;
proto::ckptmgr::StartStmgrStatefulProcessing message;
message.set_checkpoint_id(_checkpoint_id);
- server_->SendMessage(connection_, message);
+ server_.SendMessage(connection_, message);
}
void StMgrState::NewPhysicalPlan(const proto::system::PhysicalPlan& _pplan) {
LOG(INFO) << "Sending a new physical plan to stmgr " << stmgr_->id();
proto::stmgr::NewPhysicalPlanMessage message;
message.mutable_new_pplan()->CopyFrom(_pplan);
- server_->SendMessage(connection_, message);
+ server_.SendMessage(connection_, message);
}
void StMgrState::NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& _request) {
LOG(INFO) << "Sending a new stateful checkpoint request to stmgr" << stmgr_->id();
- server_->SendMessage(connection_, _request);
+ server_.SendMessage(connection_, _request);
}
/*
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.h b/heron/tmaster/src/cpp/manager/stmgrstate.h
index ebfdde4..270d93a 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.h
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.h
@@ -40,16 +40,18 @@
namespace heron {
namespace tmaster {
+using std::shared_ptr;
+
class TMasterServer;
class StMgrState {
public:
StMgrState(Connection* _conn, const proto::system::StMgr& _info,
- const std::vector<proto::system::Instance*>& _instances, Server* _server);
+ const std::vector<shared_ptr<proto::system::Instance>>& _instances, Server& _server);
virtual ~StMgrState();
void UpdateWithNewStMgr(const proto::system::StMgr& _info,
- const std::vector<proto::system::Instance*>& _instances,
+ const std::vector<shared_ptr<proto::system::Instance>>& _instances,
Connection* _conn);
// Update the heartbeat. Note:- We own _stats now
@@ -75,8 +77,8 @@
Connection* get_connection() { return connection_; }
const std::string& get_id() const { return stmgr_->id(); }
sp_uint32 get_num_instances() const { return instances_.size(); }
- const std::vector<proto::system::Instance*>& get_instances() const { return instances_; }
- const proto::system::StMgr* get_stmgr() const { return stmgr_; }
+ const std::vector<shared_ptr<proto::system::Instance>>& get_instances() const {return instances_;}
+ const shared_ptr<proto::system::StMgr> get_stmgr() const { return stmgr_; }
bool VerifyInstances(const std::vector<proto::system::Instance*>& _instances);
private:
@@ -86,14 +88,14 @@
proto::system::StMgrStats* last_stats_;
// All the instances on this stmgr
- std::vector<proto::system::Instance*> instances_;
+ std::vector<shared_ptr<proto::system::Instance>> instances_;
// The info about this stmgr
- proto::system::StMgr* stmgr_;
+ shared_ptr<proto::system::StMgr> stmgr_;
// The connection used by the nodemanager to contact us
Connection* connection_;
// Our link to our TMaster
- Server* server_;
+ Server& server_;
};
} // namespace tmaster
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.cpp b/heron/tmaster/src/cpp/manager/tcontroller.cpp
index b7a4ee6..6745f80 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.cpp
+++ b/heron/tmaster/src/cpp/manager/tcontroller.cpp
@@ -42,7 +42,7 @@
*/
TController::TController(EventLoop* eventLoop, const NetworkOptions& options, TMaster* tmaster)
: tmaster_(tmaster) {
- http_server_ = new HTTPServer(eventLoop, options);
+ http_server_ = make_unique<HTTPServer>(eventLoop, options);
/*
* Install the handlers
*/
@@ -74,7 +74,7 @@
http_server_->InstallCallBack("/get_current_physical_plan", std::move(cbGetCurPPlan));
}
-TController::~TController() { delete http_server_; }
+TController::~TController() {}
sp_int32 TController::Start() { return http_server_->Start(); }
@@ -111,9 +111,9 @@
} else {
std::string s = "Topology successfully activated";
LOG(INFO) << s;
- OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+ auto response = make_unique<OutgoingHTTPResponse>(request);
response->AddResponse(s);
- http_server_->SendReply(request, 200, response);
+ http_server_->SendReply(request, 200, std::move(response));
}
delete request;
}
@@ -150,9 +150,9 @@
} else {
std::string s = "Topology successfully deactivated";
LOG(INFO) << s;
- OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+ auto response = make_unique<OutgoingHTTPResponse>(request);
response->AddResponse(s);
- http_server_->SendReply(request, 200, response);
+ http_server_->SendReply(request, 200, std::move(response));
}
delete request;
}
@@ -197,9 +197,9 @@
} else {
std::string msg = "Checkpoints successfully cleaned";
LOG(INFO) << msg;
- OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+ auto response = make_unique<OutgoingHTTPResponse>(request);
response->AddResponse(msg);
- http_server_->SendReply(request, 200, response);
+ http_server_->SendReply(request, 200, std::move(response));
}
delete request;
}
@@ -262,9 +262,9 @@
} else {
const std::string message("Runtime config updated");
LOG(INFO) << message;
- OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+ auto response = make_unique<OutgoingHTTPResponse>(request);
response->AddResponse(message);
- http_server_->SendReply(request, 200, response);
+ http_server_->SendReply(request, 200, std::move(response));
}
delete request;
}
@@ -293,9 +293,9 @@
const std::string message("Get current physical plan");
LOG(INFO) << message;
- OutgoingHTTPResponse* response = new OutgoingHTTPResponse(request);
+ auto response = make_unique<OutgoingHTTPResponse>(request);
response->AddResponse(pplanStringFixed);
- http_server_->SendReply(request, 200, response);
+ http_server_->SendReply(request, 200, std::move(response));
}
delete request;
}
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.h b/heron/tmaster/src/cpp/manager/tcontroller.h
index f7e7a48..a611b11 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.h
+++ b/heron/tmaster/src/cpp/manager/tcontroller.h
@@ -69,7 +69,7 @@
void HandleGetCurPPlanRequest(IncomingHTTPRequest* request);
// We are a http server
- HTTPServer* http_server_;
+ unique_ptr<HTTPServer> http_server_;
// our tmaster
TMaster* tmaster_;
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp
index 206d8fb..9ab6897 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -47,6 +47,9 @@
namespace heron {
namespace tmaster {
+using std::unique_ptr;
+using std::make_shared;
+
// Stats for the process
const sp_string METRIC_CPU_USER = "__cpu_user_usec";
const sp_string METRIC_CPU_SYSTEM = "__cpu_system_usec";
@@ -69,51 +72,49 @@
topdir_ = _topdir;
tmaster_controller_ = nullptr;
tmaster_controller_port_ = _tmaster_controller_port;
- master_ = NULL;
+ master_ = nullptr;
master_port_ = _master_port;
- stats_ = NULL;
+ stats_ = nullptr;
stats_port_ = _stats_port;
myhost_name_ = _myhost_name;
eventLoop_ = eventLoop;
dns_ = new AsyncDNS(eventLoop_);
http_client_ = new HTTPClient(eventLoop_, dns_);
- metrics_collector_ =
- new TMetricsCollector(config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterMetricsCollectorMaximumIntervalMin() *
- 60,
- eventLoop_, _metrics_sinks_yaml);
+ metrics_collector_ = make_shared<TMetricsCollector>(config::HeronInternalsConfigReader::Instance()
+ ->GetHeronTmasterMetricsCollectorMaximumIntervalMin() * 60,
+ eventLoop_, _metrics_sinks_yaml);
mMetricsMgrPort = metricsMgrPort;
sp_int32 metricsExportIntervalSec =
config::HeronInternalsConfigReader::Instance()->GetHeronMetricsExportIntervalSec();
- mMetricsMgrClient = new heron::common::MetricsMgrSt(
+ mMetricsMgrClient = make_shared<heron::common::MetricsMgrSt>(
mMetricsMgrPort, metricsExportIntervalSec, eventLoop_);
mMetricsMgrClient->Start(myhost_name_, master_port_, "__tmaster__",
"0"); // MM expects task_id, so just giving 0 for tmaster.
- tmasterProcessMetrics = new heron::common::MultiAssignableMetric();
+ tmasterProcessMetrics = make_shared<heron::common::MultiAssignableMetric>();
mMetricsMgrClient->register_metric(METRIC_PREFIX, tmasterProcessMetrics);
ckptmgr_port_ = _ckptmgr_port;
ckptmgr_client_ = nullptr;
- current_pplan_ = NULL;
+ current_pplan_ = nullptr;
// The topology as first submitted by the user
// It shall only be used to construct the physical plan when TMaster first time starts
// Any runtime changes shall be made to current_pplan_->topology
- topology_ = NULL;
- packing_plan_ = NULL;
+ topology_ = nullptr;
+ packing_plan_ = nullptr;
state_mgr_ = heron::common::HeronStateMgr::MakeStateMgr(zk_hostport_, topdir_, eventLoop_);
assignment_in_progress_ = false;
do_reassign_ = false;
master_establish_attempts_ = 0;
- tmaster_location_ = new proto::tmaster::TMasterLocation();
+ tmaster_location_ = make_unique<proto::tmaster::TMasterLocation>();
tmaster_location_->set_topology_name(_topology_name);
tmaster_location_->set_topology_id(_topology_id);
tmaster_location_->set_host(myhost_name_);
@@ -154,7 +155,7 @@
}
void TMaster::FetchPackingPlan() {
- auto packing_plan = new proto::system::PackingPlan();
+ auto packing_plan = make_shared<proto::system::PackingPlan>();
state_mgr_->GetPackingPlan(tmaster_location_->topology_name(), packing_plan,
[packing_plan, this](proto::system::StatusCode status) {
@@ -162,7 +163,7 @@
});
}
-void TMaster::OnPackingPlanFetch(proto::system::PackingPlan* newPackingPlan,
+void TMaster::OnPackingPlanFetch(shared_ptr<proto::system::PackingPlan> newPackingPlan,
proto::system::StatusCode _status) {
if (_status != proto::system::OK) {
LOG(INFO) << "PackingPlan Fetch failed with status " << _status;
@@ -230,28 +231,18 @@
}
TMaster::~TMaster() {
- delete topology_;
- delete packing_plan_;
- delete current_pplan_;
- delete state_mgr_;
- delete tmaster_controller_;
if (master_) {
master_->Stop();
}
- delete master_;
- delete stats_;
- delete tmaster_location_;
+
for (StMgrMapIter iter = stmgrs_.begin(); iter != stmgrs_.end(); ++iter) {
- delete iter->second;
+ stmgrs_.erase(iter->first);
}
+
stmgrs_.clear();
- delete metrics_collector_;
mMetricsMgrClient->unregister_metric(METRIC_PREFIX);
- delete mMetricsMgrClient;
- delete tmasterProcessMetrics;
- delete stateful_controller_;
- delete ckptmgr_client_;
+
delete http_client_;
delete dns_;
}
@@ -302,8 +293,8 @@
LOG(INFO) << "Successfully set ourselves as master\n";
// Lets now read the topology
- topology_ = new proto::api::Topology();
- state_mgr_->GetTopology(tmaster_location_->topology_name(), topology_,
+ topology_ = make_unique<proto::api::Topology>();
+ state_mgr_->GetTopology(tmaster_location_->topology_name(), *topology_,
[this](proto::system::StatusCode code) {
this->GetTopologyDone(code);
});
@@ -340,7 +331,7 @@
ckpt_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
1024 * 1024);
- ckptmgr_client_ = new CkptMgrClient(eventLoop_, ckpt_options,
+ ckptmgr_client_ = make_unique<CkptMgrClient>(eventLoop_, ckpt_options,
topology_->name(), topology_->id(),
std::bind(&TMaster::HandleCleanStatefulCheckpointResponse,
this, std::placeholders::_1));
@@ -348,7 +339,7 @@
ckptmgr_client_->Start();
// We also need to load latest checkpoint state
- auto ckpt = new proto::ckptmgr::StatefulConsistentCheckpoints();
+ auto ckpt = std::make_shared<proto::ckptmgr::StatefulConsistentCheckpoints>();
auto cb = [ckpt, this](proto::system::StatusCode code) {
this->GetStatefulCheckpointsDone(ckpt, code);
};
@@ -360,8 +351,9 @@
}
}
-void TMaster::GetStatefulCheckpointsDone(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt,
- proto::system::StatusCode _code) {
+void TMaster::GetStatefulCheckpointsDone(
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
+ proto::system::StatusCode _code) {
if (_code != proto::system::OK && _code != proto::system::PATH_DOES_NOT_EXIST) {
LOG(FATAL) << "For topology " << tmaster_location_->topology_name()
<< " Getting Stateful Checkpoint failed with error " << _code;
@@ -370,9 +362,8 @@
LOG(INFO) << "For topology " << tmaster_location_->topology_name()
<< " No existing globally consistent checkpoint found "
<< " inserting a empty one";
- delete _ckpt;
// We need to set an empty one
- auto ckpts = new proto::ckptmgr::StatefulConsistentCheckpoints;
+ auto ckpts = make_shared<proto::ckptmgr::StatefulConsistentCheckpoints>();
auto ckpt = ckpts->add_consistent_checkpoints();
ckpt->set_checkpoint_id("");
ckpt->set_packing_plan_id(packing_plan_->id());
@@ -381,18 +372,18 @@
};
state_mgr_->CreateStatefulCheckpoints(tmaster_location_->topology_name(),
- *ckpts, std::move(cb));
+ ckpts, std::move(cb));
} else {
LOG(INFO) << "For topology " << tmaster_location_->topology_name()
<< " An existing globally consistent checkpoint found "
<< _ckpt->DebugString();
- SetupStatefulController(_ckpt);
+ SetupStatefulController(std::move(_ckpt));
FetchPhysicalPlan();
}
}
void TMaster::SetStatefulCheckpointsDone(proto::system::StatusCode _code,
- proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt) {
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt) {
if (_code != proto::system::OK) {
LOG(FATAL) << "For topology " << tmaster_location_->topology_name()
<< " Setting empty Stateful Checkpoint failed with error " << _code;
@@ -401,7 +392,8 @@
FetchPhysicalPlan();
}
-void TMaster::SetupStatefulController(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt) {
+void TMaster::SetupStatefulController(
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt) {
sp_int64 stateful_checkpoint_interval =
config::TopologyConfigHelper::GetStatefulCheckpointIntervalSecsWithDefault(*topology_, 300);
CHECK_GT(stateful_checkpoint_interval, 0);
@@ -410,8 +402,8 @@
this->HandleStatefulCheckpointSave(_oldest_ckptid);
};
// Instantiate the stateful restorer/coordinator
- stateful_controller_ = new StatefulController(topology_->name(), _ckpt, state_mgr_, start_time_,
- mMetricsMgrClient, cb);
+ stateful_controller_ = make_unique<StatefulController>(topology_->name(), _ckpt, state_mgr_,
+ start_time_, mMetricsMgrClient, cb);
LOG(INFO) << "Starting timer to checkpoint state every "
<< stateful_checkpoint_interval << " seconds";
CHECK_GT(eventLoop_->registerTimer(
@@ -449,7 +441,7 @@
}
void TMaster::FetchPhysicalPlan() {
- proto::system::PhysicalPlan* pplan = new proto::system::PhysicalPlan();
+ auto pplan = make_shared<proto::system::PhysicalPlan>();
auto cb = [pplan, this](proto::system::StatusCode code) {
this->GetPhysicalPlanDone(pplan, code);
};
@@ -489,7 +481,7 @@
}
}
-void TMaster::GetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan,
+void TMaster::GetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::system::StatusCode _code) {
// Physical plan need not exist. First check if some other error occurred.
if (_code != proto::system::OK && _code != proto::system::PATH_DOES_NOT_EXIST) {
@@ -504,7 +496,6 @@
if (_code == proto::system::PATH_DOES_NOT_EXIST) {
LOG(ERROR) << "There was no existing physical plan\n";
// We never did assignment in the first place
- delete _pplan;
} else {
LOG(INFO) << "There was an existing physical plan\n";
CHECK_EQ(_code, proto::system::OK);
@@ -523,7 +514,7 @@
->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
1_MB);
master_options.set_socket_family(PF_INET);
- master_ = new TMasterServer(eventLoop_, master_options, metrics_collector_, this);
+ master_ = make_unique<TMasterServer>(eventLoop_, master_options, metrics_collector_, this);
sp_int32 retval = master_->Start();
if (retval != SP_OK) {
@@ -539,7 +530,7 @@
->GetHeronTmasterNetworkControllerOptionsMaximumPacketMb() *
1_MB);
controller_options.set_socket_family(PF_INET);
- tmaster_controller_ = new TController(eventLoop_, controller_options, this);
+ tmaster_controller_ = make_unique<TController>(eventLoop_, controller_options, this);
retval = tmaster_controller_->Start();
if (retval != SP_OK) {
@@ -559,7 +550,7 @@
->GetHeronTmasterNetworkStatsOptionsMaximumPacketMb() *
1_MB);
stats_options.set_socket_family(PF_INET);
- stats_ = new StatsInterface(eventLoop_, stats_options, metrics_collector_, this);
+ stats_ = make_unique<StatsInterface>(eventLoop_, stats_options, metrics_collector_, this);
}
void TMaster::ActivateTopology(VCallback<proto::system::StatusCode> cb) {
@@ -567,7 +558,7 @@
DCHECK(current_pplan_->topology().IsInitialized());
// Set the status
- proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+ auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->CopyFrom(*current_pplan_);
new_pplan->mutable_topology()->set_state(proto::api::RUNNING);
@@ -584,7 +575,7 @@
DCHECK(current_pplan_->topology().IsInitialized());
// Set the status
- proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+ auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->CopyFrom(*current_pplan_);
new_pplan->mutable_topology()->set_state(proto::api::PAUSED);
@@ -604,12 +595,11 @@
LogConfig(_config);
// Parse and set the new configs
- proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+ auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->CopyFrom(*current_pplan_);
proto::api::Topology* topology = new_pplan->mutable_topology();
if (!UpdateRuntimeConfigInTopology(topology, _config)) {
LOG(ERROR) << "Fail to update runtime config in topology";
- delete new_pplan;
return false;
}
@@ -686,8 +676,9 @@
}
proto::system::Status* TMaster::RegisterStMgr(
- const proto::system::StMgr& _stmgr, const std::vector<proto::system::Instance*>& _instances,
- Connection* _conn, proto::system::PhysicalPlan*& _pplan) {
+ const proto::system::StMgr& _stmgr,
+ const std::vector<shared_ptr<proto::system::Instance>>& _instances,
+ Connection* _conn, shared_ptr<proto::system::PhysicalPlan>& _pplan) {
const std::string& stmgr_id = _stmgr.id();
LOG(INFO) << "Got a register stream manager request from " << stmgr_id;
@@ -733,7 +724,7 @@
return status;
} else {
// This guy was indeed expected
- stmgrs_[stmgr_id] = new StMgrState(_conn, _stmgr, _instances, master_);
+ stmgrs_[stmgr_id] = make_shared<StMgrState>(_conn, _stmgr, _instances, *master_);
connection_to_stmgr_id_[_conn] = stmgr_id;
absent_stmgrs_.erase(stmgr_id);
}
@@ -749,6 +740,7 @@
CHECK_GE(eventLoop_->registerTimer(std::move(cb), false, 0), 0);
}
}
+
_pplan = current_pplan_;
proto::system::Status* status = new proto::system::Status();
status->set_status(proto::system::OK);
@@ -769,7 +761,7 @@
// TODO(kramasamy): If current_assignment exists, we need
// to use as many portions from it as possible
- proto::system::PhysicalPlan* pplan = MakePhysicalPlan();
+ shared_ptr<proto::system::PhysicalPlan> pplan = MakePhysicalPlan();
CHECK_NOTNULL(pplan);
DCHECK(pplan->IsInitialized());
@@ -795,7 +787,7 @@
}
}
-void TMaster::SetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan,
+void TMaster::SetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::system::StatusCode _code) {
if (_code != proto::system::OK) {
LOG(ERROR) << "Error writing assignment to statemgr. Error code is " << _code << std::endl;
@@ -806,14 +798,12 @@
if (do_reassign_) {
// Some other mapping change happened
- delete _pplan;
assignment_in_progress_ = true;
LOG(INFO) << "Doing assignment since physical assignment might have changed" << std::endl;
auto cb = [this](EventLoop::Status status) { this->DoPhysicalPlan(status); };
CHECK_GE(eventLoop_->registerTimer(std::move(cb), false, 0), 0);
} else {
bool first_time_pplan = current_pplan_ == nullptr;
- delete current_pplan_;
current_pplan_ = _pplan;
assignment_in_progress_ = false;
// We need to pass that on to all streammanagers
@@ -862,7 +852,7 @@
return std::move(response);
}
-proto::system::PhysicalPlan* TMaster::MakePhysicalPlan() {
+shared_ptr<proto::system::PhysicalPlan> TMaster::MakePhysicalPlan() {
// TODO(kramasamy): At some point, we need to talk to our scheduler
// and do this scheduling
if (current_pplan_) {
@@ -872,7 +862,7 @@
// First lets verify that our original pplan and instances
// all match up
CHECK(ValidateStMgrsWithPhysicalPlan(current_pplan_));
- proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+ auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->mutable_topology()->CopyFrom(current_pplan_->topology());
for (StMgrMapIter iter = stmgrs_.begin(); iter != stmgrs_.end(); ++iter) {
@@ -881,6 +871,7 @@
for (sp_int32 i = 0; i < current_pplan_->instances_size(); ++i) {
new_pplan->add_instances()->CopyFrom(current_pplan_->instances(i));
}
+
return new_pplan;
}
@@ -889,13 +880,14 @@
// TMaster just stitches the info together to pass to everyone
// Build the PhysicalPlan structure
- proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
+ auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->mutable_topology()->CopyFrom(*topology_);
// Build the physical assignments
for (StMgrMapIter stmgr_iter = stmgrs_.begin(); stmgr_iter != stmgrs_.end(); ++stmgr_iter) {
new_pplan->add_stmgrs()->CopyFrom(*(stmgr_iter->second->get_stmgr()));
- const std::vector<proto::system::Instance*>& instances = stmgr_iter->second->get_instances();
+ const std::vector<shared_ptr<proto::system::Instance>>& instances =
+ stmgr_iter->second->get_instances();
for (size_t i = 0; i < instances.size(); ++i) {
new_pplan->add_instances()->CopyFrom(*(instances[i]));
}
@@ -937,13 +929,12 @@
if (stmgrs_.find(stmgr_id) == stmgrs_.end()) {
return proto::system::INVALID_STMGR;
}
- StMgrState* stmgr = stmgrs_[stmgr_id];
+ auto stmgr = stmgrs_[stmgr_id];
// This guy disconnected from us
LOG(INFO) << "StMgr " << stmgr->get_stmgr()->id() << " disconnected from us" << std::endl;
stmgrs_.erase(stmgr->get_id());
connection_to_stmgr_id_.erase(_conn);
absent_stmgrs_.insert(stmgr->get_id());
- delete stmgr;
return proto::system::OK;
}
@@ -1000,7 +991,7 @@
return ninstances == ntasks;
}
-bool TMaster::ValidateStMgrsWithPhysicalPlan(proto::system::PhysicalPlan* _pplan) {
+bool TMaster::ValidateStMgrsWithPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> _pplan) {
std::map<std::string, std::vector<proto::system::Instance*> > stmgr_to_instance_map;
for (sp_int32 i = 0; i < _pplan->instances_size(); ++i) {
proto::system::Instance* instance = _pplan->mutable_instances(i);
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmaster/src/cpp/manager/tmaster.h
index 8869618..e407677 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmaster/src/cpp/manager/tmaster.h
@@ -35,6 +35,9 @@
namespace heron {
namespace tmaster {
+using std::unique_ptr;
+using std::shared_ptr;
+
class StMgrState;
class TController;
class StatsInterface;
@@ -43,7 +46,7 @@
class StatefulController;
class CkptMgrClient;
-typedef std::map<std::string, StMgrState*> StMgrMap;
+typedef std::map<std::string, shared_ptr<StMgrState>> StMgrMap;
typedef StMgrMap::iterator StMgrMapIter;
typedef std::map<std::string, std::string> ConfigValueMap;
@@ -75,8 +78,8 @@
bool ValidateRuntimeConfig(const ComponentConfigMap& _config) const;
proto::system::Status* RegisterStMgr(const proto::system::StMgr& _stmgr,
- const std::vector<proto::system::Instance*>& _instances,
- Connection* _conn, proto::system::PhysicalPlan*& _pplan);
+ const std::vector<shared_ptr<proto::system::Instance>>& _instances,
+ Connection* _conn, shared_ptr<proto::system::PhysicalPlan>& _pplan);
// function to update heartbeat for a nodemgr
proto::system::Status* UpdateStMgrHeartbeat(Connection* _conn, sp_int64 _time,
proto::system::StMgrStats* _stats);
@@ -93,12 +96,12 @@
std::unique_ptr<proto::tmaster::StmgrsRegistrationSummaryResponse> GetStmgrsRegSummary();
// Accessors
- const proto::system::PhysicalPlan* getPhysicalPlan() const { return current_pplan_; }
+ const shared_ptr<proto::system::PhysicalPlan> getPhysicalPlan() const { return current_pplan_; }
// TODO(mfu): Should we provide this?
// topology_ should only be used to construct physical plan when TMaster first starts
// Providing an accessor is bug prone.
// Now used in GetMetrics function in tmetrics-collector
- const proto::api::Topology* getInitialTopology() const { return topology_; }
+ const proto::api::Topology& getInitialTopology() const { return *topology_; }
// Timer function to start the stateful checkpoint process
void SendCheckpointMarker();
@@ -131,7 +134,7 @@
// If _new_stmgr is null, this means that there was a plan
// existing, but a _new_stmgr joined us. So redo his part
// If _new_stmgr is empty, this means do pplan from scratch
- proto::system::PhysicalPlan* MakePhysicalPlan();
+ shared_ptr<proto::system::PhysicalPlan> MakePhysicalPlan();
// Check to see if the topology is of correct format
bool ValidateTopology(const proto::api::Topology& _topology);
@@ -142,7 +145,7 @@
// Check to see if the stmgrs and pplan match
// in terms of workers
- bool ValidateStMgrsWithPhysicalPlan(proto::system::PhysicalPlan* _pplan);
+ bool ValidateStMgrsWithPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> _pplan);
// Check if incoming runtime configs are valid or not.
// All incoming configurations must exist. If there is any non-existing
@@ -159,26 +162,28 @@
void GetTopologyDone(proto::system::StatusCode _code);
// Function called after we get StatefulConsistentCheckpoints
- void GetStatefulCheckpointsDone(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt,
+ void GetStatefulCheckpointsDone(shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
proto::system::StatusCode _code);
// Function called after we set an initial StatefulConsistentCheckpoints
void SetStatefulCheckpointsDone(proto::system::StatusCode _code,
- proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt);
+ shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt);
// Helper function to setup stateful coordinator
- void SetupStatefulController(proto::ckptmgr::StatefulConsistentCheckpoints* _ckpt);
+ void SetupStatefulController(shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt);
// Function called after we try to get assignment
- void GetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan, proto::system::StatusCode _code);
+ void GetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
+ proto::system::StatusCode _code);
// Function called after we try to commit a new assignment
- void SetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan, proto::system::StatusCode _code);
+ void SetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
+ proto::system::StatusCode _code);
// Function called when we want to setup ourselves as tmaster
void EstablishTMaster(EventLoop::Status);
void EstablishPackingPlan(EventLoop::Status);
void FetchPackingPlan();
- void OnPackingPlanFetch(proto::system::PackingPlan* newPackingPlan,
+ void OnPackingPlanFetch(shared_ptr<proto::system::PackingPlan> newPackingPlan,
proto::system::StatusCode _status);
// Metrics updates
@@ -207,20 +212,20 @@
std::set<std::string> absent_stmgrs_;
// The current physical plan
- proto::system::PhysicalPlan* current_pplan_;
+ shared_ptr<proto::system::PhysicalPlan> current_pplan_;
// The topology as first submitted by the user
// It shall only be used to construct the physical plan when TMaster first time starts
// Any runtime changes shall be made to current_pplan_->topology
- proto::api::Topology* topology_;
+ unique_ptr<proto::api::Topology> topology_;
- proto::system::PackingPlan* packing_plan_;
+ shared_ptr<proto::system::PackingPlan> packing_plan_;
// The statemgr where we store/retrieve our state
- heron::common::HeronStateMgr* state_mgr_;
+ shared_ptr<heron::common::HeronStateMgr> state_mgr_;
// Our copy of the tmasterlocation
- proto::tmaster::TMasterLocation* tmaster_location_;
+ unique_ptr<proto::tmaster::TMasterLocation> tmaster_location_;
// When we are in the middle of doing assignment
// we set this to true
@@ -232,11 +237,11 @@
std::string topdir_;
// Servers that implement our services
- TController* tmaster_controller_;
+ unique_ptr<TController> tmaster_controller_;
sp_int32 tmaster_controller_port_;
- TMasterServer* master_;
+ unique_ptr<TMasterServer> master_;
sp_int32 master_port_;
- StatsInterface* stats_;
+ unique_ptr<StatsInterface> stats_;
sp_int32 stats_port_;
std::string myhost_name_;
@@ -245,24 +250,24 @@
sp_int32 master_establish_attempts_;
// collector
- TMetricsCollector* metrics_collector_;
+ shared_ptr<TMetricsCollector> metrics_collector_;
sp_int32 mMetricsMgrPort;
// Metrics Manager
- heron::common::MetricsMgrSt* mMetricsMgrClient;
+ shared_ptr<heron::common::MetricsMgrSt> mMetricsMgrClient;
// Ckpt Manager
- CkptMgrClient* ckptmgr_client_;
+ unique_ptr<CkptMgrClient> ckptmgr_client_;
sp_int32 ckptmgr_port_;
// Process related metrics
- heron::common::MultiAssignableMetric* tmasterProcessMetrics;
+ shared_ptr<heron::common::MultiAssignableMetric> tmasterProcessMetrics;
// The time at which the stmgr was started up
std::chrono::high_resolution_clock::time_point start_time_;
// Stateful Controller
- StatefulController* stateful_controller_;
+ unique_ptr<StatefulController> stateful_controller_;
// HTTP client
AsyncDNS* dns_;
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.cpp b/heron/tmaster/src/cpp/manager/tmasterserver.cpp
index be977d2..ea8dac8 100644
--- a/heron/tmaster/src/cpp/manager/tmasterserver.cpp
+++ b/heron/tmaster/src/cpp/manager/tmasterserver.cpp
@@ -32,8 +32,11 @@
namespace heron {
namespace tmaster {
+using std::unique_ptr;
+using std::shared_ptr;
+
TMasterServer::TMasterServer(EventLoop* eventLoop, const NetworkOptions& _options,
- TMetricsCollector* _collector, TMaster* _tmaster)
+ shared_ptr<TMetricsCollector> _collector, TMaster* _tmaster)
: Server(eventLoop, _options), collector_(_collector), tmaster_(_tmaster) {
// Install the stmgr handlers
InstallRequestHandler(&TMasterServer::HandleStMgrRegisterRequest);
@@ -65,15 +68,15 @@
void TMasterServer::HandleStMgrRegisterRequest(REQID _reqid, Connection* _conn,
proto::tmaster::StMgrRegisterRequest* _request) {
- StMgrRegisterProcessor* processor =
- new StMgrRegisterProcessor(_reqid, _conn, _request, tmaster_, this);
+ unique_ptr<StMgrRegisterProcessor> processor =
+ make_unique<StMgrRegisterProcessor>(_reqid, _conn, _request, tmaster_, this);
processor->Start();
}
void TMasterServer::HandleStMgrHeartbeatRequest(REQID _reqid, Connection* _conn,
proto::tmaster::StMgrHeartbeatRequest* _request) {
- StMgrHeartbeatProcessor* processor =
- new StMgrHeartbeatProcessor(_reqid, _conn, _request, tmaster_, this);
+ unique_ptr<StMgrHeartbeatProcessor> processor =
+ make_unique<StMgrHeartbeatProcessor>(_reqid, _conn, _request, tmaster_, this);
processor->Start();
}
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.h b/heron/tmaster/src/cpp/manager/tmasterserver.h
index 297cc82..70306cf 100644
--- a/heron/tmaster/src/cpp/manager/tmasterserver.h
+++ b/heron/tmaster/src/cpp/manager/tmasterserver.h
@@ -29,13 +29,15 @@
namespace heron {
namespace tmaster {
+using std::shared_ptr;
+
class TMaster;
class TMetricsCollector;
class TMasterServer : public Server {
public:
- TMasterServer(EventLoop* eventLoop, const NetworkOptions& options, TMetricsCollector* _collector,
- TMaster* _tmaster);
+ TMasterServer(EventLoop* eventLoop, const NetworkOptions& options,
+ shared_ptr<TMetricsCollector> _collector, TMaster* _tmaster);
virtual ~TMasterServer();
protected:
@@ -62,7 +64,7 @@
proto::ckptmgr::ResetTopologyState* _message);
// our tmaster
- TMetricsCollector* collector_;
+ shared_ptr<TMetricsCollector> collector_;
TMaster* tmaster_;
};
} // namespace tmaster
diff --git a/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp b/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp
index f8ac311..41e4a06 100644
--- a/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp
+++ b/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp
@@ -53,7 +53,7 @@
: max_interval_(_max_interval),
eventLoop_(eventLoop),
metrics_sinks_yaml_(metrics_sinks_yaml),
- tmetrics_info_(new common::TMasterMetrics(metrics_sinks_yaml, eventLoop)),
+ tmetrics_info_(make_unique<common::TMasterMetrics>(metrics_sinks_yaml, eventLoop)),
start_time_(time(NULL)) {
interval_ = config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterMetricsCollectorPurgeIntervalSec();
@@ -63,12 +63,7 @@
CHECK_GT(eventLoop_->registerTimer(std::move(cb), false, interval_ * 1000000), 0);
}
-TMetricsCollector::~TMetricsCollector() {
- for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
- delete iter->second;
- }
- delete tmetrics_info_;
-}
+TMetricsCollector::~TMetricsCollector() {}
void TMetricsCollector::Purge(EventLoop::Status) {
for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
@@ -81,7 +76,7 @@
void TMetricsCollector::AddMetricsForComponent(const sp_string& component_name,
const proto::tmaster::MetricDatum& metrics_data) {
- ComponentMetrics* component_metrics = GetOrCreateComponentMetrics(component_name);
+ auto component_metrics = GetOrCreateComponentMetrics(component_name);
const sp_string& name = metrics_data.name();
const TMasterMetrics::MetricAggregationType& type = tmetrics_info_->GetAggregationType(name);
component_metrics->AddMetricForInstance(metrics_data.instance_id(), name, type,
@@ -90,7 +85,7 @@
void TMetricsCollector::AddExceptionsForComponent(const sp_string& component_name,
const TmasterExceptionLog& exception_log) {
- ComponentMetrics* component_metrics = GetOrCreateComponentMetrics(component_name);
+ auto component_metrics = GetOrCreateComponentMetrics(component_name);
component_metrics->AddExceptionForInstance(exception_log.instance_id(), exception_log);
}
@@ -105,21 +100,21 @@
}
}
-MetricResponse* TMetricsCollector::GetMetrics(const MetricRequest& _request,
- const proto::api::Topology* _topology) {
- auto response = new MetricResponse();
+unique_ptr<MetricResponse> TMetricsCollector::GetMetrics(const MetricRequest& _request,
+ const proto::api::Topology& _topology) {
+ auto response = make_unique<MetricResponse>();
if (metrics_.find(_request.component_name()) == metrics_.end()) {
bool component_exists = false;
- for (int i = 0; i < _topology->spouts_size(); i++) {
- if ((_topology->spouts(i)).comp().name() == _request.component_name()) {
+ for (int i = 0; i < _topology.spouts_size(); i++) {
+ if ((_topology.spouts(i)).comp().name() == _request.component_name()) {
component_exists = true;
break;
}
}
if (!component_exists) {
- for (int i = 0; i < _topology->bolts_size(); i++) {
- if ((_topology->bolts(i)).comp().name() == _request.component_name()) {
+ for (int i = 0; i < _topology.bolts_size(); i++) {
+ if ((_topology.bolts(i)).comp().name() == _request.component_name()) {
component_exists = true;
break;
}
@@ -156,15 +151,15 @@
start_time = _request.explicit_interval().start();
end_time = _request.explicit_interval().end();
}
- metrics_[_request.component_name()]->GetMetrics(_request, start_time, end_time, response);
+ metrics_[_request.component_name()]->GetMetrics(_request, start_time, end_time, *response);
response->set_interval(end_time - start_time);
}
return response;
}
void TMetricsCollector::GetExceptionsHelper(const ExceptionLogRequest& request,
- ExceptionLogResponse* exceptions) {
- ComponentMetrics* component_metrics = metrics_[request.component_name()];
+ ExceptionLogResponse& exceptions) {
+ auto component_metrics = metrics_[request.component_name()];
if (request.instances_size() == 0) {
component_metrics->GetAllExceptions(exceptions);
} else {
@@ -174,8 +169,9 @@
}
}
-ExceptionLogResponse* TMetricsCollector::GetExceptions(const ExceptionLogRequest& request) {
- auto response = new ExceptionLogResponse();
+unique_ptr<ExceptionLogResponse> TMetricsCollector::GetExceptions(
+ const ExceptionLogRequest& request) {
+ auto response = make_unique<ExceptionLogResponse>();
if (metrics_.find(request.component_name()) == metrics_.end()) {
LOG(ERROR) << "GetExceptions request received for unknown component "
<< request.component_name();
@@ -185,12 +181,13 @@
}
response->mutable_status()->set_status(proto::system::OK);
response->mutable_status()->set_message("OK");
- GetExceptionsHelper(request, response);
+ GetExceptionsHelper(request, *response);
return response;
}
-ExceptionLogResponse* TMetricsCollector::GetExceptionsSummary(const ExceptionLogRequest& request) {
- auto response = new ExceptionLogResponse();
+unique_ptr<ExceptionLogResponse> TMetricsCollector::GetExceptionsSummary(
+ const ExceptionLogRequest& request) {
+ auto response = make_unique<ExceptionLogResponse>();
if (metrics_.find(request.component_name()) == metrics_.end()) {
LOG(ERROR) << "GetExceptionSummary request received for unknown component "
@@ -203,10 +200,9 @@
response->mutable_status()->set_message("OK");
// Owns this pointer.
- auto all_exceptions = new ExceptionLogResponse();
- GetExceptionsHelper(request, all_exceptions); // Store un aggregated exceptions.
- AggregateExceptions(*all_exceptions, response);
- delete all_exceptions;
+ auto all_exceptions = make_unique<ExceptionLogResponse>();
+ GetExceptionsHelper(request, *all_exceptions); // Store un aggregated exceptions.
+ AggregateExceptions(*all_exceptions, *response);
return response;
}
@@ -216,8 +212,11 @@
// into one function which take aggregation as argument. Modify the ExceptionRequest to
// take argument for which aggregation function to use)
void TMetricsCollector::AggregateExceptions(const ExceptionLogResponse& all_exceptions,
- ExceptionLogResponse* aggregate_exceptions) {
- std::map<std::string, TmasterExceptionLog*> exception_summary; // Owns exception log pointer.
+ ExceptionLogResponse& aggregate_exceptions) {
+ using std::map;
+ using std::string;
+
+ map<string, unique_ptr<TmasterExceptionLog>> exception_summary; // Owns exception log pointer.
for (int i = 0; i < all_exceptions.exceptions_size(); ++i) {
const TmasterExceptionLog& log = all_exceptions.exceptions(i);
// Get classname by splitting on first colon
@@ -226,30 +225,31 @@
if (pos != std::string::npos) {
const std::string class_name = stack_trace.substr(0, pos);
if (exception_summary.find(class_name) == exception_summary.end()) {
- auto new_exception = new TmasterExceptionLog();
+ auto new_exception = make_unique<TmasterExceptionLog>();
new_exception->CopyFrom(log);
new_exception->set_stacktrace(class_name);
- exception_summary[class_name] = new_exception;
+ exception_summary[class_name] = std::move(new_exception);
} else {
- TmasterExceptionLog* prev_log = exception_summary[class_name];
- prev_log->set_count(log.count() + prev_log->count());
- prev_log->set_lasttime(log.lasttime());
+ TmasterExceptionLog& prev_log = *exception_summary[class_name];
+ prev_log.set_count(log.count() + prev_log.count());
+ prev_log.set_lasttime(log.lasttime());
}
}
}
for (auto summary_iter = exception_summary.begin();
summary_iter != exception_summary.end(); ++summary_iter) {
- aggregate_exceptions->add_exceptions()->CopyFrom(*(summary_iter->second));
- delete summary_iter->second; // Remove the temporary object holding exception summary
+ aggregate_exceptions.add_exceptions()->CopyFrom(*(summary_iter->second));
}
}
-TMetricsCollector::ComponentMetrics* TMetricsCollector::GetOrCreateComponentMetrics(
+shared_ptr<TMetricsCollector::ComponentMetrics> TMetricsCollector::GetOrCreateComponentMetrics(
const sp_string& component_name) {
if (metrics_.find(component_name) == metrics_.end()) {
- metrics_[component_name] = new ComponentMetrics(component_name, nintervals_, interval_);
+ metrics_[component_name] =
+ std::make_shared<ComponentMetrics>(component_name, nintervals_, interval_);
}
+
return metrics_[component_name];
}
@@ -257,11 +257,7 @@
sp_int32 nbuckets, sp_int32 bucket_interval)
: component_name_(component_name), nbuckets_(nbuckets), bucket_interval_(bucket_interval) {}
-TMetricsCollector::ComponentMetrics::~ComponentMetrics() {
- for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
- delete iter->second;
- }
-}
+TMetricsCollector::ComponentMetrics::~ComponentMetrics() {}
void TMetricsCollector::ComponentMetrics::Purge() {
for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
@@ -272,32 +268,34 @@
void TMetricsCollector::ComponentMetrics::AddMetricForInstance(
const sp_string& instance_id, const sp_string& name, TMasterMetrics::MetricAggregationType type,
const sp_string& value) {
- InstanceMetrics* instance_metrics = GetOrCreateInstanceMetrics(instance_id);
+ auto instance_metrics = GetOrCreateInstanceMetrics(instance_id);
instance_metrics->AddMetricWithName(name, type, value);
}
void TMetricsCollector::ComponentMetrics::AddExceptionForInstance(
const sp_string& instance_id, const TmasterExceptionLog& exception) {
- InstanceMetrics* instance_metrics = GetOrCreateInstanceMetrics(instance_id);
+ auto instance_metrics = GetOrCreateInstanceMetrics(instance_id);
instance_metrics->AddExceptions(exception);
}
-TMetricsCollector::InstanceMetrics* TMetricsCollector::ComponentMetrics::GetOrCreateInstanceMetrics(
- const sp_string& instance_id) {
+shared_ptr<TMetricsCollector::InstanceMetrics>
+ TMetricsCollector::ComponentMetrics::GetOrCreateInstanceMetrics(const sp_string& instance_id) {
if (metrics_.find(instance_id) == metrics_.end()) {
- metrics_[instance_id] = new InstanceMetrics(instance_id, nbuckets_, bucket_interval_);
+ metrics_[instance_id] =
+ std::make_shared<InstanceMetrics>(instance_id, nbuckets_, bucket_interval_);
}
+
return metrics_[instance_id];
}
void TMetricsCollector::ComponentMetrics::GetMetrics(const MetricRequest& _request,
sp_int64 start_time, sp_int64 end_time,
- MetricResponse* _response) {
+ MetricResponse& _response) {
if (_request.instance_id_size() == 0) {
// This means that all instances need to be returned
for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
iter->second->GetMetrics(_request, start_time, end_time, _response);
- if (_response->status().status() != proto::system::OK) {
+ if (_response.status().status() != proto::system::OK) {
return;
}
}
@@ -306,27 +304,27 @@
const sp_string& id = _request.instance_id(i);
if (metrics_.find(id) == metrics_.end()) {
LOG(ERROR) << "GetMetrics request received for unknown instance_id " << id;
- _response->mutable_status()->set_status(proto::system::NOTOK);
+ _response.mutable_status()->set_status(proto::system::NOTOK);
return;
} else {
metrics_[id]->GetMetrics(_request, start_time, end_time, _response);
- if (_response->status().status() != proto::system::OK) {
+ if (_response.status().status() != proto::system::OK) {
return;
}
}
}
}
- _response->mutable_status()->set_status(proto::system::OK);
+ _response.mutable_status()->set_status(proto::system::OK);
}
void TMetricsCollector::ComponentMetrics::GetExceptionsForInstance(const sp_string& instance_id,
- ExceptionLogResponse* response) {
+ ExceptionLogResponse& response) {
if (metrics_.find(instance_id) != metrics_.end()) {
metrics_[instance_id]->GetExceptionLog(response);
}
}
-void TMetricsCollector::ComponentMetrics::GetAllExceptions(ExceptionLogResponse* response) {
+void TMetricsCollector::ComponentMetrics::GetAllExceptions(ExceptionLogResponse& response) {
for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
iter->second->GetExceptionLog(response);
}
@@ -336,14 +334,7 @@
sp_int32 bucket_interval)
: instance_id_(instance_id), nbuckets_(nbuckets), bucket_interval_(bucket_interval) {}
-TMetricsCollector::InstanceMetrics::~InstanceMetrics() {
- for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
- delete iter->second;
- }
- for (auto ex_iter = exceptions_.begin(); ex_iter != exceptions_.end(); ++ex_iter) {
- delete *ex_iter;
- }
-}
+TMetricsCollector::InstanceMetrics::~InstanceMetrics() {}
void TMetricsCollector::InstanceMetrics::Purge() {
for (auto iter = metrics_.begin(); iter != metrics_.end(); ++iter) {
@@ -354,7 +345,7 @@
void TMetricsCollector::InstanceMetrics::AddMetricWithName(
const sp_string& name, common::TMasterMetrics::MetricAggregationType type,
const sp_string& value) {
- Metric* metric_data = GetOrCreateMetric(name, type);
+ auto metric_data = GetOrCreateMetric(name, type);
metric_data->AddValueToMetric(value);
}
@@ -363,30 +354,28 @@
// TODO(kramasamy): Aggregate exceptions across minutely buckets. Try to avoid duplication of
// hash-fuction
// used to aggregate in heron-worker.
- auto new_exception = new TmasterExceptionLog();
+ auto new_exception = make_unique<TmasterExceptionLog>();
new_exception->CopyFrom(exception);
- exceptions_.push_back(new_exception);
+ exceptions_.push_back(std::move(new_exception));
sp_uint32 max_exception = config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterMetricsCollectorMaximumException();
while (exceptions_.size() > max_exception) {
- TmasterExceptionLog* e = exceptions_.front();
exceptions_.pop_front();
- delete e;
}
}
-TMetricsCollector::Metric* TMetricsCollector::InstanceMetrics::GetOrCreateMetric(
+shared_ptr<TMetricsCollector::Metric> TMetricsCollector::InstanceMetrics::GetOrCreateMetric(
const sp_string& name, TMasterMetrics::MetricAggregationType type) {
if (metrics_.find(name) == metrics_.end()) {
- metrics_[name] = new Metric(name, type, nbuckets_, bucket_interval_);
+ metrics_[name] = std::make_shared<Metric>(name, type, nbuckets_, bucket_interval_);
}
return metrics_[name];
}
void TMetricsCollector::InstanceMetrics::GetMetrics(const MetricRequest& request,
sp_int64 start_time, sp_int64 end_time,
- MetricResponse* response) {
- MetricResponse::TaskMetric* m = response->add_metric();
+ MetricResponse& response) {
+ MetricResponse::TaskMetric* m = response.add_metric();
m->set_instance_id(instance_id_);
for (sp_int32 i = 0; i < request.metric_size(); ++i) {
const sp_string& id = request.metric(i);
@@ -396,9 +385,9 @@
}
}
-void TMetricsCollector::InstanceMetrics::GetExceptionLog(ExceptionLogResponse* response) {
+void TMetricsCollector::InstanceMetrics::GetExceptionLog(ExceptionLogResponse& response) {
for (auto ex_iter = exceptions_.begin(); ex_iter != exceptions_.end(); ++ex_iter) {
- response->add_exceptions()->CopyFrom(*(*ex_iter));
+ response.add_exceptions()->CopyFrom(*(*ex_iter));
}
}
@@ -411,21 +400,15 @@
all_time_nitems_(0),
bucket_interval_(bucket_interval) {
for (sp_int32 i = 0; i < nbuckets; ++i) {
- data_.push_back(new TimeBucket(bucket_interval_));
+ data_.push_back(std::move(make_unique<TimeBucket>(bucket_interval_)));
}
}
-TMetricsCollector::Metric::~Metric() {
- for (auto iter = data_.begin(); iter != data_.end(); ++iter) {
- delete *iter;
- }
-}
+TMetricsCollector::Metric::~Metric() {}
void TMetricsCollector::Metric::Purge() {
- TimeBucket* b = data_.back();
data_.pop_back();
- data_.push_front(new TimeBucket(bucket_interval_));
- delete b;
+ data_.push_front(std::move(make_unique<TimeBucket>(bucket_interval_)));
}
void TMetricsCollector::Metric::AddValueToMetric(const sp_string& _value) {
@@ -449,17 +432,17 @@
if (minutely) {
// we need minutely data
for (auto iter = data_.begin(); iter != data_.end(); ++iter) {
- TimeBucket* bucket = *iter;
+ TimeBucket& bucket = **iter;
// Does this time bucket have overlap with needed range
- if (bucket->overlaps(start_time, end_time)) {
+ if (bucket.overlaps(start_time, end_time)) {
IntervalValue* val = _response->add_interval_values();
- val->mutable_interval()->set_start(bucket->start_time_);
- val->mutable_interval()->set_end(bucket->end_time_);
- sp_double64 result = bucket->aggregate();
+ val->mutable_interval()->set_start(bucket.start_time_);
+ val->mutable_interval()->set_end(bucket.end_time_);
+ sp_double64 result = bucket.aggregate();
if (metric_type_ == common::TMasterMetrics::SUM) {
val->set_value(std::to_string(result));
} else if (metric_type_ == common::TMasterMetrics::AVG) {
- sp_double64 avg = result / bucket->count();
+ sp_double64 avg = result / bucket.count();
val->set_value(std::to_string(avg));
} else if (metric_type_ == common::TMasterMetrics::LAST) {
val->set_value(std::to_string(result));
@@ -468,7 +451,7 @@
}
}
// The timebuckets are reverse chronologically arranged
- if (start_time > bucket->end_time_) break;
+ if (start_time > bucket.end_time_) break;
}
} else {
// We don't need minutely data
@@ -489,15 +472,15 @@
sp_int64 total_items = 0;
sp_double64 total_count = 0;
for (auto iter = data_.begin(); iter != data_.end(); ++iter) {
- TimeBucket* bucket = *iter;
+ TimeBucket& bucket = **iter;
// Does this time bucket have overlap with needed range
- if (bucket->overlaps(start_time, end_time)) {
- total_count += bucket->aggregate();
- total_items += bucket->count();
+ if (bucket.overlaps(start_time, end_time)) {
+ total_count += bucket.aggregate();
+ total_items += bucket.count();
if (metric_type_ == TMasterMetrics::LAST) break;
}
// The timebuckets are reverse chronologically arranged
- if (start_time > bucket->end_time_) break;
+ if (start_time > bucket.end_time_) break;
}
if (metric_type_ == TMasterMetrics::SUM) {
result = total_count;
diff --git a/heron/tmaster/src/cpp/manager/tmetrics-collector.h b/heron/tmaster/src/cpp/manager/tmetrics-collector.h
index b898b94..8374590 100644
--- a/heron/tmaster/src/cpp/manager/tmetrics-collector.h
+++ b/heron/tmaster/src/cpp/manager/tmetrics-collector.h
@@ -32,6 +32,10 @@
namespace heron {
namespace tmaster {
+
+using std::unique_ptr;
+using std::shared_ptr;
+
// Helper class to manage aggregation and and serving of metrics. Metrics are logically stored as a
// component_name -> {instance_id ->value}n .
// TODO(kramasamy): Store metrics persistently to prevent against crashes.
@@ -50,15 +54,16 @@
// Returns a new response to fetch metrics. The request gets propagated to Component's and
// Instance's get metrics. Doesn't own Response.
- proto::tmaster::MetricResponse* GetMetrics(const proto::tmaster::MetricRequest& _request,
- const proto::api::Topology* _topology);
+ unique_ptr<proto::tmaster::MetricResponse> GetMetrics(
+ const proto::tmaster::MetricRequest& _request,
+ const proto::api::Topology& _topology);
// Returns response for fetching exceptions. Doesn't own response.
- proto::tmaster::ExceptionLogResponse* GetExceptions(
+ unique_ptr<proto::tmaster::ExceptionLogResponse> GetExceptions(
const proto::tmaster::ExceptionLogRequest& request);
// Returns exception summary response. Doesn't own response.
- proto::tmaster::ExceptionLogResponse* GetExceptionsSummary(
+ unique_ptr<proto::tmaster::ExceptionLogResponse> GetExceptionsSummary(
const proto::tmaster::ExceptionLogRequest& request);
private:
@@ -66,12 +71,12 @@
// 'all_exceptions'.
// Doesn't own 'all_exceptions' pointer
void GetExceptionsHelper(const proto::tmaster::ExceptionLogRequest& request,
- proto::tmaster::ExceptionLogResponse* all_excepions);
+ proto::tmaster::ExceptionLogResponse& all_excepions);
// Aggregate exceptions from 'all_exceptions' to 'aggregate_exceptions'.
// Doesn't own 'aggregate_exceptions'.
void AggregateExceptions(const proto::tmaster::ExceptionLogResponse& all_exceptions,
- proto::tmaster::ExceptionLogResponse* aggregate_exceptions);
+ proto::tmaster::ExceptionLogResponse& aggregate_exceptions);
// Add metrics for 'component_name'
void AddMetricsForComponent(const sp_string& component_name,
@@ -139,7 +144,7 @@
private:
sp_string name_;
// Time series. data_ will be ordered by their time of arrival.
- std::list<TimeBucket*> data_;
+ std::list<unique_ptr<TimeBucket>> data_;
// Type of metric. This can be SUM or AVG. It specify how to aggregate these metrics for
// display.
common::TMasterMetrics::MetricAggregationType metric_type_;
@@ -174,23 +179,23 @@
// Returns the metric metrics. Doesn't own _response.
void GetMetrics(const proto::tmaster::MetricRequest& request, sp_int64 start_time,
- sp_int64 end_time, proto::tmaster::MetricResponse* response);
+ sp_int64 end_time, proto::tmaster::MetricResponse& response);
// Fills response for fetching exceptions. Doesn't own response.
- void GetExceptionLog(proto::tmaster::ExceptionLogResponse* response);
+ void GetExceptionLog(proto::tmaster::ExceptionLogResponse& response);
private:
// Create or return existing Metric. Retains ownership of Metric object returned.
- Metric* GetOrCreateMetric(const sp_string& name,
+ shared_ptr<Metric> GetOrCreateMetric(const sp_string& name,
common::TMasterMetrics::MetricAggregationType type);
sp_string instance_id_;
sp_int32 nbuckets_;
sp_int32 bucket_interval_;
// map between metric name and its values
- std::map<sp_string, Metric*> metrics_;
+ std::map<sp_string, shared_ptr<Metric>> metrics_;
// list of exceptions
- std::list<proto::tmaster::TmasterExceptionLog*> exceptions_;
+ std::list<unique_ptr<proto::tmaster::TmasterExceptionLog>> exceptions_;
};
// Component level metrics. A component metrics is a map storing metrics for each of its
@@ -217,39 +222,39 @@
// Request aggregated metrics for this component for the '_nbucket' interval.
// Doesn't own '_response' object.
void GetMetrics(const proto::tmaster::MetricRequest& request, sp_int64 start_time,
- sp_int64 end_time, proto::tmaster::MetricResponse* response);
+ sp_int64 end_time, proto::tmaster::MetricResponse& response);
// Returns response for fetching exceptions. Doesn't own response.
void GetExceptionsForInstance(const sp_string& instance_id,
- proto::tmaster::ExceptionLogResponse* response);
+ proto::tmaster::ExceptionLogResponse& response);
- void GetAllExceptions(proto::tmaster::ExceptionLogResponse* response);
+ void GetAllExceptions(proto::tmaster::ExceptionLogResponse& response);
private:
// Create or return existing mutable InstanceMetrics associated with 'instance_id'. This
// method doesn't verify if the instance_id is valid fof the component.
// Doesn't transfer ownership of returned InstanceMetrics.
- InstanceMetrics* GetOrCreateInstanceMetrics(const sp_string& instance_id);
+ shared_ptr<InstanceMetrics> GetOrCreateInstanceMetrics(const sp_string& instance_id);
sp_string component_name_;
sp_int32 nbuckets_;
sp_int32 bucket_interval_;
// map between instance id and its set of metrics
- std::map<sp_string, InstanceMetrics*> metrics_;
+ std::map<sp_string, shared_ptr<InstanceMetrics>> metrics_;
};
// Create or return existing mutable ComponentMetrics associated with 'component_name'.
// Doesn't transfer ownership of returned ComponentMetrics
- ComponentMetrics* GetOrCreateComponentMetrics(const sp_string& component_name);
+ shared_ptr<ComponentMetrics> GetOrCreateComponentMetrics(const sp_string& component_name);
// map of component name to its metrics
- std::map<sp_string, ComponentMetrics*> metrics_;
+ std::map<sp_string, shared_ptr<ComponentMetrics>> metrics_;
sp_int32 max_interval_;
sp_int32 nintervals_;
sp_int32 interval_;
EventLoop* eventLoop_;
std::string metrics_sinks_yaml_;
- common::TMasterMetrics* tmetrics_info_;
+ std::unique_ptr<common::TMasterMetrics> tmetrics_info_;
time_t start_time_;
};
} // namespace tmaster
diff --git a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
index 54807bc..47bd7b1 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
@@ -49,7 +49,6 @@
proto::tmaster::StMgrHeartbeatResponse response;
response.set_allocated_status(status);
SendResponse(response);
- delete this;
}
} // namespace tmaster
} // namespace heron
diff --git a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp b/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
index e963ca5..cbdd583 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
+++ b/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
@@ -45,13 +45,14 @@
// Get the relevant info and ask tmaster to register
proto::tmaster::StMgrRegisterRequest* request =
static_cast<proto::tmaster::StMgrRegisterRequest*>(request_);
- std::vector<proto::system::Instance*> instances;
+ std::vector<shared_ptr<proto::system::Instance>> instances;
for (sp_int32 i = 0; i < request->instances_size(); ++i) {
- proto::system::Instance* instance = new proto::system::Instance();
+ auto instance = std::make_shared<proto::system::Instance>();
instance->CopyFrom(request->instances(i));
instances.push_back(instance);
}
- proto::system::PhysicalPlan* pplan = NULL;
+
+ shared_ptr<proto::system::PhysicalPlan> pplan;
proto::system::Status* status =
tmaster_->RegisterStMgr(request->stmgr(), instances, GetConnection(), pplan);
@@ -65,7 +66,6 @@
}
}
SendResponse(response);
- delete this;
return;
}
} // namespace tmaster
diff --git a/heron/tmaster/tests/cpp/server/dummystmgr.cpp b/heron/tmaster/tests/cpp/server/dummystmgr.cpp
index af7de12..c8dead6 100644
--- a/heron/tmaster/tests/cpp/server/dummystmgr.cpp
+++ b/heron/tmaster/tests/cpp/server/dummystmgr.cpp
@@ -42,9 +42,9 @@
pplan_(nullptr),
got_restore_message_(false),
got_start_message_(false) {
- InstallResponseHandler(new proto::tmaster::StMgrRegisterRequest(),
+ InstallResponseHandler(std::move(make_unique<proto::tmaster::StMgrRegisterRequest>()),
&DummyStMgr::HandleRegisterResponse);
- InstallResponseHandler(new proto::tmaster::StMgrHeartbeatRequest(),
+ InstallResponseHandler(std::move(make_unique<proto::tmaster::StMgrHeartbeatRequest>()),
&DummyStMgr::HandleHeartbeatResponse);
InstallMessageHandler(&DummyStMgr::HandleNewAssignmentMessage);
InstallMessageHandler(&DummyStMgr::HandleRestoreTopologyStateRequest);
@@ -139,7 +139,7 @@
}
void DummyStMgr::SendRegisterRequest() {
- proto::tmaster::StMgrRegisterRequest* request = new proto::tmaster::StMgrRegisterRequest();
+ auto request = make_unique<proto::tmaster::StMgrRegisterRequest>();
proto::system::StMgr* stmgr = request->mutable_stmgr();
stmgr->set_id(my_id_);
stmgr->set_host_name(my_host_);
@@ -149,15 +149,15 @@
iter != instances_.end(); ++iter) {
request->add_instances()->CopyFrom(**iter);
}
- SendRequest(request, NULL);
+ SendRequest(std::move(request), NULL);
return;
}
void DummyStMgr::SendHeartbeatRequest() {
- proto::tmaster::StMgrHeartbeatRequest* request = new proto::tmaster::StMgrHeartbeatRequest();
+ auto request = make_unique<proto::tmaster::StMgrHeartbeatRequest>();
request->set_heartbeat_time(time(NULL));
request->mutable_stats();
- SendRequest(request, NULL);
+ SendRequest(std::move(request), NULL);
return;
}
diff --git a/heron/tmaster/tests/cpp/server/dummytmaster.cpp b/heron/tmaster/tests/cpp/server/dummytmaster.cpp
index 08d0456..a244b32 100644
--- a/heron/tmaster/tests/cpp/server/dummytmaster.cpp
+++ b/heron/tmaster/tests/cpp/server/dummytmaster.cpp
@@ -49,9 +49,9 @@
void DummyTMaster::HandleRegisterRequest(REQID _id, Connection* _conn,
proto::tmaster::StMgrRegisterRequest* _request) {
- std::vector<proto::system::Instance*> instances;
+ std::vector<std::shared_ptr<proto::system::Instance>> instances;
stmgrs_[_request->stmgr().id()] =
- new tmaster::StMgrState(_conn, _request->stmgr(), instances, this);
+ std::make_shared<tmaster::StMgrState>(_conn, _request->stmgr(), instances, *this);
delete _request;
proto::tmaster::StMgrRegisterResponse response;
response.mutable_status()->set_status(proto::system::OK);
diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
index 32550a0..775cc36 100644
--- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
+++ b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
@@ -780,7 +780,7 @@
EXPECT_EQ(updated_bolt_config[bolt_init_config + ":runtime"], "4");
}
std::map<std::string, std::string> updated_config, updated_spout_config, updated_bolt_config;
- const heron::proto::system::PhysicalPlan* pplan = common.tmaster_->getPhysicalPlan();
+ const auto pplan = common.tmaster_->getPhysicalPlan();
heron::config::TopologyConfigHelper::GetTopologyRuntimeConfig(pplan->topology(), updated_config);
EXPECT_EQ(updated_config[topology_init_config_1 + ":runtime"], "1");
EXPECT_EQ(updated_config[topology_init_config_2 + ":runtime"], "2");