renaming "topology master" to "topology manager" in heron/tmaster
diff --git a/heron/tmaster/src/cpp/BUILD b/heron/tmanager/src/cpp/BUILD
similarity index 83%
rename from heron/tmaster/src/cpp/BUILD
rename to heron/tmanager/src/cpp/BUILD
index ee004e5..eb01318 100644
--- a/heron/tmaster/src/cpp/BUILD
+++ b/heron/tmanager/src/cpp/BUILD
@@ -3,7 +3,7 @@
package(default_visibility = ["//visibility:public"])
cc_library(
- name = "tmaster-cxx",
+ name = "tmanager-cxx",
srcs = [
"manager/ckptmgr-client.cpp",
"manager/ckptmgr-client.h",
@@ -19,27 +19,27 @@
"manager/stmgrstate.h",
"manager/tcontroller.cpp",
"manager/tcontroller.h",
- "manager/tmaster.cpp",
- "manager/tmasterserver.cpp",
- "manager/tmasterserver.h",
+ "manager/tmanager.cpp",
+ "manager/tmanagerserver.cpp",
+ "manager/tmanagerserver.h",
"manager/tmetrics-collector.cpp",
"manager/tmetrics-collector.h",
"processor/stmgr-heartbeat-processor.cpp",
"processor/stmgr-heartbeat-processor.h",
"processor/stmgr-register-processor.cpp",
"processor/stmgr-register-processor.h",
- "processor/tmaster-processor.cpp",
- "processor/tmaster-processor.h",
+ "processor/tmanager-processor.cpp",
+ "processor/tmanager-processor.h",
],
hdrs = [
- "manager/tmaster.h",
+ "manager/tmanager.h",
"processor/processor.h",
],
copts = [
"-Iheron",
"-Iheron/common/src/cpp",
"-Iheron/statemgrs/src/cpp",
- "-Iheron/tmaster/src/cpp",
+ "-Iheron/tmanager/src/cpp",
"-I$(GENDIR)/heron",
"-I$(GENDIR)/heron/common/src/cpp",
],
@@ -56,21 +56,21 @@
)
cc_binary(
- name = "heron-tmaster",
+ name = "heron-tmanager",
srcs = [
- "server/tmaster-main.cpp",
+ "server/tmanager-main.cpp",
],
copts = [
"-Iheron",
"-Iheron/common/src/cpp",
"-Iheron/statemgrs/src/cpp",
- "-Iheron/tmaster/src/cpp",
+ "-Iheron/tmanager/src/cpp",
"-I$(GENDIR)/heron",
"-I$(GENDIR)/heron/common/src/cpp",
],
linkstatic = 1,
deps = [
- ":tmaster-cxx",
+ ":tmanager-cxx",
"//config:config-cxx",
"//heron/common/src/cpp/config:config-cxx",
"//heron/common/src/cpp/metrics:metrics-cxx",
diff --git a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp b/heron/tmanager/src/cpp/manager/ckptmgr-client.cpp
similarity index 93%
rename from heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
rename to heron/tmanager/src/cpp/manager/ckptmgr-client.cpp
index eb4ac40..212567d 100644
--- a/heron/tmaster/src/cpp/manager/ckptmgr-client.cpp
+++ b/heron/tmanager/src/cpp/manager/ckptmgr-client.cpp
@@ -24,7 +24,7 @@
#include "threads/threads.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
CkptMgrClient::CkptMgrClient(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& _options,
const sp_string& _topology_name, const sp_string& _topology_id,
@@ -40,8 +40,8 @@
- InstallResponseHandler(make_unique<proto::ckptmgr::RegisterTMasterRequest>(),
- &CkptMgrClient::HandleTMasterRegisterResponse);
+ InstallResponseHandler(make_unique<proto::ckptmgr::RegisterTManagerRequest>(),
+ &CkptMgrClient::HandleTManagerRegisterResponse);
InstallResponseHandler(make_unique<proto::ckptmgr::CleanStatefulCheckpointRequest>(),
&CkptMgrClient::HandleCleanStatefulCheckpointResponse);
}
@@ -98,9 +98,9 @@
}
}
-void CkptMgrClient::HandleTMasterRegisterResponse(
+void CkptMgrClient::HandleTManagerRegisterResponse(
void*,
- pool_unique_ptr<proto::ckptmgr::RegisterTMasterResponse> _response,
+ pool_unique_ptr<proto::ckptmgr::RegisterTManagerResponse> _response,
NetworkErrorCode _status) {
if (_status != OK) {
LOG(ERROR) << "NonOK network code" << _status << " for register response from ckptmgr "
@@ -127,8 +127,8 @@
void CkptMgrClient::OnReconnectTimer() { Start(); }
void CkptMgrClient::SendRegisterRequest() {
- LOG(INFO) << "Sending RegisterTmasterRequest to ckptmgr" << std::endl;
- auto request = make_unique<proto::ckptmgr::RegisterTMasterRequest>();
+ LOG(INFO) << "Sending RegisterTmanagerRequest to ckptmgr" << std::endl;
+ auto request = make_unique<proto::ckptmgr::RegisterTManagerRequest>();
request->set_topology_name(topology_name_);
request->set_topology_id(topology_id_);
SendRequest(std::move(request), nullptr);
@@ -161,5 +161,5 @@
clean_response_watcher_(code);
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/ckptmgr-client.h b/heron/tmanager/src/cpp/manager/ckptmgr-client.h
similarity index 88%
rename from heron/tmaster/src/cpp/manager/ckptmgr-client.h
rename to heron/tmanager/src/cpp/manager/ckptmgr-client.h
index 8c187aa..5c2ea70 100644
--- a/heron/tmaster/src/cpp/manager/ckptmgr-client.h
+++ b/heron/tmanager/src/cpp/manager/ckptmgr-client.h
@@ -17,8 +17,8 @@
* under the License.
*/
-#ifndef SRC_CPP_SVCS_TMASTER_SRC_CKPTMGR_CLIENT_H
-#define SRC_CPP_SVCS_TMASTER_SRC_CKPTMGR_CLIENT_H
+#ifndef SRC_CPP_SVCS_TMANAGER_SRC_CKPTMGR_CLIENT_H
+#define SRC_CPP_SVCS_TMANAGER_SRC_CKPTMGR_CLIENT_H
#include <string>
#include "basics/basics.h"
@@ -27,7 +27,7 @@
#include "proto/messages.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
class CkptMgrClient : public Client {
public:
@@ -49,9 +49,9 @@
virtual void HandleClose(NetworkErrorCode status);
private:
- void HandleTMasterRegisterResponse(
+ void HandleTManagerRegisterResponse(
void*,
- pool_unique_ptr<proto::ckptmgr::RegisterTMasterResponse>_response,
+ pool_unique_ptr<proto::ckptmgr::RegisterTManagerResponse>_response,
NetworkErrorCode _status);
void SendRegisterRequest();
@@ -67,7 +67,7 @@
sp_int32 reconnect_ckptmgr_interval_sec_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
-#endif // SRC_CPP_SVCS_TMASTER_SRC_CKPTMGR_CLIENT_H
+#endif // SRC_CPP_SVCS_TMANAGER_SRC_CKPTMGR_CLIENT_H
diff --git a/heron/tmaster/src/cpp/manager/stateful-checkpointer.cpp b/heron/tmanager/src/cpp/manager/stateful-checkpointer.cpp
similarity index 93%
rename from heron/tmaster/src/cpp/manager/stateful-checkpointer.cpp
rename to heron/tmanager/src/cpp/manager/stateful-checkpointer.cpp
index fc4d4e6..0506085 100644
--- a/heron/tmaster/src/cpp/manager/stateful-checkpointer.cpp
+++ b/heron/tmanager/src/cpp/manager/stateful-checkpointer.cpp
@@ -23,16 +23,16 @@
#include <chrono>
#include <string>
#include "config/physical-plan-helper.h"
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
#include "manager/stmgrstate.h"
#include "errors/errors.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
StatefulCheckpointer::StatefulCheckpointer(
- std::chrono::high_resolution_clock::time_point _tmaster_start_time)
- : tmaster_start_time_(_tmaster_start_time) {
+ std::chrono::high_resolution_clock::time_point _tmanager_start_time)
+ : tmanager_start_time_(_tmanager_start_time) {
// do nothing
}
@@ -41,7 +41,7 @@
sp_string StatefulCheckpointer::GenerateCheckpointId() {
// TODO(skukarni) Should we append any topology name/id stuff?
std::ostringstream tag;
- tag << tmaster_start_time_.time_since_epoch().count()
+ tag << tmanager_start_time_.time_since_epoch().count()
<< "-" << time(NULL);
return tag.str();
}
@@ -97,5 +97,5 @@
return false;
}
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/stateful-checkpointer.h b/heron/tmanager/src/cpp/manager/stateful-checkpointer.h
similarity index 85%
rename from heron/tmaster/src/cpp/manager/stateful-checkpointer.h
rename to heron/tmanager/src/cpp/manager/stateful-checkpointer.h
index 5ec708d..dd6e3a1 100644
--- a/heron/tmaster/src/cpp/manager/stateful-checkpointer.h
+++ b/heron/tmanager/src/cpp/manager/stateful-checkpointer.h
@@ -17,14 +17,14 @@
* under the License.
*/
-#ifndef __TMASTER_STATEFUL_CHECKPOINTER_H_
-#define __TMASTER_STATEFUL_CHECKPOINTER_H_
+#ifndef __TMANAGER_STATEFUL_CHECKPOINTER_H_
+#define __TMANAGER_STATEFUL_CHECKPOINTER_H_
#include <string>
#include <unordered_set>
#include "network/network.h"
-#include "proto/tmaster.pb.h"
-#include "manager/tmaster.h"
+#include "proto/tmanager.pb.h"
+#include "manager/tmanager.h"
#include "basics/basics.h"
@@ -35,7 +35,7 @@
}
namespace heron {
-namespace tmaster {
+namespace tmanager {
/**
* A StatefulCheckpointer is responsible for sending NewStatefulCheckpoint
@@ -45,13 +45,13 @@
*/
class StatefulCheckpointer {
public:
- explicit StatefulCheckpointer(std::chrono::high_resolution_clock::time_point _tmaster_start_time);
+ explicit StatefulCheckpointer(std::chrono::high_resolution_clock::time_point _tmanager_start_time);
virtual ~StatefulCheckpointer();
void RegisterNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan);
void StartCheckpoint(const StMgrMap& _stmgrs);
- // Called by tmaster when a InstanceStateStored message is received
+ // Called by tmanager when an InstanceStateStored message is received
// Return true if this completes a globally consistent checkpoint
// for this _checkpoint_id
bool HandleInstanceStateStored(const std::string& _checkpoint_id,
@@ -60,7 +60,7 @@
private:
sp_string GenerateCheckpointId();
- std::chrono::high_resolution_clock::time_point tmaster_start_time_;
+ std::chrono::high_resolution_clock::time_point tmanager_start_time_;
// Current partially consistent checkpoint
// for which still some more states need to be saved
@@ -74,7 +74,7 @@
// consistent checkpoint
std::unordered_set<sp_int32> partial_checkpoint_remaining_tasks_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.cpp b/heron/tmanager/src/cpp/manager/stateful-controller.cpp
similarity index 98%
rename from heron/tmaster/src/cpp/manager/stateful-controller.cpp
rename to heron/tmanager/src/cpp/manager/stateful-controller.cpp
index a348c10..6db3acb 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.cpp
+++ b/heron/tmanager/src/cpp/manager/stateful-controller.cpp
@@ -25,14 +25,14 @@
#include <vector>
#include "manager/stateful-checkpointer.h"
#include "manager/stateful-restorer.h"
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
#include "manager/stmgrstate.h"
#include "metrics/metrics.h"
#include "basics/basics.h"
#include "errors/errors.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
using std::make_shared;
@@ -53,7 +53,7 @@
StatefulController::StatefulController(const std::string& _topology_name,
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
shared_ptr<heron::common::HeronStateMgr> _state_mgr,
- std::chrono::high_resolution_clock::time_point _tmaster_start_time,
+ std::chrono::high_resolution_clock::time_point _tmanager_start_time,
shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)>
_ckpt_save_watcher)
@@ -62,7 +62,7 @@
state_mgr_(_state_mgr),
metrics_manager_client_(_metrics_manager_client),
ckpt_save_watcher_(_ckpt_save_watcher) {
- checkpointer_ = make_unique<StatefulCheckpointer>(_tmaster_start_time);
+ checkpointer_ = make_unique<StatefulCheckpointer>(_tmanager_start_time);
restorer_ = make_unique<StatefulRestorer>();
count_metrics_ = make_shared<common::MultiCountMetric>();
@@ -224,5 +224,5 @@
bool StatefulController::RestoreInProgress() const {
return restorer_->IsInProgress();
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.h b/heron/tmanager/src/cpp/manager/stateful-controller.h
similarity index 93%
rename from heron/tmaster/src/cpp/manager/stateful-controller.h
rename to heron/tmanager/src/cpp/manager/stateful-controller.h
index 4e3b618..a5ad08f 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.h
+++ b/heron/tmanager/src/cpp/manager/stateful-controller.h
@@ -17,13 +17,13 @@
* under the License.
*/
-#ifndef __TMASTER_STATEFUL_HELPER_H_
-#define __TMASTER_STATEFUL_HELPER_H_
+#ifndef __TMANAGER_STATEFUL_HELPER_H_
+#define __TMANAGER_STATEFUL_HELPER_H_
#include <set>
#include <string>
#include "network/network.h"
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
#include "basics/basics.h"
namespace heron {
@@ -34,7 +34,7 @@
} // namespace heron
namespace heron {
-namespace tmaster {
+namespace tmanager {
using std::unique_ptr;
using std::shared_ptr;
@@ -42,7 +42,7 @@
class StatefulRestorer;
class StatefulCheckpointer;
-// For Heron topologies running in effectively once semantics, the tmaster
+// For Heron topologies running in effectively once semantics, the tmanager
// utilizes the stateful controller to handle all the work related with
// checkpointing and restoring from checkpoints. The statful controller
// offers methods to start checkpoint/restore. It also manages the state
@@ -56,14 +56,14 @@
explicit StatefulController(const std::string& _topology_name,
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
shared_ptr<heron::common::HeronStateMgr> _state_mgr,
- std::chrono::high_resolution_clock::time_point _tmaster_start_time,
+ std::chrono::high_resolution_clock::time_point _tmanager_start_time,
shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)>
_ckpt_save_watcher);
virtual ~StatefulController();
// Start a new restore process
void StartRestore(const StMgrMap& _stmgrs, bool _ignore_prev_checkpoints);
- // Called by tmaster when a Stmgr responds back with a RestoreTopologyStateResponse
+ // Called by tmanager when a Stmgr responds back with a RestoreTopologyStateResponse
void HandleStMgrRestored(const std::string& _stmgr_id,
const std::string& _checkpoint_id,
int64_t _restore_txid,
@@ -105,7 +105,7 @@
shared_ptr<common::MultiCountMetric> count_metrics_;
std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)> ckpt_save_watcher_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/manager/stateful-restorer.cpp b/heron/tmanager/src/cpp/manager/stateful-restorer.cpp
similarity index 98%
rename from heron/tmaster/src/cpp/manager/stateful-restorer.cpp
rename to heron/tmanager/src/cpp/manager/stateful-restorer.cpp
index d6a76d0..123c2cb 100644
--- a/heron/tmaster/src/cpp/manager/stateful-restorer.cpp
+++ b/heron/tmanager/src/cpp/manager/stateful-restorer.cpp
@@ -22,7 +22,7 @@
#include "manager/stmgrstate.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
StatefulRestorer::StatefulRestorer()
: in_progress_(false),
@@ -82,5 +82,5 @@
checkpoint_id_in_progress_ = "";
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/stateful-restorer.h b/heron/tmanager/src/cpp/manager/stateful-restorer.h
similarity index 91%
rename from heron/tmaster/src/cpp/manager/stateful-restorer.h
rename to heron/tmanager/src/cpp/manager/stateful-restorer.h
index 3871d4d..c1b3ec5 100644
--- a/heron/tmaster/src/cpp/manager/stateful-restorer.h
+++ b/heron/tmanager/src/cpp/manager/stateful-restorer.h
@@ -17,15 +17,15 @@
* under the License.
*/
-#ifndef __TMASTER_STATEFUL_RESTORER_H_
-#define __TMASTER_STATEFUL_RESTORER_H_
+#ifndef __TMANAGER_STATEFUL_RESTORER_H_
+#define __TMANAGER_STATEFUL_RESTORER_H_
#include <set>
#include <string>
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
class StatefulRestorer {
public:
@@ -53,7 +53,7 @@
std::string checkpoint_id_in_progress_;
std::set<std::string> unreplied_stmgrs_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/manager/stats-interface.cpp b/heron/tmanager/src/cpp/manager/stats-interface.cpp
similarity index 89%
rename from heron/tmaster/src/cpp/manager/stats-interface.cpp
rename to heron/tmanager/src/cpp/manager/stats-interface.cpp
index a34f2ab..f2d0abd 100644
--- a/heron/tmaster/src/cpp/manager/stats-interface.cpp
+++ b/heron/tmanager/src/cpp/manager/stats-interface.cpp
@@ -17,24 +17,24 @@
* under the License.
*/
-#include "tmaster/src/cpp/manager/stats-interface.h"
+#include "tmanager/src/cpp/manager/stats-interface.h"
#include <iostream>
#include <string>
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
#include "manager/tmetrics-collector.h"
-#include "metrics/tmaster-metrics.h"
+#include "metrics/tmanager-metrics.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
-#include "proto/tmaster.pb.h"
+#include "proto/tmanager.pb.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
StatsInterface::StatsInterface(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& _options,
- shared_ptr<TMetricsCollector> _collector, TMaster* _tmaster)
- : metrics_collector_(_collector), tmaster_(_tmaster) {
+ shared_ptr<TMetricsCollector> _collector, TManager* _tmanager)
+ : metrics_collector_(_collector), tmanager_(_tmanager) {
http_server_ = make_unique<HTTPServer>(eventLoop, _options);
// Install the handlers
auto cbHandleStats = [this](IncomingHTTPRequest* request) { this->HandleStatsRequest(request); };
@@ -70,14 +70,14 @@
LOG(INFO) << "Got a stats request " << _request->GetQuery();
// get the entire stuff
unsigned char* pb = _request->ExtractFromPostData(0, _request->GetPayloadSize());
- proto::tmaster::MetricRequest req;
+ proto::tmanager::MetricRequest req;
if (!req.ParseFromArray(pb, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in StatsRequest";
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
- auto res = metrics_collector_->GetMetrics(req, tmaster_->getInitialTopology());
+ auto res = metrics_collector_->GetMetrics(req, tmanager_->getInitialTopology());
sp_string response_string;
CHECK(res->SerializeToString(&response_string));
auto response = make_unique<OutgoingHTTPResponse>(_request);
@@ -93,14 +93,14 @@
LOG(INFO) << "Request for exceptions" << _request->GetQuery();
// Get the Exception request proto.
unsigned char* request_data = _request->ExtractFromPostData(0, _request->GetPayloadSize());
- heron::proto::tmaster::ExceptionLogRequest exception_request;
+ heron::proto::tmanager::ExceptionLogRequest exception_request;
if (!exception_request.ParseFromArray(request_data, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in ExceptionRequest" << std::endl;
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
- unique_ptr<heron::proto::tmaster::ExceptionLogResponse> exception_response =
+ unique_ptr<heron::proto::tmanager::ExceptionLogResponse> exception_response =
metrics_collector_->GetExceptions(exception_request);
sp_string response_string;
CHECK(exception_response->SerializeToString(&response_string));
@@ -116,7 +116,7 @@
void StatsInterface::HandleExceptionSummaryRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Request for exception summary " << _request->GetQuery();
unsigned char* request_data = _request->ExtractFromPostData(0, _request->GetPayloadSize());
- heron::proto::tmaster::ExceptionLogRequest exception_request;
+ heron::proto::tmanager::ExceptionLogRequest exception_request;
if (!exception_request.ParseFromArray(request_data, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in ExceptionRequest" << std::endl;
http_server_->SendErrorReply(_request, 400);
@@ -139,7 +139,7 @@
LOG(INFO) << "Request for stream managers registration summary " << _request->GetQuery();
unsigned char* request_data =
_request->ExtractFromPostData(0, _request->GetPayloadSize());
- heron::proto::tmaster::StmgrsRegistrationSummaryRequest stmgrs_reg_request;
+ heron::proto::tmanager::StmgrsRegistrationSummaryRequest stmgrs_reg_request;
if (!stmgrs_reg_request.ParseFromArray(request_data, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in" <<
"StmgrsRegistrationSummaryRequest" << std::endl;
@@ -147,7 +147,7 @@
delete _request;
return;
}
- auto stmgrs_reg_summary_response = tmaster_->GetStmgrsRegSummary();
+ auto stmgrs_reg_summary_response = tmanager_->GetStmgrsRegSummary();
sp_string response_string;
CHECK(stmgrs_reg_summary_response->SerializeToString(&response_string));
auto http_response = make_unique<OutgoingHTTPResponse>(_request);
@@ -164,5 +164,5 @@
http_server_->SendErrorReply(_request, 400);
delete _request;
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/stats-interface.h b/heron/tmanager/src/cpp/manager/stats-interface.h
similarity index 85%
rename from heron/tmaster/src/cpp/manager/stats-interface.h
rename to heron/tmanager/src/cpp/manager/stats-interface.h
index fa7fa79..e640ed5 100644
--- a/heron/tmaster/src/cpp/manager/stats-interface.h
+++ b/heron/tmanager/src/cpp/manager/stats-interface.h
@@ -17,26 +17,26 @@
* under the License.
*/
-#ifndef __TMASTER_STATS_INTERFACE_H_
-#define __TMASTER_STATS_INTERFACE_H_
+#ifndef __TMANAGER_STATS_INTERFACE_H_
+#define __TMANAGER_STATS_INTERFACE_H_
#include "network/network.h"
-#include "proto/tmaster.pb.h"
+#include "proto/tmanager.pb.h"
#include "basics/basics.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
using std::unique_ptr;
using std::shared_ptr;
class TMetricsCollector;
-class TMaster;
+class TManager;
class StatsInterface {
public:
StatsInterface(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options,
- shared_ptr<TMetricsCollector> _collector, TMaster* tmaster);
+ shared_ptr<TMetricsCollector> _collector, TManager* tmanager);
virtual ~StatsInterface();
private:
@@ -48,9 +48,9 @@
unique_ptr<HTTPServer> http_server_; // Our http server
shared_ptr<TMetricsCollector> metrics_collector_;
- TMaster* tmaster_;
+ TManager* tmanager_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.cpp b/heron/tmanager/src/cpp/manager/stmgrstate.cpp
similarity index 97%
rename from heron/tmaster/src/cpp/manager/stmgrstate.cpp
rename to heron/tmanager/src/cpp/manager/stmgrstate.cpp
index b144244..06f2a2b 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.cpp
+++ b/heron/tmanager/src/cpp/manager/stmgrstate.cpp
@@ -21,7 +21,7 @@
#include <iostream>
#include <string>
#include <vector>
-#include "manager/tmasterserver.h"
+#include "manager/tmanagerserver.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
@@ -30,7 +30,7 @@
#include "config/heron-internals-config-reader.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
StMgrState::StMgrState(Connection* _conn, const proto::system::StMgr& _stmgr,
const std::vector<shared_ptr<proto::system::Instance>>& _instances,
@@ -141,8 +141,8 @@
bool StMgrState::TimedOut() const {
sp_int32 timeout =
- config::HeronInternalsConfigReader::Instance()->GetHeronTmasterStmgrStateTimeoutSec();
+ config::HeronInternalsConfigReader::Instance()->GetHeronTmanagerStmgrStateTimeoutSec();
return (time(NULL) - last_heartbeat_) > timeout;
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.h b/heron/tmanager/src/cpp/manager/stmgrstate.h
similarity index 95%
rename from heron/tmaster/src/cpp/manager/stmgrstate.h
rename to heron/tmanager/src/cpp/manager/stmgrstate.h
index d9d9159..449670c 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.h
+++ b/heron/tmanager/src/cpp/manager/stmgrstate.h
@@ -23,7 +23,7 @@
#include <string>
#include <vector>
#include "network/network.h"
-#include "proto/tmaster.pb.h"
+#include "proto/tmanager.pb.h"
#include "proto/ckptmgr.pb.h"
#include "basics/basics.h"
@@ -38,11 +38,11 @@
}
namespace heron {
-namespace tmaster {
+namespace tmanager {
using std::shared_ptr;
-class TMasterServer;
+class TManagerServer;
class StMgrState {
public:
@@ -96,10 +96,10 @@
shared_ptr<proto::system::StMgr> stmgr_;
// The connection used by the nodemanager to contact us
Connection* connection_;
- // Our link to our TMaster
+ // Our link to our TManager
Server& server_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.cpp b/heron/tmanager/src/cpp/manager/tcontroller.cpp
similarity index 91%
rename from heron/tmaster/src/cpp/manager/tcontroller.cpp
rename to heron/tmanager/src/cpp/manager/tcontroller.cpp
index 8bc4ffc..73fba42 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.cpp
+++ b/heron/tmanager/src/cpp/manager/tcontroller.cpp
@@ -29,20 +29,20 @@
#include "cereal/external/base64.hpp"
#include "config/topology-config-helper.h"
#include "errors/errors.h"
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
#include "network/network.h"
#include "proto/messages.h"
#include "threads/threads.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
/*
* HTTP service controller.
*/
TController::TController(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options,
- TMaster* tmaster)
- : tmaster_(tmaster) {
+ TManager* tmanager)
+ : tmanager_(tmanager) {
http_server_ = make_unique<HTTPServer>(eventLoop, options);
/*
* Install the handlers
@@ -90,7 +90,7 @@
return;
}
- if (tmaster_->GetTopologyState() != proto::api::PAUSED) {
+ if (tmanager_->GetTopologyState() != proto::api::PAUSED) {
LOG(ERROR) << "Topology not in paused state";
http_server_->SendErrorReply(request, 400);
delete request;
@@ -101,7 +101,7 @@
this->HandleActivateRequestDone(request, status);
};
- tmaster_->ActivateTopology(std::move(cb));
+ tmanager_->ActivateTopology(std::move(cb));
}
void TController::HandleActivateRequestDone(IncomingHTTPRequest* request,
@@ -129,7 +129,7 @@
return;
}
- if (tmaster_->GetTopologyState() != proto::api::RUNNING) {
+ if (tmanager_->GetTopologyState() != proto::api::RUNNING) {
LOG(ERROR) << "Topology not in running state";
http_server_->SendErrorReply(request, 400);
delete request;
@@ -140,7 +140,7 @@
this->HandleDeActivateRequestDone(request, status);
};
- tmaster_->DeActivateTopology(std::move(cb));
+ tmanager_->DeActivateTopology(std::move(cb));
}
void TController::HandleDeActivateRequestDone(IncomingHTTPRequest* request,
@@ -179,7 +179,7 @@
this->HandleCleanStatefulCheckpointRequestDone(request, status);
};
- tmaster_->CleanAllStatefulCheckpoint();
+ tmanager_->CleanAllStatefulCheckpoint();
}
void TController::HandleCleanStatefulCheckpointResponse(proto::system::StatusCode _status) {
@@ -236,7 +236,7 @@
}
// Validate them before applying
- if (!tmaster_->ValidateRuntimeConfig(config)) {
+ if (!tmanager_->ValidateRuntimeConfig(config)) {
http_server_->SendErrorReply(request, 400, "Failed to validate runtime configs");
delete request;
return;
@@ -246,7 +246,7 @@
this->HandleUpdateRuntimeConfigRequestDone(request, status);
};
- if (!tmaster_->UpdateRuntimeConfig(config, std::move(cb))) {
+ if (!tmanager_->UpdateRuntimeConfig(config, std::move(cb))) {
http_server_->SendErrorReply(request, 400, "Failed to update runtime configs");
delete request;
return;
@@ -275,18 +275,18 @@
<< request->GetRemotePort();
// make sure all the stream managers are alive, in case that when container is fail,
- // physical plan is still available at TMaster but not a valid one.
- if (tmaster_->GetStmgrsRegSummary()->absent_stmgrs_size() != 0) {
+ // physical plan is still available at TManager but not a valid one.
+ if (tmanager_->GetStmgrsRegSummary()->absent_stmgrs_size() != 0) {
http_server_->SendErrorReply(request, 400);
delete request;
return;
}
- if (tmaster_->getPhysicalPlan() == NULL) {
+ if (tmanager_->getPhysicalPlan() == NULL) {
http_server_->SendErrorReply(request, 400);
} else {
std::string pplanString;
- tmaster_->getPhysicalPlan()->SerializeToString(&pplanString);
+ tmanager_->getPhysicalPlan()->SerializeToString(&pplanString);
// SerializeToString() returns object in binary format which needs to be encoded
const unsigned char * encodeString = (unsigned char *)pplanString.c_str();
@@ -314,14 +314,14 @@
result.SetResult(400, "Missing 'topologyid' argument in the request");
return false;
}
- if (id != tmaster_->GetTopologyId()) {
+ if (id != tmanager_->GetTopologyId()) {
LOG(ERROR) << "Topology id does not match";
result.SetResult(400, "Topology id does not match");
return false;
}
- if (tmaster_->getPhysicalPlan() == NULL) {
- LOG(ERROR) << "Tmaster still not initialized (physical plan is not available)";
- result.SetResult(500, "Tmaster still not initialized (physical plan is not available)");
+ if (tmanager_->getPhysicalPlan() == NULL) {
+ LOG(ERROR) << "Tmanager still not initialized (physical plan is not available)";
+ result.SetResult(500, "Tmanager still not initialized (physical plan is not available)");
return false;
}
@@ -356,5 +356,5 @@
return true;
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.h b/heron/tmanager/src/cpp/manager/tcontroller.h
similarity index 93%
rename from heron/tmaster/src/cpp/manager/tcontroller.h
rename to heron/tmanager/src/cpp/manager/tcontroller.h
index dec9d5e..398a0ca 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.h
+++ b/heron/tmanager/src/cpp/manager/tcontroller.h
@@ -25,25 +25,25 @@
#include <vector>
#include "network/network.h"
-#include "proto/tmaster.pb.h"
+#include "proto/tmanager.pb.h"
#include "basics/basics.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
using std::shared_ptr;
-class TMaster;
+class TManager;
class TController {
public:
- TController(shared_ptr<EventLoop> eventLoop, const NetworkOptions& options, TMaster* tmaster);
+ TController(shared_ptr<EventLoop> eventLoop, const NetworkOptions& options, TManager* tmanager);
virtual ~TController();
// Starts the controller
sp_int32 Start();
- // Called by the tmaster when it gets response form ckptmgr
+ // Called by the tmanager when it gets a response from ckptmgr
void HandleCleanStatefulCheckpointResponse(proto::system::StatusCode _status);
// Parse and build a map of component name to config kv map from incoming runtime configs.
@@ -73,8 +73,8 @@
// We are a http server
unique_ptr<HTTPServer> http_server_;
- // our tmaster
- TMaster* tmaster_;
+ // our tmanager
+ TManager* tmanager_;
// The callback to be called upon receiving clean stateful checkpoint response
std::function<void(proto::system::StatusCode)> clean_stateful_checkpoint_cb_;
@@ -101,7 +101,7 @@
bool ValidateTopology(const IncomingHTTPRequest* request, ValidationResult& result);
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmanager/src/cpp/manager/tmanager.cpp
similarity index 80%
rename from heron/tmaster/src/cpp/manager/tmaster.cpp
rename to heron/tmanager/src/cpp/manager/tmanager.cpp
index f8dd296..8848748 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmanager/src/cpp/manager/tmanager.cpp
@@ -17,7 +17,7 @@
* under the License.
*/
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
#include <sys/resource.h>
#include <iostream>
#include <map>
@@ -28,7 +28,7 @@
#include "manager/tmetrics-collector.h"
#include "manager/tcontroller.h"
#include "manager/stats-interface.h"
-#include "manager/tmasterserver.h"
+#include "manager/tmanagerserver.h"
#include "manager/stmgrstate.h"
#include "manager/stateful-controller.h"
#include "manager/ckptmgr-client.h"
@@ -42,10 +42,10 @@
#include "config/helper.h"
#include "config/heron-internals-config-reader.h"
#include "statemgr/heron-statemgr.h"
-#include "metrics/tmaster-metrics.h"
+#include "metrics/tmanager-metrics.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
using std::unique_ptr;
using std::make_shared;
@@ -60,20 +60,20 @@
const sp_int64 UPTIME_METRIC_FREQUENCY = 1_s;
const sp_string METRIC_PREFIX = "__process";
-TMaster::TMaster(const std::string& _zk_hostport, const std::string& _topology_name,
+TManager::TManager(const std::string& _zk_hostport, const std::string& _topology_name,
const std::string& _topology_id, const std::string& _topdir,
- sp_int32 _tmaster_controller_port,
- sp_int32 _master_port, sp_int32 _stats_port, sp_int32 metricsMgrPort,
+ sp_int32 _tmanager_controller_port,
+ sp_int32 _server_port, sp_int32 _stats_port, sp_int32 metricsMgrPort,
sp_int32 _ckptmgr_port,
const std::string& _metrics_sinks_yaml, const std::string& _myhost_name,
shared_ptr<EventLoop> eventLoop) {
start_time_ = std::chrono::high_resolution_clock::now();
zk_hostport_ = _zk_hostport;
topdir_ = _topdir;
- tmaster_controller_ = nullptr;
- tmaster_controller_port_ = _tmaster_controller_port;
- master_ = nullptr;
- master_port_ = _master_port;
+ tmanager_controller_ = nullptr;
+ tmanager_controller_port_ = _tmanager_controller_port;
+ server_ = nullptr;
+ server_port_ = _server_port;
stats_ = nullptr;
stats_port_ = _stats_port;
myhost_name_ = _myhost_name;
@@ -82,7 +82,7 @@
http_client_ = new HTTPClient(eventLoop_, dns_);
metrics_collector_ = make_shared<TMetricsCollector>(config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterMetricsCollectorMaximumIntervalMin() * 60,
+ ->GetHeronTmanagerMetricsCollectorMaximumIntervalMin() * 60,
eventLoop_, _metrics_sinks_yaml);
mMetricsMgrPort = metricsMgrPort;
@@ -92,11 +92,11 @@
mMetricsMgrClient = make_shared<heron::common::MetricsMgrSt>(
mMetricsMgrPort, metricsExportIntervalSec, eventLoop_);
- mMetricsMgrClient->Start(myhost_name_, master_port_, "__tmaster__",
- "0"); // MM expects task_id, so just giving 0 for tmaster.
+ mMetricsMgrClient->Start(myhost_name_, server_port_, "__tmanager__",
+ "0"); // MM expects task_id, so just giving 0 for tmanager.
- tmasterProcessMetrics = make_shared<heron::common::MultiAssignableMetric>();
- mMetricsMgrClient->register_metric(METRIC_PREFIX, tmasterProcessMetrics);
+ tmanagerProcessMetrics = make_shared<heron::common::MultiAssignableMetric>();
+ mMetricsMgrClient->register_metric(METRIC_PREFIX, tmanagerProcessMetrics);
ckptmgr_port_ = _ckptmgr_port;
ckptmgr_client_ = nullptr;
@@ -104,7 +104,7 @@
current_pplan_ = nullptr;
// The topology as first submitted by the user
- // It shall only be used to construct the physical plan when TMaster first time starts
+ // It shall only be used to construct the physical plan when TManager first starts
// Any runtime changes shall be made to current_pplan_->topology
topology_ = nullptr;
packing_plan_ = nullptr;
@@ -113,19 +113,19 @@
assignment_in_progress_ = false;
do_reassign_ = false;
- master_establish_attempts_ = 0;
- tmaster_location_ = make_unique<proto::tmaster::TMasterLocation>();
- tmaster_location_->set_topology_name(_topology_name);
- tmaster_location_->set_topology_id(_topology_id);
- tmaster_location_->set_host(myhost_name_);
- tmaster_location_->set_controller_port(tmaster_controller_port_);
- tmaster_location_->set_master_port(master_port_);
- tmaster_location_->set_stats_port(stats_port_);
- DCHECK(tmaster_location_->IsInitialized());
+ server_establish_attempts_ = 0;
+ tmanager_location_ = make_unique<proto::tmanager::TManagerLocation>();
+ tmanager_location_->set_topology_name(_topology_name);
+ tmanager_location_->set_topology_id(_topology_id);
+ tmanager_location_->set_host(myhost_name_);
+ tmanager_location_->set_controller_port(tmanager_controller_port_);
+ tmanager_location_->set_server_port(server_port_);
+ tmanager_location_->set_stats_port(stats_port_);
+ DCHECK(tmanager_location_->IsInitialized());
FetchPackingPlan();
- // Send tmaster location to metrics mgr
- mMetricsMgrClient->RefreshTMasterLocation(*tmaster_location_);
+ // Send tmanager location to metrics mgr
+ mMetricsMgrClient->RefreshTManagerLocation(*tmanager_location_);
// Check for log pruning every 5 minutes
CHECK_GT(eventLoop_->registerTimer(
@@ -154,16 +154,16 @@
stateful_controller_ = nullptr;
}
-void TMaster::FetchPackingPlan() {
+void TManager::FetchPackingPlan() {
auto packing_plan = make_shared<proto::system::PackingPlan>();
- state_mgr_->GetPackingPlan(tmaster_location_->topology_name(), packing_plan,
+ state_mgr_->GetPackingPlan(tmanager_location_->topology_name(), packing_plan,
[packing_plan, this](proto::system::StatusCode status) {
this->OnPackingPlanFetch(packing_plan, status);
});
}
-void TMaster::OnPackingPlanFetch(shared_ptr<proto::system::PackingPlan> newPackingPlan,
+void TManager::OnPackingPlanFetch(shared_ptr<proto::system::PackingPlan> newPackingPlan,
proto::system::StatusCode _status) {
if (_status != proto::system::OK) {
LOG(INFO) << "PackingPlan Fetch failed with status " << _status;
@@ -192,19 +192,19 @@
}
// this is part of the initialization process. Since we've got a packing plan we will
- // register our self as the master
- EstablishTMaster(EventLoop::TIMEOUT_EVENT);
+ // register ourselves as the TManager
+ EstablishTManager(EventLoop::TIMEOUT_EVENT);
} else {
- // We must know for sure that we are TMaster before potentially deleting the physical plan
+ // We must know for sure that we are TManager before potentially deleting the physical plan
// in state manager. We know this to be the case here because we initially fetch
- // packing_plan_ before becoming master, but we register the packing plan watcher only after
- // becoming master. That guarantees that if packing_plan_ is already set and this method is
- // invoked, it's due to the watch and we're master here.
+ // packing_plan_ before becoming TManager, but we register the packing plan watcher only
+ // after becoming TManager. That guarantees that if packing_plan_ is already set and this
+ // method is invoked, it's due to the watch and we're TManager here.
if (packing_plan_ != newPackingPlan) {
- LOG(INFO) << "Packing plan changed. Deleting physical plan and restarting TMaster to "
+ LOG(INFO) << "Packing plan changed. Deleting physical plan and restarting TManager to "
<< "reset internal state. Exiting.";
- state_mgr_->DeletePhysicalPlan(tmaster_location_->topology_name(),
+ state_mgr_->DeletePhysicalPlan(tmanager_location_->topology_name(),
[this](proto::system::StatusCode status) {
::exit(1);
});
@@ -215,24 +215,24 @@
}
}
-void TMaster::EstablishTMaster(EventLoop::Status) {
- auto cb = [this](proto::system::StatusCode code) { this->SetTMasterLocationDone(code); };
+void TManager::EstablishTManager(EventLoop::Status) {
+ auto cb = [this](proto::system::StatusCode code) { this->SetTManagerLocationDone(code); };
- state_mgr_->SetTMasterLocation(*tmaster_location_, std::move(cb));
+ state_mgr_->SetTManagerLocation(*tmanager_location_, std::move(cb));
- // if zk lost the tmaster location, tmaster quits to bail out and re-establish its location
+ // if zk lost the tmanager location, tmanager quits to bail out and re-establish its location
auto cb2 = [this]() {
- LOG(ERROR) << " lost tmaster location in zk state manager. Bailing out..." << std::endl;
+ LOG(ERROR) << " lost tmanager location in zk state manager. Bailing out..." << std::endl;
::exit(1);
};
- state_mgr_->SetTMasterLocationWatch(tmaster_location_->topology_name(), std::move(cb2));
+ state_mgr_->SetTManagerLocationWatch(tmanager_location_->topology_name(), std::move(cb2));
- master_establish_attempts_++;
+ server_establish_attempts_++;
}
-TMaster::~TMaster() {
- if (master_) {
- master_->Stop();
+TManager::~TManager() {
+ if (server_) {
+ server_->Stop();
}
for (StMgrMapIter iter = stmgrs_.begin(); iter != stmgrs_.end(); ++iter) {
@@ -247,68 +247,68 @@
delete dns_;
}
-void TMaster::UpdateUptimeMetric() {
+void TManager::UpdateUptimeMetric() {
auto end_time = std::chrono::high_resolution_clock::now();
auto uptime = std::chrono::duration_cast<std::chrono::seconds>(end_time - start_time_).count();
- tmasterProcessMetrics->scope(METRIC_UPTIME)->SetValue(uptime);
+ tmanagerProcessMetrics->scope(METRIC_UPTIME)->SetValue(uptime);
}
-void TMaster::UpdateProcessMetrics(EventLoop::Status) {
+void TManager::UpdateProcessMetrics(EventLoop::Status) {
// CPU
struct rusage usage;
ProcessUtils::getResourceUsage(&usage);
- tmasterProcessMetrics->scope(METRIC_CPU_USER)
+ tmanagerProcessMetrics->scope(METRIC_CPU_USER)
->SetValue((usage.ru_utime.tv_sec * 1_s) + usage.ru_utime.tv_usec);
- tmasterProcessMetrics->scope(METRIC_CPU_SYSTEM)
+ tmanagerProcessMetrics->scope(METRIC_CPU_SYSTEM)
->SetValue((usage.ru_stime.tv_sec * 1_s) + usage.ru_stime.tv_usec);
// Memory
size_t totalmemory = ProcessUtils::getTotalMemoryUsed();
- tmasterProcessMetrics->scope(METRIC_MEM_USED)->SetValue(totalmemory);
+ tmanagerProcessMetrics->scope(METRIC_MEM_USED)->SetValue(totalmemory);
}
-void TMaster::SetTMasterLocationDone(proto::system::StatusCode _code) {
+void TManager::SetTManagerLocationDone(proto::system::StatusCode _code) {
if (_code != proto::system::OK) {
- if (_code == proto::system::TMASTERLOCATION_ALREADY_EXISTS &&
- master_establish_attempts_ <
- config::HeronInternalsConfigReader::Instance()->GetHeronTmasterEstablishRetryTimes()) {
- LOG(INFO) << "Topology Master node already exists. Maybe its "
+ if (_code == proto::system::TMANAGERLOCATION_ALREADY_EXISTS &&
+ server_establish_attempts_ <
+ config::HeronInternalsConfigReader::Instance()->GetHeronTmanagerEstablishRetryTimes()) {
+ LOG(INFO) << "Topology Manager node already exists. Maybe its "
<< "because of our restart. Will try again" << std::endl;
// Attempt again
- auto cb = [this](EventLoop::Status status) { this->EstablishTMaster(status); };
+ auto cb = [this](EventLoop::Status status) { this->EstablishTManager(status); };
eventLoop_->registerTimer(std::move(cb), false,
config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterEstablishRetryIntervalSec() *
+ ->GetHeronTmanagerEstablishRetryIntervalSec() *
1_s);
return;
}
// There was an error setting our location
- LOG(ERROR) << "For topology " << tmaster_location_->topology_name()
- << " Error setting ourselves as TMaster. Error code is " << _code << std::endl;
+ LOG(ERROR) << "For topology " << tmanager_location_->topology_name()
+ << " Error setting ourselves as TManager. Error code is " << _code << std::endl;
::exit(1);
}
- master_establish_attempts_ = 0;
+ server_establish_attempts_ = 0;
- // We are now the master
- LOG(INFO) << "Successfully set ourselves as master\n";
+ // We are now the server
+ LOG(INFO) << "Successfully set ourselves as server\n";
// Lets now read the topology
topology_ = make_unique<proto::api::Topology>();
- state_mgr_->GetTopology(tmaster_location_->topology_name(), *topology_,
+ state_mgr_->GetTopology(tmanager_location_->topology_name(), *topology_,
[this](proto::system::StatusCode code) {
this->GetTopologyDone(code);
});
// and register packing plan watcher to pick up changes
- state_mgr_->SetPackingPlanWatch(tmaster_location_->topology_name(), [this]() {
+ state_mgr_->SetPackingPlanWatch(tmanager_location_->topology_name(), [this]() {
this->FetchPackingPlan();
});
}
-void TMaster::GetTopologyDone(proto::system::StatusCode _code) {
+void TManager::GetTopologyDone(proto::system::StatusCode _code) {
if (_code != proto::system::OK) {
// Without Topology we can't do much
- LOG(ERROR) << "For topology " << tmaster_location_->topology_name()
+ LOG(ERROR) << "For topology " << tmanager_location_->topology_name()
<< " Error getting topology. Error code is " << _code << std::endl;
::exit(1);
}
@@ -329,11 +329,11 @@
ckpt_options.set_host("127.0.0.1");
ckpt_options.set_port(ckptmgr_port_);
ckpt_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
+ ->GetHeronTmanagerNetworkServerOptionsMaximumPacketMb() *
1024 * 1024);
ckptmgr_client_ = make_unique<CkptMgrClient>(eventLoop_, ckpt_options,
topology_->name(), topology_->id(),
- std::bind(&TMaster::HandleCleanStatefulCheckpointResponse,
+ std::bind(&TManager::HandleCleanStatefulCheckpointResponse,
this, std::placeholders::_1));
// Start the client
ckptmgr_client_->Start();
@@ -344,22 +344,22 @@
this->GetStatefulCheckpointsDone(ckpt, code);
};
- state_mgr_->GetStatefulCheckpoints(tmaster_location_->topology_name(), ckpt, std::move(cb));
+ state_mgr_->GetStatefulCheckpoints(tmanager_location_->topology_name(), ckpt, std::move(cb));
} else {
// Now see if there is already a pplan
FetchPhysicalPlan();
}
}
-void TMaster::GetStatefulCheckpointsDone(
+void TManager::GetStatefulCheckpointsDone(
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
proto::system::StatusCode _code) {
if (_code != proto::system::OK && _code != proto::system::PATH_DOES_NOT_EXIST) {
- LOG(FATAL) << "For topology " << tmaster_location_->topology_name()
+ LOG(FATAL) << "For topology " << tmanager_location_->topology_name()
<< " Getting Stateful Checkpoint failed with error " << _code;
}
if (_code == proto::system::PATH_DOES_NOT_EXIST) {
- LOG(INFO) << "For topology " << tmaster_location_->topology_name()
+ LOG(INFO) << "For topology " << tmanager_location_->topology_name()
<< " No existing globally consistent checkpoint found "
<< " inserting a empty one";
// We need to set an empty one
@@ -371,10 +371,10 @@
this->SetStatefulCheckpointsDone(code, ckpts);
};
- state_mgr_->CreateStatefulCheckpoints(tmaster_location_->topology_name(),
+ state_mgr_->CreateStatefulCheckpoints(tmanager_location_->topology_name(),
ckpts, std::move(cb));
} else {
- LOG(INFO) << "For topology " << tmaster_location_->topology_name()
+ LOG(INFO) << "For topology " << tmanager_location_->topology_name()
<< " An existing globally consistent checkpoint found "
<< _ckpt->DebugString();
SetupStatefulController(std::move(_ckpt));
@@ -382,17 +382,17 @@
}
}
-void TMaster::SetStatefulCheckpointsDone(proto::system::StatusCode _code,
+void TManager::SetStatefulCheckpointsDone(proto::system::StatusCode _code,
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt) {
if (_code != proto::system::OK) {
- LOG(FATAL) << "For topology " << tmaster_location_->topology_name()
+ LOG(FATAL) << "For topology " << tmanager_location_->topology_name()
<< " Setting empty Stateful Checkpoint failed with error " << _code;
}
SetupStatefulController(_ckpt);
FetchPhysicalPlan();
}
-void TMaster::SetupStatefulController(
+void TManager::SetupStatefulController(
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt) {
sp_int64 stateful_checkpoint_interval =
config::TopologyConfigHelper::GetStatefulCheckpointIntervalSecsWithDefault(*topology_, 300);
@@ -412,7 +412,7 @@
0);
}
-void TMaster::ResetTopologyState(Connection* _conn, const std::string& _dead_stmgr,
+void TManager::ResetTopologyState(Connection* _conn, const std::string& _dead_stmgr,
int32_t _dead_instance, const std::string& _reason) {
LOG(INFO) << "Got a reset topology request with dead_stmgr " << _dead_stmgr
<< " dead_instance " << _dead_instance << " and reason " << _reason;
@@ -440,16 +440,16 @@
}
}
-void TMaster::FetchPhysicalPlan() {
+void TManager::FetchPhysicalPlan() {
auto pplan = make_shared<proto::system::PhysicalPlan>();
auto cb = [pplan, this](proto::system::StatusCode code) {
this->GetPhysicalPlanDone(pplan, code);
};
- state_mgr_->GetPhysicalPlan(tmaster_location_->topology_name(), pplan, std::move(cb));
+ state_mgr_->GetPhysicalPlan(tmanager_location_->topology_name(), pplan, std::move(cb));
}
-void TMaster::SendCheckpointMarker() {
+void TManager::SendCheckpointMarker() {
if (!absent_stmgrs_.empty()) {
LOG(INFO) << "Not sending checkpoint marker because not all stmgrs have connected to us";
return;
@@ -457,7 +457,7 @@
stateful_controller_->StartCheckpoint(stmgrs_);
}
-void TMaster::HandleInstanceStateStored(const std::string& _checkpoint_id,
+void TManager::HandleInstanceStateStored(const std::string& _checkpoint_id,
const proto::system::Instance& _instance) {
LOG(INFO) << "Got notification from stmgr that we saved checkpoint for task "
<< _instance.info().task_id() << " for checkpoint " << _checkpoint_id;
@@ -466,7 +466,7 @@
}
}
-void TMaster::HandleRestoreTopologyStateResponse(Connection* _conn,
+void TManager::HandleRestoreTopologyStateResponse(Connection* _conn,
const std::string& _checkpoint_id,
int64_t _restore_txid,
proto::system::StatusCode _status) {
@@ -481,14 +481,14 @@
}
}
-void TMaster::GetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
+void TManager::GetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::system::StatusCode _code) {
// Physical plan need not exist. First check if some other error occurred.
if (_code != proto::system::OK && _code != proto::system::PATH_DOES_NOT_EXIST) {
// Something bad happened. Bail out!
// TODO(kramasamy): This is not as bad as it seems. Maybe we can delete this assignment
// and have a new assignment instead.
- LOG(ERROR) << "For topology " << tmaster_location_->topology_name()
+ LOG(ERROR) << "For topology " << tmanager_location_->topology_name()
<< " Error getting assignment. Error code is " << _code << std::endl;
::exit(1);
}
@@ -507,53 +507,53 @@
// Now that we have our state all setup, its time to start accepting requests
// Port for the stmgrs to connect to
- NetworkOptions master_options;
- master_options.set_host(myhost_name_);
- master_options.set_port(master_port_);
- master_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
+ NetworkOptions server_options;
+ server_options.set_host(myhost_name_);
+ server_options.set_port(server_port_);
+ server_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
+ ->GetHeronTmanagerNetworkServerOptionsMaximumPacketMb() *
1_MB);
- master_options.set_socket_family(PF_INET);
- master_ = make_unique<TMasterServer>(eventLoop_, master_options, metrics_collector_, this);
+ server_options.set_socket_family(PF_INET);
+ server_ = make_unique<TManagerServer>(eventLoop_, server_options, metrics_collector_, this);
- sp_int32 retval = master_->Start();
+ sp_int32 retval = server_->Start();
if (retval != SP_OK) {
- LOG(FATAL) << "Failed to start TMaster Master Server with rcode: " << retval;
+ LOG(FATAL) << "Failed to start TManager Server with rcode: " << retval;
}
// Port for the scheduler to connect to
NetworkOptions controller_options;
controller_options.set_host(myhost_name_);
- controller_options.set_port(tmaster_controller_port_);
+ controller_options.set_port(tmanager_controller_port_);
controller_options.set_max_packet_size(
config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterNetworkControllerOptionsMaximumPacketMb() *
+ ->GetHeronTmanagerNetworkControllerOptionsMaximumPacketMb() *
1_MB);
controller_options.set_socket_family(PF_INET);
- tmaster_controller_ = make_unique<TController>(eventLoop_, controller_options, this);
+ tmanager_controller_ = make_unique<TController>(eventLoop_, controller_options, this);
- retval = tmaster_controller_->Start();
+ retval = tmanager_controller_->Start();
if (retval != SP_OK) {
- LOG(FATAL) << "Failed to start TMaster Controller Server with rcode: " << retval;
+ LOG(FATAL) << "Failed to start TManager Controller Server with rcode: " << retval;
}
// Http port for stat queries
NetworkOptions stats_options;
if (config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterMetricsNetworkBindAllInterfaces()) {
+ ->GetHeronTmanagerMetricsNetworkBindAllInterfaces()) {
stats_options.set_host("0.0.0.0");
} else {
stats_options.set_host(myhost_name_);
}
stats_options.set_port(stats_port_);
stats_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterNetworkStatsOptionsMaximumPacketMb() *
+ ->GetHeronTmanagerNetworkStatsOptionsMaximumPacketMb() *
1_MB);
stats_options.set_socket_family(PF_INET);
stats_ = make_unique<StatsInterface>(eventLoop_, stats_options, metrics_collector_, this);
}
-void TMaster::ActivateTopology(VCallback<proto::system::StatusCode> cb) {
+void TManager::ActivateTopology(VCallback<proto::system::StatusCode> cb) {
CHECK_EQ(current_pplan_->topology().state(), proto::api::PAUSED);
DCHECK(current_pplan_->topology().IsInitialized());
@@ -570,7 +570,7 @@
state_mgr_->SetPhysicalPlan(*new_pplan, std::move(callback));
}
-void TMaster::DeActivateTopology(VCallback<proto::system::StatusCode> cb) {
+void TManager::DeActivateTopology(VCallback<proto::system::StatusCode> cb) {
CHECK_EQ(current_pplan_->topology().state(), proto::api::RUNNING);
DCHECK(current_pplan_->topology().IsInitialized());
@@ -587,7 +587,7 @@
state_mgr_->SetPhysicalPlan(*new_pplan, std::move(callback));
}
-bool TMaster::UpdateRuntimeConfig(const ComponentConfigMap& _config,
+bool TManager::UpdateRuntimeConfig(const ComponentConfigMap& _config,
VCallback<proto::system::StatusCode> cb) {
DCHECK(current_pplan_->topology().IsInitialized());
@@ -612,11 +612,11 @@
return true;
}
-void TMaster::CleanAllStatefulCheckpoint() {
+void TManager::CleanAllStatefulCheckpoint() {
ckptmgr_client_->SendCleanStatefulCheckpointRequest("", true);
}
-void TMaster::HandleStatefulCheckpointSave(
+void TManager::HandleStatefulCheckpointSave(
const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts) {
// broadcast globally consistent checkpoint completion
proto::ckptmgr::StatefulConsistentCheckpointSaved msg;
@@ -635,14 +635,14 @@
}
// Called when ckptmgr completes the clean stateful checkpoint request
-void TMaster::HandleCleanStatefulCheckpointResponse(proto::system::StatusCode _status) {
- tmaster_controller_->HandleCleanStatefulCheckpointResponse(_status);
+void TManager::HandleCleanStatefulCheckpointResponse(proto::system::StatusCode _status) {
+ tmanager_controller_->HandleCleanStatefulCheckpointResponse(_status);
}
// Update configurations in physical plan.
// Return false if a config doesn't exist, but this shouldn't happen if the config has been
// validated using ValidateRuntimeConig() function.
-bool TMaster::UpdateRuntimeConfigInTopology(proto::api::Topology* _topology,
+bool TManager::UpdateRuntimeConfigInTopology(proto::api::Topology* _topology,
const ComponentConfigMap& _config) {
DCHECK(_topology->IsInitialized());
@@ -662,11 +662,11 @@
return true;
}
-bool TMaster::ValidateRuntimeConfig(const ComponentConfigMap& _config) const {
+bool TManager::ValidateRuntimeConfig(const ComponentConfigMap& _config) const {
return ValidateRuntimeConfigNames(_config);
}
-void TMaster::KillContainer(const std::string& host_name,
+void TManager::KillContainer(const std::string& host_name,
sp_int32 shell_port, const std::string& stmgr_id) {
LOG(INFO) << "Start killing " << stmgr_id << " on " <<
host_name << ":" << shell_port;
@@ -689,7 +689,7 @@
return;
}
-proto::system::Status* TMaster::RegisterStMgr(
+proto::system::Status* TManager::RegisterStMgr(
const proto::system::StMgr& _stmgr,
const std::vector<shared_ptr<proto::system::Instance>>& _instances,
Connection* _conn, shared_ptr<proto::system::PhysicalPlan>& _pplan) {
@@ -738,7 +738,7 @@
return status;
} else {
// This guy was indeed expected
- stmgrs_[stmgr_id] = make_shared<StMgrState>(_conn, _stmgr, _instances, *master_);
+ stmgrs_[stmgr_id] = make_shared<StMgrState>(_conn, _stmgr, _instances, *server_);
connection_to_stmgr_id_[_conn] = stmgr_id;
absent_stmgrs_.erase(stmgr_id);
}
@@ -762,7 +762,7 @@
return status;
}
-void TMaster::DoPhysicalPlan(EventLoop::Status) {
+void TManager::DoPhysicalPlan(EventLoop::Status) {
do_reassign_ = false;
if (!absent_stmgrs_.empty()) {
@@ -801,7 +801,7 @@
}
}
-void TMaster::SetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
+void TManager::SetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::system::StatusCode _code) {
if (_code != proto::system::OK) {
LOG(ERROR) << "Error writing assignment to statemgr. Error code is " << _code << std::endl;
@@ -832,7 +832,7 @@
}
}
-bool TMaster::DistributePhysicalPlan() {
+bool TManager::DistributePhysicalPlan() {
if (current_pplan_) {
// First valid the physical plan to distribute
LOG(INFO) << "To distribute new physical plan:" << std::endl;
@@ -851,9 +851,9 @@
return false;
}
-std::unique_ptr<proto::tmaster::StmgrsRegistrationSummaryResponse> TMaster::GetStmgrsRegSummary() {
- auto response = std::unique_ptr<proto::tmaster::StmgrsRegistrationSummaryResponse>(
- new proto::tmaster::StmgrsRegistrationSummaryResponse());
+std::unique_ptr<proto::tmanager::StmgrsRegistrationSummaryResponse> TManager::GetStmgrsRegSummary() {
+ auto response = std::unique_ptr<proto::tmanager::StmgrsRegistrationSummaryResponse>(
+ new proto::tmanager::StmgrsRegistrationSummaryResponse());
for (auto it = stmgrs_.begin(); it != stmgrs_.end(); ++it) {
response->add_registered_stmgrs(it->first);
@@ -866,7 +866,7 @@
return response;
}
-shared_ptr<proto::system::PhysicalPlan> TMaster::MakePhysicalPlan() {
+shared_ptr<proto::system::PhysicalPlan> TManager::MakePhysicalPlan() {
// TODO(kramasamy): At some point, we need to talk to our scheduler
// and do this scheduling
if (current_pplan_) {
@@ -889,9 +889,9 @@
return new_pplan;
}
- // TMaster does not really have any control over who does what.
+ // TManager does not really have any control over who does what.
// That has already been decided while launching the jobs.
- // TMaster just stitches the info together to pass to everyone
+ // TManager just stitches the info together to pass to everyone
// Build the PhysicalPlan structure
auto new_pplan = make_shared<proto::system::PhysicalPlan>();
@@ -910,7 +910,7 @@
return new_pplan;
}
-proto::system::Status* TMaster::UpdateStMgrHeartbeat(Connection* _conn, sp_int64 _time,
+proto::system::Status* TManager::UpdateStMgrHeartbeat(Connection* _conn, sp_int64 _time,
proto::system::StMgrStats* _stats) {
proto::system::Status* retval = new proto::system::Status();
if (connection_to_stmgr_id_.find(_conn) == connection_to_stmgr_id_.end()) {
@@ -935,7 +935,7 @@
return retval;
}
-proto::system::StatusCode TMaster::RemoveStMgrConnection(Connection* _conn) {
+proto::system::StatusCode TManager::RemoveStMgrConnection(Connection* _conn) {
if (connection_to_stmgr_id_.find(_conn) == connection_to_stmgr_id_.end()) {
return proto::system::INVALID_STMGR;
}
@@ -955,16 +955,16 @@
////////////////////////////////////////////////////////////////////////////////
// Below are valid checking functions
////////////////////////////////////////////////////////////////////////////////
-bool TMaster::ValidateTopology(const proto::api::Topology& _topology) {
- if (tmaster_location_->topology_name() != _topology.name()) {
+bool TManager::ValidateTopology(const proto::api::Topology& _topology) {
+ if (tmanager_location_->topology_name() != _topology.name()) {
LOG(ERROR) << "topology name mismatch! Expected topology name is "
- << tmaster_location_->topology_name() << " but found in zk " << _topology.name()
+ << tmanager_location_->topology_name() << " but found in zk " << _topology.name()
<< std::endl;
return false;
}
- if (tmaster_location_->topology_id() != _topology.id()) {
+ if (tmanager_location_->topology_id() != _topology.id()) {
LOG(ERROR) << "topology id mismatch! Expected topology id is "
- << tmaster_location_->topology_id() << " but found in zk " << _topology.id()
+ << tmanager_location_->topology_id() << " but found in zk " << _topology.id()
<< std::endl;
return false;
}
@@ -988,7 +988,7 @@
return true;
}
-bool TMaster::ValidateStMgrsWithPackingPlan() {
+bool TManager::ValidateStMgrsWithPackingPlan() {
// here we check to see if the total number of instances
// across all stmgrs match up to all the spout/bolt
// parallelism the packing plan has specified
@@ -1005,7 +1005,7 @@
return ninstances == ntasks;
}
-bool TMaster::ValidateStMgrsWithPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> _pplan) {
+bool TManager::ValidateStMgrsWithPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> _pplan) {
std::map<std::string, std::vector<proto::system::Instance*> > stmgr_to_instance_map;
for (sp_int32 i = 0; i < _pplan->instances_size(); ++i) {
proto::system::Instance* instance = _pplan->mutable_instances(i);
@@ -1035,7 +1035,7 @@
/**
* Make sure component names exist
*/
-bool TMaster::ValidateRuntimeConfigNames(const ComponentConfigMap& _config) const {
+bool TManager::ValidateRuntimeConfigNames(const ComponentConfigMap& _config) const {
LOG(INFO) << "Validating runtime configs.";
const proto::api::Topology& topology = current_pplan_->topology();
DCHECK(topology.IsInitialized());
@@ -1057,7 +1057,7 @@
return true;
}
-void TMaster::LogConfig(const ComponentConfigMap& _config) {
+void TManager::LogConfig(const ComponentConfigMap& _config) {
for (auto iter = _config.begin(); iter != _config.end(); ++iter) {
LOG(INFO) << iter->first << " =>";
for (auto i = iter->second.begin(); i != iter->second.end(); ++i) {
@@ -1066,5 +1066,5 @@
}
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmanager/src/cpp/manager/tmanager.h
similarity index 87%
rename from heron/tmaster/src/cpp/manager/tmaster.h
rename to heron/tmanager/src/cpp/manager/tmanager.h
index 3bdd25e..0d3cccd 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmanager/src/cpp/manager/tmanager.h
@@ -17,8 +17,8 @@
* under the License.
*/
-#ifndef __TMASTER_H
-#define __TMASTER_H
+#ifndef __TMANAGER_H
+#define __TMANAGER_H
#include <map>
#include <set>
@@ -29,11 +29,11 @@
#include "metrics/metrics-mgr-st.h"
#include "metrics/metrics.h"
#include "network/network.h"
-#include "proto/tmaster.pb.h"
+#include "proto/tmanager.pb.h"
#include "basics/basics.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
using std::unique_ptr;
using std::shared_ptr;
@@ -41,7 +41,7 @@
class StMgrState;
class TController;
class StatsInterface;
-class TMasterServer;
+class TManagerServer;
class TMetricsCollector;
class StatefulController;
class CkptMgrClient;
@@ -53,16 +53,16 @@
// From component name to config/value pairs
typedef std::map<std::string, std::map<std::string, std::string>> ComponentConfigMap;
-class TMaster {
+class TManager {
public:
- TMaster(const std::string& _zk_hostport, const std::string& _topology_name,
+ TManager(const std::string& _zk_hostport, const std::string& _topology_name,
const std::string& _topology_id, const std::string& _topdir,
- sp_int32 _tmaster_controller_port, sp_int32 _master_port,
+ sp_int32 _tmanager_controller_port, sp_int32 _server_port,
sp_int32 _stats_port, sp_int32 metricsMgrPort, sp_int32 _ckptmgr_port,
const std::string& metrics_sinks_yaml,
const std::string& _myhost_name, shared_ptr<EventLoop> eventLoop);
- virtual ~TMaster();
+ virtual ~TManager();
const std::string& GetTopologyId() const { return current_pplan_->topology().id(); }
const std::string& GetTopologyName() const { return current_pplan_->topology().name(); }
@@ -93,12 +93,12 @@
void HandleCleanStatefulCheckpointResponse(proto::system::StatusCode);
// Get stream managers registration summary
- std::unique_ptr<proto::tmaster::StmgrsRegistrationSummaryResponse> GetStmgrsRegSummary();
+ std::unique_ptr<proto::tmanager::StmgrsRegistrationSummaryResponse> GetStmgrsRegSummary();
// Accessors
const shared_ptr<proto::system::PhysicalPlan> getPhysicalPlan() const { return current_pplan_; }
// TODO(mfu): Should we provide this?
- // topology_ should only be used to construct physical plan when TMaster first starts
+ // topology_ should only be used to construct physical plan when TManager first starts
// Providing an accessor is bug prone.
// Now used in GetMetrics function in tmetrics-collector
const proto::api::Topology& getInitialTopology() const { return *topology_; }
@@ -106,17 +106,17 @@
// Timer function to start the stateful checkpoint process
void SendCheckpointMarker();
- // Called by tmaster server when it gets InstanceStateStored message
+ // Called by tmanager server when it gets InstanceStateStored message
void HandleInstanceStateStored(const std::string& _checkpoint_id,
const proto::system::Instance& _instance);
- // Called by tmaster server when it gets RestoreTopologyStateResponse message
+ // Called by tmanager server when it gets RestoreTopologyStateResponse message
void HandleRestoreTopologyStateResponse(Connection* _conn,
const std::string& _checkpoint_id,
int64_t _restore_txid,
proto::system::StatusCode _status);
- // Called by tmaster server when it gets ResetTopologyState message
+ // Called by tmanager server when it gets ResetTopologyState message
void ResetTopologyState(Connection* _conn, const std::string& _dead_stmgr,
int32_t _dead_instance, const std::string& _reason);
@@ -156,8 +156,8 @@
// 1. Distribute physical plan to all active stmgrs
bool DistributePhysicalPlan();
- // Function called after we set the tmasterlocation
- void SetTMasterLocationDone(proto::system::StatusCode _code);
+ // Function called after we set the tmanagerlocation
+ void SetTManagerLocationDone(proto::system::StatusCode _code);
// Function called after we get the topology
void GetTopologyDone(proto::system::StatusCode _code);
@@ -178,8 +178,8 @@
void SetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::system::StatusCode _code);
- // Function called when we want to setup ourselves as tmaster
- void EstablishTMaster(EventLoop::Status);
+ // Function called when we want to setup ourselves as tmanager
+ void EstablishTManager(EventLoop::Status);
void EstablishPackingPlan(EventLoop::Status);
void FetchPackingPlan();
@@ -216,7 +216,7 @@
shared_ptr<proto::system::PhysicalPlan> current_pplan_;
// The topology as first submitted by the user
- // It shall only be used to construct the physical plan when TMaster first time starts
+ // It shall only be used to construct the physical plan when TManager first starts
// Any runtime changes shall be made to current_pplan_->topology
unique_ptr<proto::api::Topology> topology_;
@@ -225,8 +225,8 @@
// The statemgr where we store/retrieve our state
shared_ptr<heron::common::HeronStateMgr> state_mgr_;
- // Our copy of the tmasterlocation
- unique_ptr<proto::tmaster::TMasterLocation> tmaster_location_;
+ // Our copy of the tmanagerlocation
+ unique_ptr<proto::tmanager::TManagerLocation> tmanager_location_;
// When we are in the middle of doing assignment
// we set this to true
@@ -238,17 +238,17 @@
std::string topdir_;
// Servers that implement our services
- unique_ptr<TController> tmaster_controller_;
- sp_int32 tmaster_controller_port_;
- unique_ptr<TMasterServer> master_;
- sp_int32 master_port_;
+ unique_ptr<TController> tmanager_controller_;
+ sp_int32 tmanager_controller_port_;
+ unique_ptr<TManagerServer> server_;
+ sp_int32 server_port_;
unique_ptr<StatsInterface> stats_;
sp_int32 stats_port_;
std::string myhost_name_;
// how many times have we tried to establish
- // ourselves as master
- sp_int32 master_establish_attempts_;
+ // ourselves as tmanager
+ sp_int32 server_establish_attempts_;
// collector
shared_ptr<TMetricsCollector> metrics_collector_;
@@ -262,7 +262,7 @@
sp_int32 ckptmgr_port_;
// Process related metrics
- shared_ptr<heron::common::MultiAssignableMetric> tmasterProcessMetrics;
+ shared_ptr<heron::common::MultiAssignableMetric> tmanagerProcessMetrics;
// The time at which the stmgr was started up
std::chrono::high_resolution_clock::time_point start_time_;
@@ -277,7 +277,7 @@
// Copy of the EventLoop
shared_ptr<EventLoop> eventLoop_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmanager/src/cpp/manager/tmanagerserver.cpp b/heron/tmanager/src/cpp/manager/tmanagerserver.cpp
new file mode 100644
index 0000000..7242ac8
--- /dev/null
+++ b/heron/tmanager/src/cpp/manager/tmanagerserver.cpp
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "manager/tmanagerserver.h"
+#include <iostream>
+#include "manager/tmetrics-collector.h"
+#include "manager/tmanager.h"
+#include "processor/processor.h"
+#include "proto/messages.h"
+#include "basics/basics.h"
+#include "errors/errors.h"
+#include "threads/threads.h"
+#include "network/network.h"
+#include "metrics/tmanager-metrics.h"
+
+namespace heron {
+namespace tmanager {
+
+using std::unique_ptr;
+using std::shared_ptr;
+
+TManagerServer::TManagerServer(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& _options,
+ shared_ptr<TMetricsCollector> _collector, TManager* _tmanager)
+ : Server(eventLoop, _options), collector_(_collector), tmanager_(_tmanager) {
+ // Stmgr handlers: register/heartbeat are requests (REQID); the other three are plain messages
+ InstallRequestHandler(&TManagerServer::HandleStMgrRegisterRequest);
+ InstallRequestHandler(&TManagerServer::HandleStMgrHeartbeatRequest);
+ InstallMessageHandler(&TManagerServer::HandleInstanceStateStored);
+ InstallMessageHandler(&TManagerServer::HandleRestoreTopologyStateResponse);
+ InstallMessageHandler(&TManagerServer::HandleResetTopologyStateMessage);
+
+ // Metricsmgr handler: published metrics are forwarded to collector_
+ InstallMessageHandler(&TManagerServer::HandleMetricsMgrStats);
+}
+
+// Nothing to tear down explicitly: the raw tmanager_ pointer is not deleted here and
+// collector_ (a shared_ptr) drops its own reference when the member is destroyed.
+TManagerServer::~TManagerServer() = default;
+
+void TManagerServer::HandleNewConnection(Connection* conn) {
+ // Intentionally empty: nothing is recorded for a bare connection here; the
+ // peer is only acted on once its register message arrives
+}
+
+void TManagerServer::HandleConnectionClose(Connection* _conn, NetworkErrorCode) {
+  // Ask the tmanager to drop this connection; any status other than OK is only logged.
+  const bool removed_ok = tmanager_->RemoveStMgrConnection(_conn) == proto::system::OK;
+  if (removed_ok) return;
+  LOG(WARNING) << "Unknown connection closed on us from " << _conn->getIPAddress() << ":"
+               << _conn->getPort() << ", possibly metrics mgr";
+}
+
+void TManagerServer::HandleStMgrRegisterRequest(REQID _reqid, Connection* _conn,
+                           pool_unique_ptr<proto::tmanager::StMgrRegisterRequest> _request) {
+  // Temporary processor: the request is moved into it and it is destroyed after Start() returns.
+  make_unique<StMgrRegisterProcessor>(_reqid, _conn, std::move(_request), tmanager_, this)
+      ->Start();
+}
+
+void TManagerServer::HandleStMgrHeartbeatRequest(REQID _reqid, Connection* _conn,
+                           pool_unique_ptr<proto::tmanager::StMgrHeartbeatRequest> _request) {
+  // Temporary processor: the request is moved into it and it is destroyed after Start() returns.
+  make_unique<StMgrHeartbeatProcessor>(_reqid, _conn, std::move(_request), tmanager_, this)
+      ->Start();
+}
+
+void TManagerServer::HandleMetricsMgrStats(Connection*,  // sender connection is not used
+ pool_unique_ptr<proto::tmanager::PublishMetrics> _request) {
+ collector_->AddMetric(*_request);  // hand the published metrics to the TMetricsCollector
+}
+
+void TManagerServer::HandleInstanceStateStored(Connection*,  // forwards to TManager; conn unused
+ pool_unique_ptr<proto::ckptmgr::InstanceStateStored> _message) {
+ tmanager_->HandleInstanceStateStored(_message->checkpoint_id(), _message->instance());
+}
+
+void TManagerServer::HandleRestoreTopologyStateResponse(Connection* _conn,
+                 pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message) {
+  const auto& restore_response = *_message;  // read-only view; _message keeps ownership
+  tmanager_->HandleRestoreTopologyStateResponse(_conn, restore_response.checkpoint_id(),
+      restore_response.restore_txid(), restore_response.status().status());
+}
+
+void TManagerServer::HandleResetTopologyStateMessage(Connection* _conn,
+                 pool_unique_ptr<proto::ckptmgr::ResetTopologyState> _message) {
+  const auto& reset = *_message;  // read-only view; _message keeps ownership
+  tmanager_->ResetTopologyState(_conn, reset.dead_stmgr(), reset.dead_taskid(), reset.reason());
+}
+} // namespace tmanager
+} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.h b/heron/tmanager/src/cpp/manager/tmanagerserver.h
similarity index 68%
rename from heron/tmaster/src/cpp/manager/tmasterserver.h
rename to heron/tmanager/src/cpp/manager/tmanagerserver.h
index 0524b61..8859804 100644
--- a/heron/tmaster/src/cpp/manager/tmasterserver.h
+++ b/heron/tmanager/src/cpp/manager/tmanagerserver.h
@@ -17,28 +17,28 @@
* under the License.
*/
-#ifndef __TMASTERSERVER_H
-#define __TMASTERSERVER_H
+#ifndef __TMANAGERSERVER_H
+#define __TMANAGERSERVER_H
#include "network/network_error.h"
#include "network/network.h"
-#include "proto/tmaster.pb.h"
+#include "proto/tmanager.pb.h"
#include "proto/ckptmgr.pb.h"
#include "basics/basics.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
using std::shared_ptr;
-class TMaster;
+class TManager;
class TMetricsCollector;
-class TMasterServer : public Server {
+class TManagerServer : public Server {
public:
- TMasterServer(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options,
- shared_ptr<TMetricsCollector> _collector, TMaster* _tmaster);
- virtual ~TMasterServer();
+ TManagerServer(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options,
+ shared_ptr<TMetricsCollector> _collector, TManager* _tmanager);
+ virtual ~TManagerServer();
protected:
virtual void HandleNewConnection(Connection* newConnection);
@@ -47,28 +47,28 @@
private:
// Various handlers for different requests
void HandleStMgrRegisterRequest(REQID _id, Connection* _conn,
- pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
+ pool_unique_ptr<proto::tmanager::StMgrRegisterRequest> _request);
void HandleStMgrHeartbeatRequest(REQID _id, Connection* _conn,
- pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request);
- void HandleMetricsMgrStats(Connection*, pool_unique_ptr<proto::tmaster::PublishMetrics> _request);
+ pool_unique_ptr<proto::tmanager::StMgrHeartbeatRequest> _request);
+ void HandleMetricsMgrStats(Connection*, pool_unique_ptr<proto::tmanager::PublishMetrics> _request);
- // Message sent by stmgr to tell tmaster that a particular checkpoint message
- // was saved. This way the tmaster can keep track of which all instances have saved their
+ // Message sent by stmgr to tell tmanager that a particular checkpoint message
+ // was saved. This way the tmanager can keep track of which instances have saved their
// state for any given checkpoint id.
void HandleInstanceStateStored(Connection*,
pool_unique_ptr<proto::ckptmgr::InstanceStateStored> _message);
// Handle response from stmgr for the RestoreTopologyStateRequest
void HandleRestoreTopologyStateResponse(Connection*,
pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message);
- // Stmgr can request tmaster to reset the state of the topology in case it finds any errors.
+ // Stmgr can request tmanager to reset the state of the topology in case it finds any errors.
void HandleResetTopologyStateMessage(Connection*,
pool_unique_ptr<proto::ckptmgr::ResetTopologyState> _message);
- // our tmaster
+ // our tmanager
shared_ptr<TMetricsCollector> collector_;
- TMaster* tmaster_;
+ TManager* tmanager_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp b/heron/tmanager/src/cpp/manager/tmetrics-collector.cpp
similarity index 86%
rename from heron/tmaster/src/cpp/manager/tmetrics-collector.cpp
rename to heron/tmanager/src/cpp/manager/tmetrics-collector.cpp
index ede0393..f6b3300 100644
--- a/heron/tmaster/src/cpp/manager/tmetrics-collector.cpp
+++ b/heron/tmanager/src/cpp/manager/tmetrics-collector.cpp
@@ -22,41 +22,41 @@
#include <list>
#include <map>
#include <string>
-#include "metrics/tmaster-metrics.h"
+#include "metrics/tmanager-metrics.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "zookeeper/zkclient.h"
#include "proto/metrics.pb.h"
-#include "proto/tmaster.pb.h"
+#include "proto/tmanager.pb.h"
#include "proto/topology.pb.h"
#include "config/heron-internals-config-reader.h"
namespace {
-typedef heron::common::TMasterMetrics TMasterMetrics;
-typedef heron::proto::tmaster::ExceptionLogRequest ExceptionLogRequest;
-typedef heron::proto::tmaster::ExceptionLogResponse ExceptionLogResponse;
-typedef heron::proto::tmaster::MetricRequest MetricRequest;
-typedef heron::proto::tmaster::MetricResponse MetricResponse;
-typedef heron::proto::tmaster::MetricResponse::IndividualMetric IndividualMetric;
-typedef heron::proto::tmaster::MetricResponse::IndividualMetric::IntervalValue IntervalValue;
-typedef heron::proto::tmaster::TmasterExceptionLog TmasterExceptionLog;
-typedef heron::proto::tmaster::PublishMetrics PublishMetrics;
+typedef heron::common::TManagerMetrics TManagerMetrics;
+typedef heron::proto::tmanager::ExceptionLogRequest ExceptionLogRequest;
+typedef heron::proto::tmanager::ExceptionLogResponse ExceptionLogResponse;
+typedef heron::proto::tmanager::MetricRequest MetricRequest;
+typedef heron::proto::tmanager::MetricResponse MetricResponse;
+typedef heron::proto::tmanager::MetricResponse::IndividualMetric IndividualMetric;
+typedef heron::proto::tmanager::MetricResponse::IndividualMetric::IntervalValue IntervalValue;
+typedef heron::proto::tmanager::TmanagerExceptionLog TmanagerExceptionLog;
+typedef heron::proto::tmanager::PublishMetrics PublishMetrics;
} // namespace
namespace heron {
-namespace tmaster {
+namespace tmanager {
TMetricsCollector::TMetricsCollector(sp_int32 _max_interval, std::shared_ptr<EventLoop> eventLoop,
const std::string& metrics_sinks_yaml)
: max_interval_(_max_interval),
eventLoop_(eventLoop),
metrics_sinks_yaml_(metrics_sinks_yaml),
- tmetrics_info_(make_unique<common::TMasterMetrics>(metrics_sinks_yaml, eventLoop)),
+ tmetrics_info_(make_unique<common::TManagerMetrics>(metrics_sinks_yaml, eventLoop)),
start_time_(time(NULL)) {
interval_ = config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterMetricsCollectorPurgeIntervalSec();
+ ->GetHeronTmanagerMetricsCollectorPurgeIntervalSec();
CHECK_EQ(max_interval_ % interval_, 0);
nintervals_ = max_interval_ / interval_;
auto cb = [this](EventLoop::Status status) { this->Purge(status); };
@@ -75,16 +75,16 @@
}
void TMetricsCollector::AddMetricsForComponent(const sp_string& component_name,
- const proto::tmaster::MetricDatum& metrics_data) {
+ const proto::tmanager::MetricDatum& metrics_data) {
auto component_metrics = GetOrCreateComponentMetrics(component_name);
const sp_string& name = metrics_data.name();
- const TMasterMetrics::MetricAggregationType& type = tmetrics_info_->GetAggregationType(name);
+ const TManagerMetrics::MetricAggregationType& type = tmetrics_info_->GetAggregationType(name);
component_metrics->AddMetricForInstance(metrics_data.instance_id(), name, type,
metrics_data.value());
}
void TMetricsCollector::AddExceptionsForComponent(const sp_string& component_name,
- const TmasterExceptionLog& exception_log) {
+ const TmanagerExceptionLog& exception_log) {
auto component_metrics = GetOrCreateComponentMetrics(component_name);
component_metrics->AddExceptionForInstance(exception_log.instance_id(), exception_log);
}
@@ -216,21 +216,21 @@
using std::map;
using std::string;
- map<string, unique_ptr<TmasterExceptionLog>> exception_summary; // Owns exception log pointer.
+ map<string, unique_ptr<TmanagerExceptionLog>> exception_summary; // Owns exception log pointer.
for (int i = 0; i < all_exceptions.exceptions_size(); ++i) {
- const TmasterExceptionLog& log = all_exceptions.exceptions(i);
+ const TmanagerExceptionLog& log = all_exceptions.exceptions(i);
// Get classname by splitting on first colon
const std::string& stack_trace = log.stacktrace();
size_t pos = stack_trace.find_first_of(':');
if (pos != std::string::npos) {
const std::string class_name = stack_trace.substr(0, pos);
if (exception_summary.find(class_name) == exception_summary.end()) {
- auto new_exception = make_unique<TmasterExceptionLog>();
+ auto new_exception = make_unique<TmanagerExceptionLog>();
new_exception->CopyFrom(log);
new_exception->set_stacktrace(class_name);
exception_summary[class_name] = std::move(new_exception);
} else {
- TmasterExceptionLog& prev_log = *exception_summary[class_name];
+ TmanagerExceptionLog& prev_log = *exception_summary[class_name];
prev_log.set_count(log.count() + prev_log.count());
prev_log.set_lasttime(log.lasttime());
}
@@ -266,14 +266,14 @@
}
void TMetricsCollector::ComponentMetrics::AddMetricForInstance(
- const sp_string& instance_id, const sp_string& name, TMasterMetrics::MetricAggregationType type,
+ const sp_string& instance_id, const sp_string& name, TManagerMetrics::MetricAggregationType type,
const sp_string& value) {
auto instance_metrics = GetOrCreateInstanceMetrics(instance_id);
instance_metrics->AddMetricWithName(name, type, value);
}
void TMetricsCollector::ComponentMetrics::AddExceptionForInstance(
- const sp_string& instance_id, const TmasterExceptionLog& exception) {
+ const sp_string& instance_id, const TmanagerExceptionLog& exception) {
auto instance_metrics = GetOrCreateInstanceMetrics(instance_id);
instance_metrics->AddExceptions(exception);
}
@@ -343,29 +343,29 @@
}
void TMetricsCollector::InstanceMetrics::AddMetricWithName(
- const sp_string& name, common::TMasterMetrics::MetricAggregationType type,
+ const sp_string& name, common::TManagerMetrics::MetricAggregationType type,
const sp_string& value) {
auto metric_data = GetOrCreateMetric(name, type);
metric_data->AddValueToMetric(value);
}
// Creates a copy of exception and takes ownership of the pointer.
-void TMetricsCollector::InstanceMetrics::AddExceptions(const TmasterExceptionLog& exception) {
+void TMetricsCollector::InstanceMetrics::AddExceptions(const TmanagerExceptionLog& exception) {
// TODO(kramasamy): Aggregate exceptions across minutely buckets. Try to avoid duplication of
// hash-fuction
// used to aggregate in heron-worker.
- auto new_exception = make_unique<TmasterExceptionLog>();
+ auto new_exception = make_unique<TmanagerExceptionLog>();
new_exception->CopyFrom(exception);
exceptions_.push_back(std::move(new_exception));
sp_uint32 max_exception = config::HeronInternalsConfigReader::Instance()
- ->GetHeronTmasterMetricsCollectorMaximumException();
+ ->GetHeronTmanagerMetricsCollectorMaximumException();
while (exceptions_.size() > max_exception) {
exceptions_.pop_front();
}
}
shared_ptr<TMetricsCollector::Metric> TMetricsCollector::InstanceMetrics::GetOrCreateMetric(
- const sp_string& name, TMasterMetrics::MetricAggregationType type) {
+ const sp_string& name, TManagerMetrics::MetricAggregationType type) {
if (metrics_.find(name) == metrics_.end()) {
metrics_[name] = std::make_shared<Metric>(name, type, nbuckets_, bucket_interval_);
}
@@ -392,7 +392,7 @@
}
TMetricsCollector::Metric::Metric(const sp_string& name,
- common::TMasterMetrics::MetricAggregationType type,
+ common::TManagerMetrics::MetricAggregationType type,
sp_int32 nbuckets, sp_int32 bucket_interval)
: name_(name),
metric_type_(type),
@@ -412,7 +412,7 @@
}
void TMetricsCollector::Metric::AddValueToMetric(const sp_string& _value) {
- if (metric_type_ == common::TMasterMetrics::LAST) {
+ if (metric_type_ == common::TManagerMetrics::LAST) {
// Just keep one value per time bucket
data_.front()->data_.clear();
data_.front()->data_.push_front(_value);
@@ -439,12 +439,12 @@
val->mutable_interval()->set_start(bucket.start_time_);
val->mutable_interval()->set_end(bucket.end_time_);
sp_double64 result = bucket.aggregate();
- if (metric_type_ == common::TMasterMetrics::SUM) {
+ if (metric_type_ == common::TManagerMetrics::SUM) {
val->set_value(std::to_string(result));
- } else if (metric_type_ == common::TMasterMetrics::AVG) {
+ } else if (metric_type_ == common::TManagerMetrics::AVG) {
sp_double64 avg = result / bucket.count();
val->set_value(std::to_string(avg));
- } else if (metric_type_ == common::TMasterMetrics::LAST) {
+ } else if (metric_type_ == common::TManagerMetrics::LAST) {
val->set_value(std::to_string(result));
} else {
LOG(FATAL) << "Unknown metric type " << metric_type_;
@@ -458,11 +458,11 @@
sp_double64 result = 0;
if (start_time <= 0) {
// We want cumulative metrics
- if (metric_type_ == common::TMasterMetrics::SUM) {
+ if (metric_type_ == common::TManagerMetrics::SUM) {
result = all_time_cumulative_;
- } else if (metric_type_ == common::TMasterMetrics::AVG) {
+ } else if (metric_type_ == common::TManagerMetrics::AVG) {
result = all_time_cumulative_ / all_time_nitems_;
- } else if (metric_type_ == common::TMasterMetrics::LAST) {
+ } else if (metric_type_ == common::TManagerMetrics::LAST) {
result = all_time_cumulative_;
} else {
LOG(FATAL) << "Uknown metric type " << metric_type_;
@@ -477,16 +477,16 @@
if (bucket.overlaps(start_time, end_time)) {
total_count += bucket.aggregate();
total_items += bucket.count();
- if (metric_type_ == TMasterMetrics::LAST) break;
+ if (metric_type_ == TManagerMetrics::LAST) break;
}
// The timebuckets are reverse chronologically arranged
if (start_time > bucket.end_time_) break;
}
- if (metric_type_ == TMasterMetrics::SUM) {
+ if (metric_type_ == TManagerMetrics::SUM) {
result = total_count;
- } else if (metric_type_ == TMasterMetrics::AVG) {
+ } else if (metric_type_ == TManagerMetrics::AVG) {
result = total_count / total_items;
- } else if (metric_type_ == TMasterMetrics::LAST) {
+ } else if (metric_type_ == TManagerMetrics::LAST) {
result = total_count;
} else {
LOG(FATAL) << "Uknown metric type " << metric_type_;
@@ -495,5 +495,5 @@
_response->set_value(std::to_string(result));
}
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/tmetrics-collector.h b/heron/tmanager/src/cpp/manager/tmetrics-collector.h
similarity index 76%
rename from heron/tmaster/src/cpp/manager/tmetrics-collector.h
rename to heron/tmanager/src/cpp/manager/tmetrics-collector.h
index 192b2a0..5aa1078 100644
--- a/heron/tmaster/src/cpp/manager/tmetrics-collector.h
+++ b/heron/tmanager/src/cpp/manager/tmetrics-collector.h
@@ -26,12 +26,12 @@
#include "basics/callback.h"
#include "basics/sptypes.h"
#include "network/event_loop.h"
-#include "proto/tmaster.pb.h"
+#include "proto/tmanager.pb.h"
#include "proto/topology.pb.h"
-#include "metrics/tmaster-metrics.h"
+#include "metrics/tmanager-metrics.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
using std::unique_ptr;
using std::shared_ptr;
@@ -50,40 +50,40 @@
// Initiated on recieving a new _metrics from metricsmanager. Will initiate appropriate calls
// to add metrics/exception stored in '_metrics' to the respective components.
- void AddMetric(const proto::tmaster::PublishMetrics& _metrics);
+ void AddMetric(const proto::tmanager::PublishMetrics& _metrics);
// Returns a new response to fetch metrics. The request gets propagated to Component's and
// Instance's get metrics. Doesn't own Response.
- unique_ptr<proto::tmaster::MetricResponse> GetMetrics(
- const proto::tmaster::MetricRequest& _request,
+ unique_ptr<proto::tmanager::MetricResponse> GetMetrics(
+ const proto::tmanager::MetricRequest& _request,
const proto::api::Topology& _topology);
// Returns response for fetching exceptions. Doesn't own response.
- unique_ptr<proto::tmaster::ExceptionLogResponse> GetExceptions(
- const proto::tmaster::ExceptionLogRequest& request);
+ unique_ptr<proto::tmanager::ExceptionLogResponse> GetExceptions(
+ const proto::tmanager::ExceptionLogRequest& request);
// Returns exception summary response. Doesn't own response.
- unique_ptr<proto::tmaster::ExceptionLogResponse> GetExceptionsSummary(
- const proto::tmaster::ExceptionLogRequest& request);
+ unique_ptr<proto::tmanager::ExceptionLogResponse> GetExceptionsSummary(
+ const proto::tmanager::ExceptionLogRequest& request);
private:
// Fetches exceptions for ExceptionLogRequest. Save the returned exception in
// 'all_exceptions'.
// Doesn't own 'all_exceptions' pointer
- void GetExceptionsHelper(const proto::tmaster::ExceptionLogRequest& request,
- proto::tmaster::ExceptionLogResponse& all_excepions);
+ void GetExceptionsHelper(const proto::tmanager::ExceptionLogRequest& request,
+ proto::tmanager::ExceptionLogResponse& all_excepions);
// Aggregate exceptions from 'all_exceptions' to 'aggregate_exceptions'.
// Doesn't own 'aggregate_exceptions'.
- void AggregateExceptions(const proto::tmaster::ExceptionLogResponse& all_exceptions,
- proto::tmaster::ExceptionLogResponse& aggregate_exceptions);
+ void AggregateExceptions(const proto::tmanager::ExceptionLogResponse& all_exceptions,
+ proto::tmanager::ExceptionLogResponse& aggregate_exceptions);
// Add metrics for 'component_name'
void AddMetricsForComponent(const sp_string& component_name,
- const proto::tmaster::MetricDatum& metrics_data);
+ const proto::tmanager::MetricDatum& metrics_data);
// Add exception logs for 'component_name'
void AddExceptionsForComponent(const sp_string& component_name,
- const proto::tmaster::TmasterExceptionLog& exception_log);
+ const proto::tmanager::TmanagerExceptionLog& exception_log);
// Clean all metrics.
void Purge(EventLoop::Status _status);
@@ -126,7 +126,7 @@
class Metric {
public:
// TODO(kramasamy): Add ctor for default UNKNOWN type and give a set type function.
- Metric(const sp_string& name, common::TMasterMetrics::MetricAggregationType type,
+ Metric(const sp_string& name, common::TManagerMetrics::MetricAggregationType type,
sp_int32 nbuckets, sp_int32 bucket_interval);
// Deletes all TimeBucket.
@@ -139,7 +139,7 @@
// Return past '_nbuckets' value for this metric.
void GetMetrics(bool minutely, sp_int64 start_time, sp_int64 end_time,
- proto::tmaster::MetricResponse::IndividualMetric* response);
+ proto::tmanager::MetricResponse::IndividualMetric* response);
private:
sp_string name_;
@@ -147,7 +147,7 @@
std::list<unique_ptr<TimeBucket>> data_;
// Type of metric. This can be SUM or AVG. It specify how to aggregate these metrics for
// display.
- common::TMasterMetrics::MetricAggregationType metric_type_;
+ common::TManagerMetrics::MetricAggregationType metric_type_;
sp_double64 all_time_cumulative_;
@@ -171,23 +171,23 @@
// Add metrics with name '_name' of type '_type' and value _value.
void AddMetricWithName(const sp_string& name,
- common::TMasterMetrics::MetricAggregationType type,
+ common::TManagerMetrics::MetricAggregationType type,
const sp_string& value);
- // Add TmasterExceptionLog to the list of exceptions for this instance_id.
- void AddExceptions(const proto::tmaster::TmasterExceptionLog& exception);
+ // Add TmanagerExceptionLog to the list of exceptions for this instance_id.
+ void AddExceptions(const proto::tmanager::TmanagerExceptionLog& exception);
// Returns the metric metrics. Doesn't own _response.
- void GetMetrics(const proto::tmaster::MetricRequest& request, sp_int64 start_time,
- sp_int64 end_time, proto::tmaster::MetricResponse& response);
+ void GetMetrics(const proto::tmanager::MetricRequest& request, sp_int64 start_time,
+ sp_int64 end_time, proto::tmanager::MetricResponse& response);
// Fills response for fetching exceptions. Doesn't own response.
- void GetExceptionLog(proto::tmaster::ExceptionLogResponse& response);
+ void GetExceptionLog(proto::tmanager::ExceptionLogResponse& response);
private:
// Create or return existing Metric. Retains ownership of Metric object returned.
shared_ptr<Metric> GetOrCreateMetric(const sp_string& name,
- common::TMasterMetrics::MetricAggregationType type);
+ common::TManagerMetrics::MetricAggregationType type);
sp_string instance_id_;
sp_int32 nbuckets_;
@@ -195,7 +195,7 @@
// map between metric name and its values
std::map<sp_string, shared_ptr<Metric>> metrics_;
// list of exceptions
- std::list<unique_ptr<proto::tmaster::TmasterExceptionLog>> exceptions_;
+ std::list<unique_ptr<proto::tmanager::TmanagerExceptionLog>> exceptions_;
};
// Component level metrics. A component metrics is a map storing metrics for each of its
@@ -213,22 +213,22 @@
// Add metrics for an Instance 'instance_id' of this spout/bolt component.
void AddMetricForInstance(const sp_string& instance_id, const sp_string& name,
- common::TMasterMetrics::MetricAggregationType type,
+ common::TManagerMetrics::MetricAggregationType type,
const sp_string& value);
// Add exception for an Instance 'instance_id' of this spout/bolt component.
void AddExceptionForInstance(const sp_string& instance_id,
- const proto::tmaster::TmasterExceptionLog& exception);
+ const proto::tmanager::TmanagerExceptionLog& exception);
// Request aggregated metrics for this component for the '_nbucket' interval.
// Doesn't own '_response' object.
- void GetMetrics(const proto::tmaster::MetricRequest& request, sp_int64 start_time,
- sp_int64 end_time, proto::tmaster::MetricResponse& response);
+ void GetMetrics(const proto::tmanager::MetricRequest& request, sp_int64 start_time,
+ sp_int64 end_time, proto::tmanager::MetricResponse& response);
// Returns response for fetching exceptions. Doesn't own response.
void GetExceptionsForInstance(const sp_string& instance_id,
- proto::tmaster::ExceptionLogResponse& response);
+ proto::tmanager::ExceptionLogResponse& response);
- void GetAllExceptions(proto::tmaster::ExceptionLogResponse& response);
+ void GetAllExceptions(proto::tmanager::ExceptionLogResponse& response);
private:
// Create or return existing mutable InstanceMetrics associated with 'instance_id'. This
@@ -254,10 +254,10 @@
sp_int32 interval_;
std::shared_ptr<EventLoop> eventLoop_;
std::string metrics_sinks_yaml_;
- std::unique_ptr<common::TMasterMetrics> tmetrics_info_;
+ std::unique_ptr<common::TManagerMetrics> tmetrics_info_;
time_t start_time_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/processor/processor.h b/heron/tmanager/src/cpp/processor/processor.h
similarity index 88%
rename from heron/tmaster/src/cpp/processor/processor.h
rename to heron/tmanager/src/cpp/processor/processor.h
index e573dd3..119d887 100644
--- a/heron/tmaster/src/cpp/processor/processor.h
+++ b/heron/tmanager/src/cpp/processor/processor.h
@@ -17,10 +17,10 @@
* under the License.
*/
-#if !defined(__TMASTER_PROCESSORS_H_)
-#define __TMASTER_PROCESSORS_H_
+#if !defined(__TMANAGER_PROCESSORS_H_)
+#define __TMANAGER_PROCESSORS_H_
-#include "processor/tmaster-processor.h"
+#include "processor/tmanager-processor.h"
#include "processor/stmgr-register-processor.h"
#include "processor/stmgr-heartbeat-processor.h"
diff --git a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp b/heron/tmanager/src/cpp/processor/stmgr-heartbeat-processor.cpp
similarity index 69%
rename from heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
rename to heron/tmanager/src/cpp/processor/stmgr-heartbeat-processor.cpp
index 1c98d70..d4b33df 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.cpp
+++ b/heron/tmanager/src/cpp/processor/stmgr-heartbeat-processor.cpp
@@ -19,8 +19,8 @@
#include "processor/stmgr-heartbeat-processor.h"
#include <iostream>
-#include "processor/tmaster-processor.h"
-#include "manager/tmaster.h"
+#include "processor/tmanager-processor.h"
+#include "manager/tmanager.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
@@ -28,27 +28,27 @@
#include "network/network.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
StMgrHeartbeatProcessor::StMgrHeartbeatProcessor(REQID reqid, Connection* conn,
- pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> request,
- TMaster* tmaster, Server* server)
- : Processor(reqid, conn, std::move(request), tmaster, server) {}
+ pool_unique_ptr<proto::tmanager::StMgrHeartbeatRequest> request,
+ TManager* tmanager, Server* server)
+ : Processor(reqid, conn, std::move(request), tmanager, server) {}
StMgrHeartbeatProcessor::~StMgrHeartbeatProcessor() {
// nothing to be done here
}
void StMgrHeartbeatProcessor::Start() {
- proto::tmaster::StMgrHeartbeatRequest* request =
- static_cast<proto::tmaster::StMgrHeartbeatRequest*>(request_.get());
+ proto::tmanager::StMgrHeartbeatRequest* request =
+ static_cast<proto::tmanager::StMgrHeartbeatRequest*>(request_.get());
- proto::system::Status* status = tmaster_->UpdateStMgrHeartbeat(
+ proto::system::Status* status = tmanager_->UpdateStMgrHeartbeat(
GetConnection(), request->heartbeat_time(), request->release_stats());
- proto::tmaster::StMgrHeartbeatResponse response;
+ proto::tmanager::StMgrHeartbeatResponse response;
response.set_allocated_status(status);
SendResponse(response);
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h b/heron/tmanager/src/cpp/processor/stmgr-heartbeat-processor.h
similarity index 84%
rename from heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h
rename to heron/tmanager/src/cpp/processor/stmgr-heartbeat-processor.h
index 6240bda..9a2b9ed 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-heartbeat-processor.h
+++ b/heron/tmanager/src/cpp/processor/stmgr-heartbeat-processor.h
@@ -20,24 +20,24 @@
#ifndef STMGR_HEARTBEAT_PROCESSOR_
#define STMGR_HEARTBEAT_PROCESSOR_
-#include "processor/tmaster-processor.h"
+#include "processor/tmanager-processor.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "network/network.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
class StMgrHeartbeatProcessor : public Processor {
public:
StMgrHeartbeatProcessor(REQID _reqid, Connection* _conn,
- pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request,
- TMaster* _tmaster,
+ pool_unique_ptr<proto::tmanager::StMgrHeartbeatRequest> _request,
+ TManager* _tmanager,
Server* _server);
virtual ~StMgrHeartbeatProcessor();
void Start();
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp b/heron/tmanager/src/cpp/processor/stmgr-register-processor.cpp
similarity index 73%
rename from heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
rename to heron/tmanager/src/cpp/processor/stmgr-register-processor.cpp
index c58dda8..17889ec 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-register-processor.cpp
+++ b/heron/tmanager/src/cpp/processor/stmgr-register-processor.cpp
@@ -20,8 +20,8 @@
#include "processor/stmgr-register-processor.h"
#include <iostream>
#include <vector>
-#include "processor/tmaster-processor.h"
-#include "manager/tmaster.h"
+#include "processor/tmanager-processor.h"
+#include "manager/tmanager.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
@@ -29,12 +29,12 @@
#include "network/network.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
StMgrRegisterProcessor::StMgrRegisterProcessor(REQID _reqid, Connection* _conn,
- pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
- TMaster* _tmaster, Server* _server)
- : Processor(_reqid, _conn, std::move(_request), _tmaster, _server) {}
+ pool_unique_ptr<proto::tmanager::StMgrRegisterRequest> _request,
+ TManager* _tmanager, Server* _server)
+ : Processor(_reqid, _conn, std::move(_request), _tmanager, _server) {}
StMgrRegisterProcessor::~StMgrRegisterProcessor() {
// nothing to be done here
@@ -42,9 +42,9 @@
void StMgrRegisterProcessor::Start() {
// We got a new stream manager registering to us
- // Get the relevant info and ask tmaster to register
- proto::tmaster::StMgrRegisterRequest* request =
- static_cast<proto::tmaster::StMgrRegisterRequest*>(request_.get());
+ // Get the relevant info and ask tmanager to register
+ proto::tmanager::StMgrRegisterRequest* request =
+ static_cast<proto::tmanager::StMgrRegisterRequest*>(request_.get());
std::vector<shared_ptr<proto::system::Instance>> instances;
for (sp_int32 i = 0; i < request->instances_size(); ++i) {
auto instance = std::make_shared<proto::system::Instance>();
@@ -55,10 +55,10 @@
shared_ptr<proto::system::PhysicalPlan> pplan;
proto::system::Status* status =
- tmaster_->RegisterStMgr(request->stmgr(), instances, GetConnection(), pplan);
+ tmanager_->RegisterStMgr(request->stmgr(), instances, GetConnection(), pplan);
// Send the response
- proto::tmaster::StMgrRegisterResponse response;
+ proto::tmanager::StMgrRegisterResponse response;
response.set_allocated_status(status);
if (status->status() == proto::system::OK) {
if (pplan) {
@@ -68,5 +68,5 @@
SendResponse(response);
return;
}
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/processor/stmgr-register-processor.h b/heron/tmanager/src/cpp/processor/stmgr-register-processor.h
similarity index 84%
rename from heron/tmaster/src/cpp/processor/stmgr-register-processor.h
rename to heron/tmanager/src/cpp/processor/stmgr-register-processor.h
index 208c09f..def163c 100644
--- a/heron/tmaster/src/cpp/processor/stmgr-register-processor.h
+++ b/heron/tmanager/src/cpp/processor/stmgr-register-processor.h
@@ -20,24 +20,24 @@
#ifndef STMGR_REGISTER_PROCESSOR_H_
#define STMGR_REGISTER_PROCESSOR_H_
-#include "processor/tmaster-processor.h"
+#include "processor/tmanager-processor.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "network/network.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
class StMgrRegisterProcessor : public Processor {
public:
StMgrRegisterProcessor(REQID _reqid, Connection* _conn,
- pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request,
- TMaster* _tmaster,
+ pool_unique_ptr<proto::tmanager::StMgrRegisterRequest> _request,
+ TManager* _tmanager,
Server* _server);
virtual ~StMgrRegisterProcessor();
void Start();
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/processor/tmaster-processor.cpp b/heron/tmanager/src/cpp/processor/tmanager-processor.cpp
similarity index 85%
rename from heron/tmaster/src/cpp/processor/tmaster-processor.cpp
rename to heron/tmanager/src/cpp/processor/tmanager-processor.cpp
index 0caaa8b..7101d61 100644
--- a/heron/tmaster/src/cpp/processor/tmaster-processor.cpp
+++ b/heron/tmanager/src/cpp/processor/tmanager-processor.cpp
@@ -17,7 +17,7 @@
* under the License.
*/
-#include "processor/tmaster-processor.h"
+#include "processor/tmanager-processor.h"
#include <iostream>
#include "proto/messages.h"
#include "basics/basics.h"
@@ -26,12 +26,12 @@
#include "network/network.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
Processor::Processor(REQID _reqid, Connection* _conn,
pool_unique_ptr<google::protobuf::Message> _request,
- TMaster* _tmaster, Server* _server)
- : request_(std::move(_request)), tmaster_(_tmaster), server_(_server),
+ TManager* _tmanager, Server* _server)
+ : request_(std::move(_request)), tmanager_(_tmanager), server_(_server),
reqid_(_reqid), conn_(_conn) {}
Processor::~Processor() {}
@@ -41,5 +41,5 @@
}
void Processor::CloseConnection() { server_->CloseConnection(conn_); }
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
diff --git a/heron/tmaster/src/cpp/processor/tmaster-processor.h b/heron/tmanager/src/cpp/processor/tmanager-processor.h
similarity index 88%
rename from heron/tmaster/src/cpp/processor/tmaster-processor.h
rename to heron/tmanager/src/cpp/processor/tmanager-processor.h
index 70c7c20..46be7ad 100644
--- a/heron/tmaster/src/cpp/processor/tmaster-processor.h
+++ b/heron/tmanager/src/cpp/processor/tmanager-processor.h
@@ -17,22 +17,22 @@
* under the License.
*/
-#ifndef TMASTER_PROCESSOR_H_
-#define TMASTER_PROCESSOR_H_
+#ifndef TMANAGER_PROCESSOR_H_
+#define TMANAGER_PROCESSOR_H_
#include "proto/messages.h"
#include "basics/basics.h"
#include "network/network.h"
namespace heron {
-namespace tmaster {
+namespace tmanager {
-class TMaster;
+class TManager;
class Processor {
public:
Processor(REQID _reqid, Connection* _conn, pool_unique_ptr<google::protobuf::Message> _request,
- TMaster* _tmaster,
+ TManager* _tmanager,
Server* _server);
virtual ~Processor();
virtual void Start() = 0;
@@ -42,13 +42,13 @@
Connection* GetConnection() { return conn_; }
void CloseConnection();
pool_unique_ptr<google::protobuf::Message> request_;
- TMaster* tmaster_;
+ TManager* tmanager_;
Server* server_;
private:
REQID reqid_;
Connection* conn_;
};
-} // namespace tmaster
+} // namespace tmanager
} // namespace heron
#endif
diff --git a/heron/tmaster/src/cpp/server/tmaster-main.cpp b/heron/tmanager/src/cpp/server/tmanager-main.cpp
similarity index 89%
rename from heron/tmaster/src/cpp/server/tmaster-main.cpp
rename to heron/tmanager/src/cpp/server/tmanager-main.cpp
index 7c4343c..dd5a055 100644
--- a/heron/tmaster/src/cpp/server/tmaster-main.cpp
+++ b/heron/tmanager/src/cpp/server/tmanager-main.cpp
@@ -21,7 +21,7 @@
#include <string>
#include <vector>
#include "gflags/gflags.h"
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
@@ -34,7 +34,7 @@
DEFINE_string(zkhostportlist, "", "Location of the zk");
DEFINE_string(zkroot, "", "Root of the zk");
DEFINE_string(myhost, "", "The hostname that I'm running");
-DEFINE_int32(master_port, 0, "The port used for communication with stmgrs");
+DEFINE_int32(server_port, 0, "The port used for communication with stmgrs");
DEFINE_int32(controller_port, 0, "The port used to activate/deactivate");
DEFINE_int32(stats_port, 0, "The port of the getting stats");
DEFINE_string(config_file, "", "The heron internals config file");
@@ -60,12 +60,12 @@
heron::common::Initialize(argv[0], FLAGS_topology_id.c_str());
- LOG(INFO) << "Starting tmaster for topology " << FLAGS_topology_name << " with topology id "
+ LOG(INFO) << "Starting tmanager for topology " << FLAGS_topology_name << " with topology id "
<< FLAGS_topology_id << " zkhostport " << FLAGS_zkhostportlist
<< " and zkroot " << FLAGS_zkroot;
- heron::tmaster::TMaster tmaster(FLAGS_zkhostportlist, FLAGS_topology_name, FLAGS_topology_id,
- FLAGS_zkroot, FLAGS_controller_port, FLAGS_master_port,
+ heron::tmanager::TManager tmanager(FLAGS_zkhostportlist, FLAGS_topology_name, FLAGS_topology_id,
+ FLAGS_zkroot, FLAGS_controller_port, FLAGS_server_port,
FLAGS_stats_port, FLAGS_metricsmgr_port,
FLAGS_ckptmgr_port, FLAGS_metrics_sinks_yaml, FLAGS_myhost, ss);
ss->loop();
diff --git a/heron/tmaster/tests/cpp/server/BUILD b/heron/tmanager/tests/cpp/server/BUILD
similarity index 77%
rename from heron/tmaster/tests/cpp/server/BUILD
rename to heron/tmanager/tests/cpp/server/BUILD
index 341fc6c..53d3483 100644
--- a/heron/tmaster/tests/cpp/server/BUILD
+++ b/heron/tmanager/tests/cpp/server/BUILD
@@ -3,12 +3,12 @@
package(default_visibility = ["//visibility:public"])
cc_test(
- name = "tmaster_unittest",
+ name = "tmanager_unittest",
size = "small",
srcs = [
"dummystmgr.cpp",
"dummystmgr.h",
- "tmaster_unittest.cpp",
+ "tmanager_unittest.cpp",
],
args = ["$(location //heron/config/src/yaml:test-config-internals-yaml)"],
copts = [
@@ -16,8 +16,8 @@
"-Iheron/common/src/cpp",
"-Iheron/statemgrs/src/cpp",
"-Iheron/stmgr/src/cpp",
- "-Iheron/tmaster/src/cpp",
- "-Iheron/tmaster/tests/cpp",
+ "-Iheron/tmanager/src/cpp",
+ "-Iheron/tmanager/tests/cpp",
"-I$(GENDIR)/heron",
"-I$(GENDIR)/heron/common/src/cpp",
],
@@ -28,7 +28,7 @@
linkstatic = 1,
deps = [
"//heron/stmgr/src/cpp:manager-cxx",
- "//heron/tmaster/src/cpp:tmaster-cxx",
+ "//heron/tmanager/src/cpp:tmanager-cxx",
"@com_google_googletest//:gtest",
],
)
@@ -44,8 +44,8 @@
"-Iheron/common/src/cpp",
"-Iheron/statemgrs/src/cpp",
"-Iheron/stmgr/src/cpp",
- "-Iheron/tmaster/src/cpp",
- "-Iheron/tmaster/tests/cpp",
+ "-Iheron/tmanager/src/cpp",
+ "-Iheron/tmanager/tests/cpp",
"-I$(GENDIR)/heron",
"-I$(GENDIR)/heron/common/src/cpp",
],
@@ -53,7 +53,7 @@
linkstatic = 1,
deps = [
"//heron/stmgr/src/cpp:manager-cxx",
- "//heron/tmaster/src/cpp:tmaster-cxx",
+ "//heron/tmanager/src/cpp:tmanager-cxx",
"@com_google_googletest//:gtest",
],
)
@@ -64,23 +64,23 @@
srcs = [
"dummystmgr.cpp",
"dummystmgr.h",
- "dummytmaster.cpp",
- "dummytmaster.h",
+ "dummytmanager.cpp",
+ "dummytmanager.h",
"stateful_restorer_unittest.cpp",
],
copts = [
"-Iheron",
"-Iheron/common/src/cpp",
"-Iheron/statemgrs/src/cpp",
- "-Iheron/tmaster/src/cpp",
- "-Iheron/tmaster/tests/cpp",
+ "-Iheron/tmanager/src/cpp",
+ "-Iheron/tmanager/tests/cpp",
"-I$(GENDIR)/heron",
"-I$(GENDIR)/heron/common/src/cpp",
],
flaky = 1,
linkstatic = 1,
deps = [
- "//heron/tmaster/src/cpp:tmaster-cxx",
+ "//heron/tmanager/src/cpp:tmanager-cxx",
"@com_google_googletest//:gtest",
],
)
@@ -95,15 +95,15 @@
"-Iheron",
"-Iheron/common/src/cpp",
"-Iheron/statemgrs/src/cpp",
- "-Iheron/tmaster/src/cpp",
- "-Iheron/tmaster/tests/cpp",
+ "-Iheron/tmanager/src/cpp",
+ "-Iheron/tmanager/tests/cpp",
"-I$(GENDIR)/heron",
"-I$(GENDIR)/heron/common/src/cpp",
],
flaky = 0,
linkstatic = 1,
deps = [
- "//heron/tmaster/src/cpp:tmaster-cxx",
+ "//heron/tmanager/src/cpp:tmanager-cxx",
"@com_google_googletest//:gtest",
],
)
diff --git a/heron/tmaster/tests/cpp/server/dummystmgr.cpp b/heron/tmanager/tests/cpp/server/dummystmgr.cpp
similarity index 91%
rename from heron/tmaster/tests/cpp/server/dummystmgr.cpp
rename to heron/tmanager/tests/cpp/server/dummystmgr.cpp
index 13cf22e..47bb1b1 100644
--- a/heron/tmaster/tests/cpp/server/dummystmgr.cpp
+++ b/heron/tmanager/tests/cpp/server/dummystmgr.cpp
@@ -42,9 +42,9 @@
pplan_(nullptr),
got_restore_message_(false),
got_start_message_(false) {
- InstallResponseHandler(std::move(make_unique<proto::tmaster::StMgrRegisterRequest>()),
+ InstallResponseHandler(std::move(make_unique<proto::tmanager::StMgrRegisterRequest>()),
&DummyStMgr::HandleRegisterResponse);
- InstallResponseHandler(std::move(make_unique<proto::tmaster::StMgrHeartbeatRequest>()),
+ InstallResponseHandler(std::move(make_unique<proto::tmanager::StMgrHeartbeatRequest>()),
&DummyStMgr::HandleHeartbeatResponse);
InstallMessageHandler(&DummyStMgr::HandleNewAssignmentMessage);
InstallMessageHandler(&DummyStMgr::HandleRestoreTopologyStateRequest);
@@ -77,7 +77,7 @@
void DummyStMgr::HandleRegisterResponse(
void*,
- pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
+ pool_unique_ptr<proto::tmanager::StMgrRegisterResponse> response,
NetworkErrorCode status) {
if (status != OK) {
LOG(ERROR) << "NonOK response message for Register Response";
@@ -107,7 +107,7 @@
void DummyStMgr::HandleHeartbeatResponse(
void*,
- pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
+ pool_unique_ptr<proto::tmanager::StMgrHeartbeatResponse> response,
NetworkErrorCode status) {
if (status != OK) {
LOG(ERROR) << "NonOK response message for Register Response";
@@ -144,7 +144,7 @@
}
void DummyStMgr::SendRegisterRequest() {
- auto request = make_unique<proto::tmaster::StMgrRegisterRequest>();
+ auto request = make_unique<proto::tmanager::StMgrRegisterRequest>();
proto::system::StMgr* stmgr = request->mutable_stmgr();
stmgr->set_id(my_id_);
stmgr->set_host_name(my_host_);
@@ -159,7 +159,7 @@
}
void DummyStMgr::SendHeartbeatRequest() {
- auto request = make_unique<proto::tmaster::StMgrHeartbeatRequest>();
+ auto request = make_unique<proto::tmanager::StMgrHeartbeatRequest>();
request->set_heartbeat_time(time(NULL));
request->mutable_stats();
SendRequest(std::move(request), nullptr);
diff --git a/heron/tmaster/tests/cpp/server/dummystmgr.h b/heron/tmanager/tests/cpp/server/dummystmgr.h
similarity index 93%
rename from heron/tmaster/tests/cpp/server/dummystmgr.h
rename to heron/tmanager/tests/cpp/server/dummystmgr.h
index 06df950..e71f301 100644
--- a/heron/tmaster/tests/cpp/server/dummystmgr.h
+++ b/heron/tmanager/tests/cpp/server/dummystmgr.h
@@ -53,10 +53,10 @@
private:
void HandleRegisterResponse(void*,
- pool_unique_ptr<proto::tmaster::StMgrRegisterResponse> response,
+ pool_unique_ptr<proto::tmanager::StMgrRegisterResponse> response,
NetworkErrorCode);
void HandleHeartbeatResponse(void*,
- pool_unique_ptr<proto::tmaster::StMgrHeartbeatResponse> response,
+ pool_unique_ptr<proto::tmanager::StMgrHeartbeatResponse> response,
NetworkErrorCode);
void HandleNewAssignmentMessage(pool_unique_ptr<proto::stmgr::NewPhysicalPlanMessage> message);
void HandleNewPhysicalPlan(const proto::system::PhysicalPlan& pplan);
diff --git a/heron/tmaster/tests/cpp/server/dummytmaster.cpp b/heron/tmanager/tests/cpp/server/dummytmanager.cpp
similarity index 62%
rename from heron/tmaster/tests/cpp/server/dummytmaster.cpp
rename to heron/tmanager/tests/cpp/server/dummytmanager.cpp
index a554998..9cf7976 100644
--- a/heron/tmaster/tests/cpp/server/dummytmaster.cpp
+++ b/heron/tmanager/tests/cpp/server/dummytmanager.cpp
@@ -17,7 +17,7 @@
* under the License.
*/
-#include "server/dummytmaster.h"
+#include "server/dummytmanager.h"
#include <stdio.h>
#include <iostream>
#include <string>
@@ -32,35 +32,35 @@
namespace heron {
namespace testing {
-DummyTMaster::DummyTMaster(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options)
+DummyTManager::DummyTManager(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options)
: Server(eventLoop, options) {
- InstallRequestHandler(&DummyTMaster::HandleRegisterRequest);
+ InstallRequestHandler(&DummyTManager::HandleRegisterRequest);
}
-DummyTMaster::~DummyTMaster() {}
+DummyTManager::~DummyTManager() {}
-void DummyTMaster::HandleNewConnection(Connection* _conn) {
+void DummyTManager::HandleNewConnection(Connection* _conn) {
// Do nothing
}
-void DummyTMaster::HandleConnectionClose(Connection* _conn, NetworkErrorCode) {
+void DummyTManager::HandleConnectionClose(Connection* _conn, NetworkErrorCode) {
// Do Nothing
}
-void DummyTMaster::HandleRegisterRequest(REQID _id, Connection* _conn,
- pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
+void DummyTManager::HandleRegisterRequest(REQID _id, Connection* _conn,
+ pool_unique_ptr<proto::tmanager::StMgrRegisterRequest> _request) {
std::vector<std::shared_ptr<proto::system::Instance>> instances;
stmgrs_[_request->stmgr().id()] =
- std::make_shared<tmaster::StMgrState>(_conn, _request->stmgr(), instances, *this);
- proto::tmaster::StMgrRegisterResponse response;
+ std::make_shared<tmanager::StMgrState>(_conn, _request->stmgr(), instances, *this);
+ proto::tmanager::StMgrRegisterResponse response;
response.mutable_status()->set_status(proto::system::OK);
SendResponse(_id, _conn, response);
}
-void DummyTMaster::HandleHeartbeatRequest(REQID _id, Connection* _conn,
- proto::tmaster::StMgrHeartbeatRequest* _request) {
+void DummyTManager::HandleHeartbeatRequest(REQID _id, Connection* _conn,
+ proto::tmanager::StMgrHeartbeatRequest* _request) {
delete _request;
- proto::tmaster::StMgrHeartbeatResponse response;
+ proto::tmanager::StMgrHeartbeatResponse response;
response.mutable_status()->set_status(proto::system::OK);
SendResponse(_id, _conn, response);
}
diff --git a/heron/tmaster/tests/cpp/server/dummytmaster.h b/heron/tmanager/tests/cpp/server/dummytmanager.h
similarity index 73%
rename from heron/tmaster/tests/cpp/server/dummytmaster.h
rename to heron/tmanager/tests/cpp/server/dummytmanager.h
index c00ade5..c02c956 100644
--- a/heron/tmaster/tests/cpp/server/dummytmaster.h
+++ b/heron/tmanager/tests/cpp/server/dummytmanager.h
@@ -17,8 +17,8 @@
* under the License.
*/
-#ifndef __DUMMYTMASTER_H_
-#define __DUMMYTMASTER_H_
+#ifndef __DUMMYTMANAGER_H_
+#define __DUMMYTMANAGER_H_
#include <map>
#include <string>
@@ -29,17 +29,17 @@
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
namespace heron {
namespace testing {
-class DummyTMaster : public Server {
+class DummyTManager : public Server {
public:
- DummyTMaster(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options);
- ~DummyTMaster();
+ DummyTManager(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options);
+ ~DummyTManager();
- const tmaster::StMgrMap& stmgrs() const { return stmgrs_; }
+ const tmanager::StMgrMap& stmgrs() const { return stmgrs_; }
protected:
virtual void HandleNewConnection(Connection* _conn);
@@ -47,10 +47,10 @@
private:
void HandleRegisterRequest(REQID _id, Connection* _conn,
- pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request);
+ pool_unique_ptr<proto::tmanager::StMgrRegisterRequest> _request);
void HandleHeartbeatRequest(REQID _id, Connection* _conn,
- proto::tmaster::StMgrHeartbeatRequest* _request);
- tmaster::StMgrMap stmgrs_;
+ proto::tmanager::StMgrHeartbeatRequest* _request);
+ tmanager::StMgrMap stmgrs_;
};
} // namespace testing
} // namespace heron
diff --git a/heron/tmaster/tests/cpp/server/stateful_checkpointer_unittest.cpp b/heron/tmanager/tests/cpp/server/stateful_checkpointer_unittest.cpp
similarity index 97%
rename from heron/tmaster/tests/cpp/server/stateful_checkpointer_unittest.cpp
rename to heron/tmanager/tests/cpp/server/stateful_checkpointer_unittest.cpp
index 67e46c3..0496a5e 100644
--- a/heron/tmaster/tests/cpp/server/stateful_checkpointer_unittest.cpp
+++ b/heron/tmanager/tests/cpp/server/stateful_checkpointer_unittest.cpp
@@ -193,7 +193,7 @@
TEST(StatefulCheckpointer, test_stored_logic) {
auto pplan = CreatePplan();
auto checkpointer =
- new heron::tmaster::StatefulCheckpointer(std::chrono::high_resolution_clock::now());
+ new heron::tmanager::StatefulCheckpointer(std::chrono::high_resolution_clock::now());
checkpointer->RegisterNewPhysicalPlan(*pplan);
int32_t allButOne = pplan->instances_size() - 1;
for (int32_t i = 0; i < allButOne; ++i) {
@@ -208,7 +208,7 @@
TEST(StMgr, test_skip_to_newer_ckptid) {
auto pplan = CreatePplan();
auto checkpointer =
- new heron::tmaster::StatefulCheckpointer(std::chrono::high_resolution_clock::now());
+ new heron::tmanager::StatefulCheckpointer(std::chrono::high_resolution_clock::now());
checkpointer->RegisterNewPhysicalPlan(*pplan);
int32_t allButOne = pplan->instances_size() - 1;
int32_t allButFive = pplan->instances_size() - 5;
diff --git a/heron/tmaster/tests/cpp/server/stateful_restorer_unittest.cpp b/heron/tmanager/tests/cpp/server/stateful_restorer_unittest.cpp
similarity index 88%
rename from heron/tmaster/tests/cpp/server/stateful_restorer_unittest.cpp
rename to heron/tmanager/tests/cpp/server/stateful_restorer_unittest.cpp
index d5ea03e..db3bda4 100644
--- a/heron/tmaster/tests/cpp/server/stateful_restorer_unittest.cpp
+++ b/heron/tmanager/tests/cpp/server/stateful_restorer_unittest.cpp
@@ -22,7 +22,7 @@
#include <thread>
#include <vector>
#include "server/dummystmgr.h"
-#include "server/dummytmaster.h"
+#include "server/dummytmanager.h"
#include "gtest/gtest.h"
#include "proto/messages.h"
#include "basics/basics.h"
@@ -164,32 +164,32 @@
ss->loop();
}
-void StartDummyTMaster(std::shared_ptr<EventLoopImpl>& ss, heron::testing::DummyTMaster*& mgr,
- std::thread*& tmaster_thread, sp_int32 tmaster_port) {
+void StartDummyTManager(std::shared_ptr<EventLoopImpl>& ss, heron::testing::DummyTManager*& mgr,
+ std::thread*& tmanager_thread, sp_int32 tmanager_port) {
// Create the select server for this stmgr to use
ss = std::make_shared<EventLoopImpl>();
NetworkOptions options;
options.set_host(LOCALHOST);
- options.set_port(tmaster_port);
+ options.set_port(tmanager_port);
options.set_max_packet_size(1024 * 1024);
options.set_socket_family(PF_INET);
- mgr = new heron::testing::DummyTMaster(ss, options);
+ mgr = new heron::testing::DummyTManager(ss, options);
mgr->Start();
- tmaster_thread = new std::thread(StartServer, ss);
+ tmanager_thread = new std::thread(StartServer, ss);
}
void StartDummyStMgr(std::shared_ptr<EventLoopImpl>& ss, heron::testing::DummyStMgr*& mgr,
- std::thread*& stmgr_thread, const sp_string tmaster_host,
- sp_int32 tmaster_port, const sp_string& stmgr_id, sp_int32 stmgr_port,
+ std::thread*& stmgr_thread, const sp_string tmanager_host,
+ sp_int32 tmanager_port, const sp_string& stmgr_id, sp_int32 stmgr_port,
const std::vector<heron::proto::system::Instance*>& instances) {
// Create the select server for this stmgr to use
ss = std::make_shared<EventLoopImpl>();
NetworkOptions options;
- options.set_host(tmaster_host);
- options.set_port(tmaster_port);
+ options.set_host(tmanager_host);
+ options.set_port(tmanager_port);
options.set_max_packet_size(1024 * 1024);
options.set_socket_family(PF_INET);
@@ -202,7 +202,7 @@
struct CommonResources {
// arguments
- sp_int32 tmaster_port_;
+ sp_int32 tmanager_port_;
sp_int32 stmgr_baseport_;
sp_string topology_name_;
sp_string topology_id_;
@@ -219,10 +219,10 @@
std::vector<sp_string> stmgrs_id_list_;
heron::proto::api::Topology* topology_;
- // Tmaster
- heron::testing::DummyTMaster* tmaster_;
- std::thread* tmaster_thread_;
- Piper* tmaster_piper_;
+ // Tmanager
+ heron::testing::DummyTManager* tmanager_;
+ std::thread* tmanager_thread_;
+ Piper* tmanager_piper_;
// Stmgr
std::vector<heron::testing::DummyStMgr*> stmgrs_list_;
@@ -239,13 +239,13 @@
std::map<sp_string, sp_int32> instanceid_stmgr_;
CommonResources() : topology_(nullptr),
- tmaster_(nullptr),
- tmaster_thread_(nullptr),
- tmaster_piper_(nullptr) {
+ tmanager_(nullptr),
+ tmanager_thread_(nullptr),
+ tmanager_piper_(nullptr) {
}
};
-void StartDummyTMaster(CommonResources& common) {
+void StartDummyTManager(CommonResources& common) {
// Generate a dummy topology
common.topology_ = GenerateDummyTopology(
common.topology_name_, common.topology_id_, common.num_spouts_, common.num_spout_instances_,
@@ -258,13 +258,13 @@
common.stmgrs_id_list_.push_back(id);
}
- // Start the tmaster
- std::shared_ptr<EventLoopImpl> tmaster_eventloop = nullptr;
+ // Start the tmanager
+ std::shared_ptr<EventLoopImpl> tmanager_eventloop = nullptr;
- StartDummyTMaster(tmaster_eventloop, common.tmaster_, common.tmaster_thread_,
- common.tmaster_port_);
- common.ss_list_.push_back(tmaster_eventloop);
- common.tmaster_piper_ = new Piper(tmaster_eventloop);
+ StartDummyTManager(tmanager_eventloop, common.tmanager_, common.tmanager_thread_,
+ common.tmanager_port_);
+ common.ss_list_.push_back(tmanager_eventloop);
+ common.tmanager_piper_ = new Piper(tmanager_eventloop);
}
void DistributeWorkersAcrossStmgrs(CommonResources& common) {
@@ -312,7 +312,7 @@
std::shared_ptr<EventLoopImpl> stmgr_ss = nullptr;
heron::testing::DummyStMgr* mgr = nullptr;
std::thread* stmgr_thread = nullptr;
- StartDummyStMgr(stmgr_ss, mgr, stmgr_thread, LOCALHOST, common.tmaster_port_,
+ StartDummyStMgr(stmgr_ss, mgr, stmgr_thread, LOCALHOST, common.tmanager_port_,
common.stmgrs_id_list_[i], common.stmgr_baseport_ + i,
common.stmgr_instance_list_[i]);
@@ -325,7 +325,7 @@
void SetUpCommonResources(CommonResources& common) {
// Initialize dummy params
- common.tmaster_port_ = 53001;
+ common.tmanager_port_ = 53001;
common.stmgr_baseport_ = 53002;
common.topology_name_ = "mytopology";
common.topology_id_ = "abcd-9999";
@@ -339,9 +339,9 @@
void TearCommonResources(CommonResources& common) {
delete common.topology_;
- delete common.tmaster_;
- delete common.tmaster_thread_;
- delete common.tmaster_piper_;
+ delete common.tmanager_;
+ delete common.tmanager_thread_;
+ delete common.tmanager_piper_;
// Cleanup the stream managers
for (size_t i = 0; i < common.stmgrs_list_.size(); ++i) {
@@ -362,8 +362,8 @@
CommonResources common;
SetUpCommonResources(common);
- // Start the tmaster etc.
- StartDummyTMaster(common);
+ // Start the tmanager etc.
+ StartDummyTManager(common);
// Distribute workers across stmgrs
DistributeWorkersAcrossStmgrs(common);
@@ -372,18 +372,18 @@
StartStMgrs(common);
// Wait until all stmgrs registered
- while (common.tmaster_->stmgrs().size() != common.num_stmgrs_) sleep(1);
+ while (common.tmanager_->stmgrs().size() != common.num_stmgrs_) sleep(1);
// Make sure that stmgrs have not gotten any restore message
for (auto stmgr : common.stmgrs_list_) {
EXPECT_FALSE(stmgr->GotRestoreMessage());
}
// Start Restorer
- auto restorer = new heron::tmaster::StatefulRestorer();
+ auto restorer = new heron::tmanager::StatefulRestorer();
EXPECT_FALSE(restorer->IsInProgress());
- common.tmaster_piper_->ExecuteInEventLoop(
- std::bind(&heron::tmaster::StatefulRestorer::StartRestore,
- restorer, "ckpt-1", common.tmaster_->stmgrs()));
+ common.tmanager_piper_->ExecuteInEventLoop(
+ std::bind(&heron::tmanager::StatefulRestorer::StartRestore,
+ restorer, "ckpt-1", common.tmanager_->stmgrs()));
sleep(1);
// all stmgrs should have received restore message
@@ -397,9 +397,9 @@
for (auto stmgr : common.stmgrs_list_) {
EXPECT_FALSE(restorer->GotResponse(stmgr->stmgrid()));
EXPECT_FALSE(stmgr->GotStartProcessingMessage());
- common.tmaster_piper_->ExecuteInEventLoop(
- std::bind(&heron::tmaster::StatefulRestorer::HandleStMgrRestored,
- restorer, stmgr->stmgrid(), "ckpt-1", txid, common.tmaster_->stmgrs()));
+ common.tmanager_piper_->ExecuteInEventLoop(
+ std::bind(&heron::tmanager::StatefulRestorer::HandleStMgrRestored,
+ restorer, stmgr->stmgrid(), "ckpt-1", txid, common.tmanager_->stmgrs()));
sleep(1);
EXPECT_TRUE(restorer->GotResponse(stmgr->stmgrid()));
}
@@ -420,7 +420,7 @@
}
// Wait for the threads to terminate
- common.tmaster_thread_->join();
+ common.tmanager_thread_->join();
for (size_t i = 0; i < common.stmgrs_threads_list_.size(); ++i) {
common.stmgrs_threads_list_[i]->join();
}
diff --git a/heron/tmaster/tests/cpp/server/tcontroller_unittest.cpp b/heron/tmanager/tests/cpp/server/tcontroller_unittest.cpp
similarity index 95%
rename from heron/tmaster/tests/cpp/server/tcontroller_unittest.cpp
rename to heron/tmanager/tests/cpp/server/tcontroller_unittest.cpp
index ffe6669..8e25b9a 100644
--- a/heron/tmaster/tests/cpp/server/tcontroller_unittest.cpp
+++ b/heron/tmanager/tests/cpp/server/tcontroller_unittest.cpp
@@ -35,7 +35,7 @@
parameters.push_back("bolt:test:3");
std::map<sp_string, std::map<sp_string, sp_string>> conf;
- heron::tmaster::TController::ParseRuntimeConfig(parameters, conf);
+ heron::tmanager::TController::ParseRuntimeConfig(parameters, conf);
EXPECT_EQ(conf.size(), 3);
EXPECT_EQ(conf["_topology_"]["test"], "1");
diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp b/heron/tmanager/tests/cpp/server/tmanager_unittest.cpp
similarity index 90%
rename from heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
rename to heron/tmanager/tests/cpp/server/tmanager_unittest.cpp
index fc5612d..f3eaabc 100644
--- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
+++ b/heron/tmanager/tests/cpp/server/tmanager_unittest.cpp
@@ -38,7 +38,7 @@
#include "config/physical-plan-helper.h"
#include "statemgr/heron-statemgr.h"
#include "statemgr/heron-localfilestatemgr.h"
-#include "manager/tmaster.h"
+#include "manager/tmanager.h"
#include "manager/stmgr.h"
const sp_string SPOUT_NAME = "spout";
@@ -227,7 +227,7 @@
snprintf(dpath, sizeof(dpath), "%s", "/tmp/XXXXXX");
mkdtemp(dpath);
- // Write the dummy topology/tmaster location out to the local file system
+ // Write the dummy topology/tmanager location out to the local file system
// via thestate mgr
heron::common::HeronLocalFileStateMgr state_mgr(dpath, ss);
state_mgr.CreateTopology(*topology, NULL);
@@ -279,30 +279,30 @@
ss->loop();
}
-void StartTMaster(std::shared_ptr<EventLoopImpl>& ss, heron::tmaster::TMaster*& tmaster,
- std::thread*& tmaster_thread, const std::string& zkhostportlist,
+void StartTManager(std::shared_ptr<EventLoopImpl>& ss, heron::tmanager::TManager*& tmanager,
+ std::thread*& tmanager_thread, const std::string& zkhostportlist,
const std::string& topology_name, const std::string& topology_id,
- const std::string& dpath, const std::string& tmaster_host, sp_int32 tmaster_port,
- sp_int32 tmaster_controller_port, sp_int32 ckptmgr_port) {
+ const std::string& dpath, const std::string& tmanager_host, sp_int32 tmanager_port,
+ sp_int32 tmanager_controller_port, sp_int32 ckptmgr_port) {
ss = std::make_shared<EventLoopImpl>();
- tmaster = new heron::tmaster::TMaster(zkhostportlist, topology_name, topology_id, dpath,
- tmaster_controller_port, tmaster_port, tmaster_port + 2,
- tmaster_port + 3, ckptmgr_port,
+ tmanager = new heron::tmanager::TManager(zkhostportlist, topology_name, topology_id, dpath,
+ tmanager_controller_port, tmanager_port, tmanager_port + 2,
+ tmanager_port + 3, ckptmgr_port,
metrics_sinks_config_filename, LOCALHOST, ss);
- tmaster_thread = new std::thread(StartServer, ss);
- // tmaster_thread->start();
+ tmanager_thread = new std::thread(StartServer, ss);
+ // tmanager_thread->start();
}
void StartDummyStMgr(std::shared_ptr<EventLoopImpl>& ss, heron::testing::DummyStMgr*& mgr,
- std::thread*& stmgr_thread, const std::string tmaster_host,
- sp_int32 tmaster_port, const std::string& stmgr_id, sp_int32 stmgr_port,
+ std::thread*& stmgr_thread, const std::string tmanager_host,
+ sp_int32 tmanager_port, const std::string& stmgr_id, sp_int32 stmgr_port,
const std::vector<heron::proto::system::Instance*>& instances) {
// Create the select server for this stmgr to use
ss = std::make_shared<EventLoopImpl>();
NetworkOptions options;
- options.set_host(tmaster_host);
- options.set_port(tmaster_port);
+ options.set_host(tmanager_host);
+ options.set_port(tmanager_port);
options.set_max_packet_size(1024 * 1024);
options.set_socket_family(PF_INET);
@@ -315,9 +315,9 @@
struct CommonResources {
// arguments
- std::string tmaster_host_;
- sp_int32 tmaster_port_;
- sp_int32 tmaster_controller_port_;
+ std::string tmanager_host_;
+ sp_int32 tmanager_port_;
+ sp_int32 tmanager_controller_port_;
sp_int32 ckptmgr_port_;
sp_int32 stmgr_baseport_;
std::string zkhostportlist_;
@@ -338,8 +338,8 @@
heron::proto::api::Topology* topology_;
heron::proto::system::PackingPlan* packing_plan_;
- heron::tmaster::TMaster* tmaster_;
- std::thread* tmaster_thread_;
+ heron::tmanager::TManager* tmanager_;
+ std::thread* tmanager_thread_;
// Stmgr
std::vector<heron::testing::DummyStMgr*> stmgrs_list_;
@@ -356,7 +356,7 @@
std::map<std::string, sp_int32> instanceid_stmgr_;
- CommonResources() : topology_(NULL), tmaster_(NULL), tmaster_thread_(NULL) {
+ CommonResources() : topology_(NULL), tmanager_(NULL), tmanager_thread_(NULL) {
// Create the sington for heron_internals_config_reader
// if it does not exist
if (!heron::config::HeronInternalsConfigReader::Exists()) {
@@ -365,7 +365,7 @@
}
};
-void StartTMaster(CommonResources& common) {
+void StartTManager(CommonResources& common) {
// Generate a dummy topology
common.topology_ = GenerateDummyTopology(
common.topology_name_, common.topology_id_, common.num_spouts_, common.num_spout_instances_,
@@ -383,14 +383,14 @@
common.stmgrs_id_list_.push_back(id);
}
- // Start the tmaster
- std::shared_ptr<EventLoopImpl> tmaster_eventLoop;
+ // Start the tmanager
+ std::shared_ptr<EventLoopImpl> tmanager_eventLoop;
- StartTMaster(tmaster_eventLoop, common.tmaster_, common.tmaster_thread_, common.zkhostportlist_,
+ StartTManager(tmanager_eventLoop, common.tmanager_, common.tmanager_thread_, common.zkhostportlist_,
common.topology_name_, common.topology_id_, common.dpath_,
- common.tmaster_host_, common.tmaster_port_, common.tmaster_controller_port_,
+ common.tmanager_host_, common.tmanager_port_, common.tmanager_controller_port_,
common.ckptmgr_port_);
- common.ss_list_.push_back(tmaster_eventLoop);
+ common.ss_list_.push_back(tmanager_eventLoop);
}
void DistributeWorkersAcrossStmgrs(CommonResources& common) {
@@ -438,7 +438,7 @@
std::shared_ptr<EventLoopImpl> stmgr_ss;
heron::testing::DummyStMgr* mgr = NULL;
std::thread* stmgr_thread = NULL;
- StartDummyStMgr(stmgr_ss, mgr, stmgr_thread, LOCALHOST, common.tmaster_port_,
+ StartDummyStMgr(stmgr_ss, mgr, stmgr_thread, LOCALHOST, common.tmanager_port_,
common.stmgrs_id_list_[i], common.stmgr_baseport_ + i,
common.stmgr_instance_list_[i]);
@@ -450,9 +450,9 @@
void SetUpCommonResources(CommonResources& common) {
// Initialize dummy params
- common.tmaster_host_ = LOCALHOST;
- common.tmaster_port_ = 53001;
- common.tmaster_controller_port_ = 53002;
+ common.tmanager_host_ = LOCALHOST;
+ common.tmanager_port_ = 53001;
+ common.tmanager_controller_port_ = 53002;
common.ckptmgr_port_ = 53003;
common.stmgr_baseport_ = 53001;
common.topology_name_ = "mytopology";
@@ -471,8 +471,8 @@
void TearCommonResources(CommonResources& common) {
delete common.topology_;
delete common.packing_plan_;
- delete common.tmaster_thread_;
- delete common.tmaster_;
+ delete common.tmanager_thread_;
+ delete common.tmanager_;
// Cleanup the stream managers
for (size_t i = 0; i < common.stmgrs_list_.size(); ++i) {
@@ -562,7 +562,7 @@
}
-// Test to make sure that the tmaster forms the right pplan
+// Test to make sure that the tmanager forms the right pplan
// and sends it to all stmgrs
TEST(StMgr, test_pplan_distribute) {
CommonResources common;
@@ -570,8 +570,8 @@
sp_int8 num_workers_per_stmgr_ = (((common.num_spouts_ * common.num_spout_instances_) +
(common.num_bolts_ * common.num_bolt_instances_)) /
common.num_stmgrs_);
- // Start the tmaster etc.
- StartTMaster(common);
+ // Start the tmanager etc.
+ StartTManager(common);
// Distribute workers across stmgrs
DistributeWorkersAcrossStmgrs(common);
@@ -589,7 +589,7 @@
}
// Wait for the threads to terminate
- common.tmaster_thread_->join();
+ common.tmanager_thread_->join();
for (size_t i = 0; i < common.stmgrs_threads_list_.size(); ++i) {
common.stmgrs_threads_list_[i]->join();
}
@@ -609,7 +609,7 @@
}
// Test to see if activate/deactivate works
-// and that its distributed to tmasters
+// and that it's distributed to tmanagers
TEST(StMgr, test_activate_deactivate) {
CommonResources common;
SetUpCommonResources(common);
@@ -617,8 +617,8 @@
sp_int8 num_workers_per_stmgr_ = (((common.num_spouts_ * common.num_spout_instances_) +
(common.num_bolts_ * common.num_bolt_instances_)) /
common.num_stmgrs_);
- // Start the tmaster etc.
- StartTMaster(common);
+ // Start the tmanager etc.
+ StartTManager(common);
// Distribute workers across stmgrs
DistributeWorkersAcrossStmgrs(common);
@@ -635,7 +635,7 @@
}
std::thread* deactivate_thread =
- new std::thread(ControlTopology, common.topology_id_, common.tmaster_controller_port_, false);
+ new std::thread(ControlTopology, common.topology_id_, common.tmanager_controller_port_, false);
// deactivate_thread->start();
deactivate_thread->join();
delete deactivate_thread;
@@ -648,7 +648,7 @@
}
std::thread* activate_thread =
- new std::thread(ControlTopology, common.topology_id_, common.tmaster_controller_port_, true);
+ new std::thread(ControlTopology, common.topology_id_, common.tmanager_controller_port_, true);
// activate_thread->start();
activate_thread->join();
delete activate_thread;
@@ -666,7 +666,7 @@
}
// Wait for the threads to terminate
- common.tmaster_thread_->join();
+ common.tmanager_thread_->join();
for (size_t i = 0; i < common.stmgrs_threads_list_.size(); ++i) {
common.stmgrs_threads_list_[i]->join();
}
@@ -693,8 +693,8 @@
CommonResources common;
SetUpCommonResources(common);
- // Start the tmaster etc.
- StartTMaster(common);
+ // Start the tmanager etc.
+ StartTManager(common);
// Distribute workers across stmgrs
DistributeWorkersAcrossStmgrs(common);
@@ -703,8 +703,8 @@
StartStMgrs(common);
// Wait till we get the physical plan populated on the stmgrs, then
- // verify current config values in tmaster as well as stream managers.
- // common.tmaster_->FetchPhysicalPlan();
+ // verify current config values in tmanager as well as stream managers.
+ // common.tmanager_->FetchPhysicalPlan();
// auto t = init_pplan->topology();
// auto c = t.topology_config();
for (size_t i = 0; i < common.stmgrs_list_.size(); ++i) {
@@ -712,25 +712,25 @@
}
// Test ValidateRuntimeConfig()
- heron::tmaster::ComponentConfigMap validate_good_config_map;
+ heron::tmanager::ComponentConfigMap validate_good_config_map;
std::map<std::string, std::string> validate_good_config;
validate_good_config[topology_init_config_1] = "1";
validate_good_config[topology_init_config_2] = "2";
const char* topology_key = heron::config::TopologyConfigHelper::GetReservedTopologyConfigKey();
validate_good_config_map[topology_key] = validate_good_config;
validate_good_config_map["spout1"] = validate_good_config;
- EXPECT_EQ(common.tmaster_->ValidateRuntimeConfig(validate_good_config_map), true);
+ EXPECT_EQ(common.tmanager_->ValidateRuntimeConfig(validate_good_config_map), true);
- heron::tmaster::ComponentConfigMap validate_bad_config_map;
+ heron::tmanager::ComponentConfigMap validate_bad_config_map;
std::map<std::string, std::string> validate_bad_config;
validate_good_config[topology_init_config_1] = "1";
validate_bad_config_map["unknown_component"] = validate_good_config;
- EXPECT_EQ(common.tmaster_->ValidateRuntimeConfig(validate_bad_config_map), false);
+ EXPECT_EQ(common.tmanager_->ValidateRuntimeConfig(validate_bad_config_map), false);
// Post runtime config request with no configs and expect 400 response.
std::vector<std::string> no_config;
std::thread* no_config_update_thread = new std::thread(UpdateRuntimeConfig,
- common.topology_id_, common.tmaster_controller_port_, no_config, 400, "no_config");
+ common.topology_id_, common.tmanager_controller_port_, no_config, 400, "no_config");
no_config_update_thread->join();
delete no_config_update_thread;
@@ -738,7 +738,7 @@
std::vector<std::string> wrong_config1;
wrong_config1.push_back("badformat"); // Bad format
std::thread* wrong_config1_update_thread = new std::thread(UpdateRuntimeConfig,
- common.topology_id_, common.tmaster_controller_port_, wrong_config1, 400, "wrong_config1");
+ common.topology_id_, common.tmanager_controller_port_, wrong_config1, 400, "wrong_config1");
wrong_config1_update_thread->join();
delete wrong_config1_update_thread;
@@ -747,7 +747,7 @@
// Component doesn't exist
wrong_config2.push_back("bad_component:topology.runtime.bolt.test_config:1");
std::thread* wrong_config2_update_thread = new std::thread(UpdateRuntimeConfig,
- common.topology_id_, common.tmaster_controller_port_, wrong_config2, 400, "wrong_config2");
+ common.topology_id_, common.tmanager_controller_port_, wrong_config2, 400, "wrong_config2");
wrong_config2_update_thread->join();
delete wrong_config2_update_thread;
@@ -758,7 +758,7 @@
good_config.push_back(runtime_test_spout + ":" + spout_init_config + ":3");
good_config.push_back(runtime_test_bolt + ":" + bolt_init_config + ":4");
std::thread* good_config_update_thread = new std::thread(UpdateRuntimeConfig,
- common.topology_id_, common.tmaster_controller_port_, good_config, 200, "good_config");
+ common.topology_id_, common.tmanager_controller_port_, good_config, 200, "good_config");
good_config_update_thread->join();
delete good_config_update_thread;
@@ -780,7 +780,7 @@
EXPECT_EQ(updated_bolt_config[bolt_init_config + ":runtime"], "4");
}
std::map<std::string, std::string> updated_config, updated_spout_config, updated_bolt_config;
- const auto pplan = common.tmaster_->getPhysicalPlan();
+ const auto pplan = common.tmanager_->getPhysicalPlan();
heron::config::TopologyConfigHelper::GetTopologyRuntimeConfig(pplan->topology(), updated_config);
EXPECT_EQ(updated_config[topology_init_config_1 + ":runtime"], "1");
EXPECT_EQ(updated_config[topology_init_config_2 + ":runtime"], "2");
@@ -797,7 +797,7 @@
}
// Wait for the threads to terminate
- common.tmaster_thread_->join();
+ common.tmanager_thread_->join();
for (size_t i = 0; i < common.stmgrs_threads_list_.size(); ++i) {
common.stmgrs_threads_list_[i]->join();
}
@@ -810,7 +810,7 @@
int main(int argc, char** argv) {
heron::common::Initialize(argv[0]);
- std::cout << "Current working directory (to find tmaster logs) "
+ std::cout << "Current working directory (to find tmanager logs) "
<< ProcessUtils::getCurrentWorkingDirectory() << std::endl;
testing::InitGoogleTest(&argc, argv);
sp_string configFile = heron_internals_config_filename;
diff --git a/heron/tmaster/src/cpp/manager/tmasterserver.cpp b/heron/tmaster/src/cpp/manager/tmasterserver.cpp
deleted file mode 100644
index 82096dd..0000000
--- a/heron/tmaster/src/cpp/manager/tmasterserver.cpp
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "manager/tmasterserver.h"
-#include <iostream>
-#include "manager/tmetrics-collector.h"
-#include "manager/tmaster.h"
-#include "processor/processor.h"
-#include "proto/messages.h"
-#include "basics/basics.h"
-#include "errors/errors.h"
-#include "threads/threads.h"
-#include "network/network.h"
-#include "metrics/tmaster-metrics.h"
-
-namespace heron {
-namespace tmaster {
-
-using std::unique_ptr;
-using std::shared_ptr;
-
-TMasterServer::TMasterServer(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& _options,
- shared_ptr<TMetricsCollector> _collector, TMaster* _tmaster)
- : Server(eventLoop, _options), collector_(_collector), tmaster_(_tmaster) {
- // Install the stmgr handlers
- InstallRequestHandler(&TMasterServer::HandleStMgrRegisterRequest);
- InstallRequestHandler(&TMasterServer::HandleStMgrHeartbeatRequest);
- InstallMessageHandler(&TMasterServer::HandleInstanceStateStored);
- InstallMessageHandler(&TMasterServer::HandleRestoreTopologyStateResponse);
- InstallMessageHandler(&TMasterServer::HandleResetTopologyStateMessage);
-
- // Install the metricsmgr handlers
- InstallMessageHandler(&TMasterServer::HandleMetricsMgrStats);
-}
-
-TMasterServer::~TMasterServer() {
- // Nothing really
-}
-
-void TMasterServer::HandleNewConnection(Connection* conn) {
- // There is nothing to be done here. Instead we wait for
- // the register message
-}
-
-void TMasterServer::HandleConnectionClose(Connection* _conn, NetworkErrorCode) {
- if (tmaster_->RemoveStMgrConnection(_conn) != proto::system::OK) {
- LOG(WARNING) << "Unknown connection closed on us from " << _conn->getIPAddress() << ":"
- << _conn->getPort() << ", possibly metrics mgr";
- return;
- }
-}
-
-void TMasterServer::HandleStMgrRegisterRequest(REQID _reqid, Connection* _conn,
- pool_unique_ptr<proto::tmaster::StMgrRegisterRequest> _request) {
- unique_ptr<StMgrRegisterProcessor> processor =
- make_unique<StMgrRegisterProcessor>(_reqid, _conn, std::move(_request), tmaster_, this);
- processor->Start();
-}
-
-void TMasterServer::HandleStMgrHeartbeatRequest(REQID _reqid, Connection* _conn,
- pool_unique_ptr<proto::tmaster::StMgrHeartbeatRequest> _request) {
- unique_ptr<StMgrHeartbeatProcessor> processor =
- make_unique<StMgrHeartbeatProcessor>(_reqid, _conn, std::move(_request), tmaster_, this);
- processor->Start();
-}
-
-void TMasterServer::HandleMetricsMgrStats(Connection*,
- pool_unique_ptr<proto::tmaster::PublishMetrics> _request) {
- collector_->AddMetric(*_request);
-}
-
-void TMasterServer::HandleInstanceStateStored(Connection*,
- pool_unique_ptr<proto::ckptmgr::InstanceStateStored> _message) {
- tmaster_->HandleInstanceStateStored(_message->checkpoint_id(), _message->instance());
-}
-
-void TMasterServer::HandleRestoreTopologyStateResponse(Connection* _conn,
- pool_unique_ptr<proto::ckptmgr::RestoreTopologyStateResponse> _message) {
- tmaster_->HandleRestoreTopologyStateResponse(_conn, _message->checkpoint_id(),
- _message->restore_txid(),
- _message->status().status());
-}
-
-void TMasterServer::HandleResetTopologyStateMessage(Connection* _conn,
- pool_unique_ptr<proto::ckptmgr::ResetTopologyState> _message) {
- tmaster_->ResetTopologyState(_conn, _message->dead_stmgr(),
- _message->dead_taskid(), _message->reason());
-}
-} // namespace tmaster
-} // namespace heron