/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "manager/tmaster.h"
#include <sys/resource.h>
#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <set>
#include <vector>
#include "manager/tmetrics-collector.h"
#include "manager/tcontroller.h"
#include "manager/stats-interface.h"
#include "manager/tmasterserver.h"
#include "manager/stmgrstate.h"
#include "manager/stateful-controller.h"
#include "manager/ckptmgr-client.h"
#include "processor/processor.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "zookeeper/zkclient.h"
#include "config/helper.h"
#include "config/heron-internals-config-reader.h"
#include "statemgr/heron-statemgr.h"
#include "metrics/tmaster-metrics.h"
namespace heron {
namespace tmaster {
using std::unique_ptr;
using std::make_shared;
// Stats for the process
const sp_string METRIC_CPU_USER = "__cpu_user_usec";
const sp_string METRIC_CPU_SYSTEM = "__cpu_system_usec";
const sp_string METRIC_UPTIME = "__uptime_sec";
const sp_string METRIC_MEM_USED = "__mem_used_bytes";
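// Note: timer frequencies below are in microseconds; the _s literal converts
// seconds to microseconds (e.g. 10_s == 10 * 1000 * 1000).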
const sp_int64 STATE_MANAGER_RETRY_FREQUENCY = 10_s;
const sp_int64 PROCESS_METRICS_FREQUENCY = 60_s;
const sp_int64 UPTIME_METRIC_FREQUENCY = 1_s;
const sp_string METRIC_PREFIX = "__process";
TMaster::TMaster(const std::string& _zk_hostport, const std::string& _topology_name,
const std::string& _topology_id, const std::string& _topdir,
sp_int32 _tmaster_controller_port,
sp_int32 _master_port, sp_int32 _stats_port, sp_int32 metricsMgrPort,
sp_int32 _ckptmgr_port,
const std::string& _metrics_sinks_yaml, const std::string& _myhost_name,
shared_ptr<EventLoop> eventLoop) {
start_time_ = std::chrono::high_resolution_clock::now();
zk_hostport_ = _zk_hostport;
topdir_ = _topdir;
tmaster_controller_ = nullptr;
tmaster_controller_port_ = _tmaster_controller_port;
master_ = nullptr;
master_port_ = _master_port;
stats_ = nullptr;
stats_port_ = _stats_port;
myhost_name_ = _myhost_name;
eventLoop_ = eventLoop;
dns_ = new AsyncDNS(eventLoop_);
http_client_ = new HTTPClient(eventLoop_, dns_);
metrics_collector_ = make_shared<TMetricsCollector>(config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterMetricsCollectorMaximumIntervalMin() * 60,
eventLoop_, _metrics_sinks_yaml);
mMetricsMgrPort = metricsMgrPort;
sp_int32 metricsExportIntervalSec =
config::HeronInternalsConfigReader::Instance()->GetHeronMetricsExportIntervalSec();
mMetricsMgrClient = make_shared<heron::common::MetricsMgrSt>(
mMetricsMgrPort, metricsExportIntervalSec, eventLoop_);
mMetricsMgrClient->Start(myhost_name_, master_port_, "__tmaster__",
"0"); // MM expects task_id, so just giving 0 for tmaster.
tmasterProcessMetrics = make_shared<heron::common::MultiAssignableMetric>();
mMetricsMgrClient->register_metric(METRIC_PREFIX, tmasterProcessMetrics);
ckptmgr_port_ = _ckptmgr_port;
ckptmgr_client_ = nullptr;
current_pplan_ = nullptr;
// The topology as first submitted by the user.
// It is only used to construct the physical plan when TMaster starts for the first time.
// Any runtime changes are made to current_pplan_->topology.
topology_ = nullptr;
packing_plan_ = nullptr;
state_mgr_ = heron::common::HeronStateMgr::MakeStateMgr(zk_hostport_, topdir_, eventLoop_);
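// assignment_in_progress_ guards against computing two physical plans at once;
// do_reassign_ notes that stmgr membership changed while an assignment was being
// written, so the plan must be recomputed once that write completes.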
assignment_in_progress_ = false;
do_reassign_ = false;
master_establish_attempts_ = 0;
tmaster_location_ = make_unique<proto::tmaster::TMasterLocation>();
tmaster_location_->set_topology_name(_topology_name);
tmaster_location_->set_topology_id(_topology_id);
tmaster_location_->set_host(myhost_name_);
tmaster_location_->set_controller_port(tmaster_controller_port_);
tmaster_location_->set_master_port(master_port_);
tmaster_location_->set_stats_port(stats_port_);
DCHECK(tmaster_location_->IsInitialized());
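// Startup sequence: fetch the packing plan, establish ourselves as TMaster in the
// state manager, read and validate the topology, and then fetch (or, once all
// expected stmgrs have registered, create) the physical plan.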
FetchPackingPlan();
// Send tmaster location to metrics mgr
mMetricsMgrClient->RefreshTMasterLocation(*tmaster_location_);
// Check for log pruning every 5 minutes
CHECK_GT(eventLoop_->registerTimer(
[](EventLoop::Status) { ::heron::common::PruneLogs(); }, true,
config::HeronInternalsConfigReader::Instance()->GetHeronLoggingPruneIntervalSec() *
1_s),
0);
// Flush logs every interval
CHECK_GT(eventLoop_->registerTimer(
[](EventLoop::Status) { ::heron::common::FlushLogs(); }, true,
config::HeronInternalsConfigReader::Instance()->GetHeronLoggingFlushIntervalSec() *
1_s),
0);
// Update uptime metric every 1 second
CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) {
this->UpdateUptimeMetric();
}, true, UPTIME_METRIC_FREQUENCY), 0);
// Update Process related metrics every 60 seconds
CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) {
this->UpdateProcessMetrics(status);
}, true, PROCESS_METRICS_FREQUENCY), 0);
stateful_controller_ = nullptr;
}
void TMaster::FetchPackingPlan() {
auto packing_plan = make_shared<proto::system::PackingPlan>();
state_mgr_->GetPackingPlan(tmaster_location_->topology_name(), packing_plan,
[packing_plan, this](proto::system::StatusCode status) {
this->OnPackingPlanFetch(packing_plan, status);
});
}
void TMaster::OnPackingPlanFetch(shared_ptr<proto::system::PackingPlan> newPackingPlan,
proto::system::StatusCode _status) {
if (_status != proto::system::OK) {
LOG(INFO) << "PackingPlan Fetch failed with status " << _status;
LOG(INFO) << "Retrying after " << STATE_MANAGER_RETRY_FREQUENCY << " micro seconds ";
CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status) {
this->FetchPackingPlan();
}, false, STATE_MANAGER_RETRY_FREQUENCY), 0);
} else {
// We got a new PackingPlan.
LOG(INFO) << "Fetched PackingPlan of " << newPackingPlan->container_plans().size()
<< " containers.";
if (!packing_plan_) {
LOG(INFO) << "Received initial packing plan. Initializing absent_stmgrs_";
packing_plan_ = newPackingPlan;
// We will keep the list of stmgrs with us.
// In case an assignment already exists, we will throw this
// list out and re-init it with what's in the assignment.
absent_stmgrs_.clear();
for (sp_int32 i = 0; i < packing_plan_->container_plans_size(); ++i) {
LOG(INFO) << "Adding container id " << packing_plan_->container_plans(i).id()
<< " to absent_stmgrs_";
// the packing plan represents container ids by the numerical id of the container. The
// physical plan prepends "stmgr-" to the integer and represents it as a string.
absent_stmgrs_.insert("stmgr-" + std::to_string(packing_plan_->container_plans(i).id()));
}
// this is part of the initialization process. Since we've got a packing plan we will
// register ourselves as the master
EstablishTMaster(EventLoop::TIMEOUT_EVENT);
} else {
// We must know for sure that we are TMaster before potentially deleting the physical plan
// in state manager. We know this to be the case here because we initially fetch
// packing_plan_ before becoming master, but we register the packing plan watcher only after
// becoming master. That guarantees that if packing_plan_ is already set and this method is
// invoked, it's due to the watch and we're master here.
// Compare plan contents, not shared_ptr identity: the refetched plan is always a new object.
if (packing_plan_->SerializeAsString() != newPackingPlan->SerializeAsString()) {
LOG(INFO) << "Packing plan changed. Deleting physical plan and restarting TMaster to "
<< "reset internal state. Exiting.";
state_mgr_->DeletePhysicalPlan(tmaster_location_->topology_name(),
[this](proto::system::StatusCode status) {
::exit(1);
});
} else {
LOG(INFO) << "New Packing plan matches existing one.";
}
}
}
}
void TMaster::EstablishTMaster(EventLoop::Status) {
auto cb = [this](proto::system::StatusCode code) { this->SetTMasterLocationDone(code); };
state_mgr_->SetTMasterLocation(*tmaster_location_, std::move(cb));
// if zk lost the tmaster location, tmaster quits to bail out and re-establish its location
auto cb2 = [this]() {
LOG(ERROR) << " lost tmaster location in zk state manager. Bailing out..." << std::endl;
::exit(1);
};
state_mgr_->SetTMasterLocationWatch(tmaster_location_->topology_name(), std::move(cb2));
master_establish_attempts_++;
}
TMaster::~TMaster() {
if (master_) {
master_->Stop();
}
// stmgrs_ holds shared_ptr values; clearing the map releases all StMgrState
// objects. (Erasing entries while iterating would invalidate the iterator.)
stmgrs_.clear();
mMetricsMgrClient->unregister_metric(METRIC_PREFIX);
delete http_client_;
delete dns_;
}
void TMaster::UpdateUptimeMetric() {
auto end_time = std::chrono::high_resolution_clock::now();
auto uptime = std::chrono::duration_cast<std::chrono::seconds>(end_time - start_time_).count();
tmasterProcessMetrics->scope(METRIC_UPTIME)->SetValue(uptime);
}
void TMaster::UpdateProcessMetrics(EventLoop::Status) {
// CPU
struct rusage usage;
ProcessUtils::getResourceUsage(&usage);
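// rusage reports CPU time as seconds plus microseconds; fold both into a single
// microsecond value.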
tmasterProcessMetrics->scope(METRIC_CPU_USER)
->SetValue((usage.ru_utime.tv_sec * 1_s) + usage.ru_utime.tv_usec);
tmasterProcessMetrics->scope(METRIC_CPU_SYSTEM)
->SetValue((usage.ru_stime.tv_sec * 1_s) + usage.ru_stime.tv_usec);
// Memory
size_t totalmemory = ProcessUtils::getTotalMemoryUsed();
tmasterProcessMetrics->scope(METRIC_MEM_USED)->SetValue(totalmemory);
}
void TMaster::SetTMasterLocationDone(proto::system::StatusCode _code) {
if (_code != proto::system::OK) {
if (_code == proto::system::TMASTERLOCATION_ALREADY_EXISTS &&
master_establish_attempts_ <
config::HeronInternalsConfigReader::Instance()->GetHeronTmasterEstablishRetryTimes()) {
LOG(INFO) << "Topology Master node already exists. Maybe its "
<< "because of our restart. Will try again" << std::endl;
// Attempt again
auto cb = [this](EventLoop::Status status) { this->EstablishTMaster(status); };
eventLoop_->registerTimer(std::move(cb), false,
config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterEstablishRetryIntervalSec() *
1_s);
return;
}
// There was an error setting our location
LOG(ERROR) << "For topology " << tmaster_location_->topology_name()
<< " Error setting ourselves as TMaster. Error code is " << _code << std::endl;
::exit(1);
}
master_establish_attempts_ = 0;
// We are now the master
LOG(INFO) << "Successfully set ourselves as master\n";
// Let's now read the topology
topology_ = make_unique<proto::api::Topology>();
state_mgr_->GetTopology(tmaster_location_->topology_name(), *topology_,
[this](proto::system::StatusCode code) {
this->GetTopologyDone(code);
});
// and register packing plan watcher to pick up changes
state_mgr_->SetPackingPlanWatch(tmaster_location_->topology_name(), [this]() {
this->FetchPackingPlan();
});
}
void TMaster::GetTopologyDone(proto::system::StatusCode _code) {
if (_code != proto::system::OK) {
// Without Topology we can't do much
LOG(ERROR) << "For topology " << tmaster_location_->topology_name()
<< " Error getting topology. Error code is " << _code << std::endl;
::exit(1);
}
// Ok things are fine. topology_ contains the result
// Just make sure it makes sense.
CHECK_NOTNULL(topology_);
if (!ValidateTopology(*topology_)) {
LOG(ERROR) << "Topology invalid" << std::endl;
::exit(1);
}
LOG(INFO) << "Topology read and validated\n";
if (heron::config::TopologyConfigHelper::GetReliabilityMode(*topology_)
== config::TopologyConfigVars::EFFECTIVELY_ONCE) {
// Establish connection to ckptmgr
NetworkOptions ckpt_options;
ckpt_options.set_host("127.0.0.1");
ckpt_options.set_port(ckptmgr_port_);
ckpt_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
1024 * 1024);
ckptmgr_client_ = make_unique<CkptMgrClient>(eventLoop_, ckpt_options,
topology_->name(), topology_->id(),
std::bind(&TMaster::HandleCleanStatefulCheckpointResponse,
this, std::placeholders::_1));
// Start the client
ckptmgr_client_->Start();
// We also need to load latest checkpoint state
auto ckpt = std::make_shared<proto::ckptmgr::StatefulConsistentCheckpoints>();
auto cb = [ckpt, this](proto::system::StatusCode code) {
this->GetStatefulCheckpointsDone(ckpt, code);
};
state_mgr_->GetStatefulCheckpoints(tmaster_location_->topology_name(), ckpt, std::move(cb));
} else {
// Now see if there is already a pplan
FetchPhysicalPlan();
}
}
void TMaster::GetStatefulCheckpointsDone(
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt,
proto::system::StatusCode _code) {
if (_code != proto::system::OK && _code != proto::system::PATH_DOES_NOT_EXIST) {
LOG(FATAL) << "For topology " << tmaster_location_->topology_name()
<< " Getting Stateful Checkpoint failed with error " << _code;
}
if (_code == proto::system::PATH_DOES_NOT_EXIST) {
LOG(INFO) << "For topology " << tmaster_location_->topology_name()
<< " No existing globally consistent checkpoint found "
<< " inserting a empty one";
// We need to set an empty one
auto ckpts = make_shared<proto::ckptmgr::StatefulConsistentCheckpoints>();
auto ckpt = ckpts->add_consistent_checkpoints();
ckpt->set_checkpoint_id("");
ckpt->set_packing_plan_id(packing_plan_->id());
auto cb = [this, ckpts](proto::system::StatusCode code) {
this->SetStatefulCheckpointsDone(code, ckpts);
};
state_mgr_->CreateStatefulCheckpoints(tmaster_location_->topology_name(),
ckpts, std::move(cb));
} else {
LOG(INFO) << "For topology " << tmaster_location_->topology_name()
<< " An existing globally consistent checkpoint found "
<< _ckpt->DebugString();
SetupStatefulController(std::move(_ckpt));
FetchPhysicalPlan();
}
}
void TMaster::SetStatefulCheckpointsDone(proto::system::StatusCode _code,
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt) {
if (_code != proto::system::OK) {
LOG(FATAL) << "For topology " << tmaster_location_->topology_name()
<< " Setting empty Stateful Checkpoint failed with error " << _code;
}
SetupStatefulController(_ckpt);
FetchPhysicalPlan();
}
void TMaster::SetupStatefulController(
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _ckpt) {
sp_int64 stateful_checkpoint_interval =
config::TopologyConfigHelper::GetStatefulCheckpointIntervalSecsWithDefault(*topology_, 300);
CHECK_GT(stateful_checkpoint_interval, 0);
auto cb = [this](const proto::ckptmgr::StatefulConsistentCheckpoints& new_ckpts) {
this->HandleStatefulCheckpointSave(new_ckpts);
};
// Instantiate the stateful restorer/coordinator
stateful_controller_ = make_unique<StatefulController>(topology_->name(), _ckpt, state_mgr_,
start_time_, mMetricsMgrClient, cb);
LOG(INFO) << "Starting timer to checkpoint state every "
<< stateful_checkpoint_interval << " seconds";
CHECK_GT(eventLoop_->registerTimer(
[this](EventLoop::Status) { this->SendCheckpointMarker(); }, true,
stateful_checkpoint_interval * 1000 * 1000),
0);
}
void TMaster::ResetTopologyState(Connection* _conn, const std::string& _dead_stmgr,
int32_t _dead_instance, const std::string& _reason) {
LOG(INFO) << "Got a reset topology request with dead_stmgr " << _dead_stmgr
<< " dead_instance " << _dead_instance << " and reason " << _reason;
if (connection_to_stmgr_id_.find(_conn) == connection_to_stmgr_id_.end()) {
LOG(ERROR) << "The ResetTopology came from an unknown connection";
return;
}
const std::string& stmgr = connection_to_stmgr_id_[_conn];
LOG(INFO) << "The ResetTopology message came from stmgr " << stmgr;
if (stateful_controller_ && absent_stmgrs_.empty()) {
if (!stateful_controller_->RestoreInProgress()) {
LOG(INFO) << "We are not in restore, hence we are starting Restore";
stateful_controller_->StartRestore(stmgrs_, false);
} else if (stateful_controller_->GotRestoreResponse(stmgr)) {
// We are in Restore but we have already gotten response from this
// stmgr. Maybe some other connections dropped. So start afresh
LOG(INFO) << "We are in restore and have already received response from this stmgr";
LOG(INFO) << "Restarting the restore";
stateful_controller_->StartRestore(stmgrs_, false);
} else {
// So we can safely ignore this because the stmgr will later
// send us a RestoreResponse when things get better
LOG(INFO) << "We are in restore and have not yet received response from this stmgr";
}
}
}
void TMaster::FetchPhysicalPlan() {
auto pplan = make_shared<proto::system::PhysicalPlan>();
auto cb = [pplan, this](proto::system::StatusCode code) {
this->GetPhysicalPlanDone(pplan, code);
};
state_mgr_->GetPhysicalPlan(tmaster_location_->topology_name(), pplan, std::move(cb));
}
void TMaster::SendCheckpointMarker() {
if (!absent_stmgrs_.empty()) {
LOG(INFO) << "Not sending checkpoint marker because not all stmgrs have connected to us";
return;
}
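// All stmgrs are connected, so ask the stateful controller to initiate a new
// globally consistent checkpoint.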
stateful_controller_->StartCheckpoint(stmgrs_);
}
void TMaster::HandleInstanceStateStored(const std::string& _checkpoint_id,
const proto::system::Instance& _instance) {
LOG(INFO) << "Got notification from stmgr that we saved checkpoint for task "
<< _instance.info().task_id() << " for checkpoint " << _checkpoint_id;
if (stateful_controller_) {
stateful_controller_->HandleInstanceStateStored(_checkpoint_id, packing_plan_->id(), _instance);
}
}
void TMaster::HandleRestoreTopologyStateResponse(Connection* _conn,
const std::string& _checkpoint_id,
int64_t _restore_txid,
proto::system::StatusCode _status) {
if (connection_to_stmgr_id_.find(_conn) == connection_to_stmgr_id_.end()) {
LOG(ERROR) << "Got HandleRestoreState Response from unknown connection "
<< _conn->getIPAddress() << " : " << _conn->getPort();
return;
}
if (stateful_controller_) {
stateful_controller_->HandleStMgrRestored(connection_to_stmgr_id_[_conn], _checkpoint_id,
_restore_txid, _status, stmgrs_);
}
}
void TMaster::GetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::system::StatusCode _code) {
// Physical plan need not exist. First check if some other error occurred.
if (_code != proto::system::OK && _code != proto::system::PATH_DOES_NOT_EXIST) {
// Something bad happened. Bail out!
// TODO(kramasamy): This is not as bad as it seems. Maybe we can delete this assignment
// and have a new assignment instead.
LOG(ERROR) << "For topology " << tmaster_location_->topology_name()
<< " Error getting assignment. Error code is " << _code << std::endl;
::exit(1);
}
if (_code == proto::system::PATH_DOES_NOT_EXIST) {
LOG(ERROR) << "There was no existing physical plan\n";
// We never did assignment in the first place
} else {
LOG(INFO) << "There was an existing physical plan\n";
CHECK_EQ(_code, proto::system::OK);
current_pplan_ = _pplan;
if (stateful_controller_) {
stateful_controller_->RegisterNewPhysicalPlan(*current_pplan_);
}
}
// Now that we have our state all set up, it's time to start accepting requests
// Port for the stmgrs to connect to
NetworkOptions master_options;
master_options.set_host(myhost_name_);
master_options.set_port(master_port_);
master_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
1_MB);
master_options.set_socket_family(PF_INET);
master_ = make_unique<TMasterServer>(eventLoop_, master_options, metrics_collector_, this);
sp_int32 retval = master_->Start();
if (retval != SP_OK) {
LOG(FATAL) << "Failed to start TMaster Master Server with rcode: " << retval;
}
// Port for the scheduler to connect to
NetworkOptions controller_options;
controller_options.set_host(myhost_name_);
controller_options.set_port(tmaster_controller_port_);
controller_options.set_max_packet_size(
config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterNetworkControllerOptionsMaximumPacketMb() *
1_MB);
controller_options.set_socket_family(PF_INET);
tmaster_controller_ = make_unique<TController>(eventLoop_, controller_options, this);
retval = tmaster_controller_->Start();
if (retval != SP_OK) {
LOG(FATAL) << "Failed to start TMaster Controller Server with rcode: " << retval;
}
// Http port for stat queries
NetworkOptions stats_options;
if (config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterMetricsNetworkBindAllInterfaces()) {
stats_options.set_host("0.0.0.0");
} else {
stats_options.set_host(myhost_name_);
}
stats_options.set_port(stats_port_);
stats_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterNetworkStatsOptionsMaximumPacketMb() *
1_MB);
stats_options.set_socket_family(PF_INET);
stats_ = make_unique<StatsInterface>(eventLoop_, stats_options, metrics_collector_, this);
}
void TMaster::ActivateTopology(VCallback<proto::system::StatusCode> cb) {
CHECK_EQ(current_pplan_->topology().state(), proto::api::PAUSED);
DCHECK(current_pplan_->topology().IsInitialized());
// Set the status
auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->CopyFrom(*current_pplan_);
new_pplan->mutable_topology()->set_state(proto::api::RUNNING);
auto callback = [new_pplan, this, cb](proto::system::StatusCode code) {
cb(code);
this->SetPhysicalPlanDone(new_pplan, code);
};
state_mgr_->SetPhysicalPlan(*new_pplan, std::move(callback));
}
void TMaster::DeActivateTopology(VCallback<proto::system::StatusCode> cb) {
CHECK_EQ(current_pplan_->topology().state(), proto::api::RUNNING);
DCHECK(current_pplan_->topology().IsInitialized());
// Set the status
auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->CopyFrom(*current_pplan_);
new_pplan->mutable_topology()->set_state(proto::api::PAUSED);
auto callback = [new_pplan, this, cb](proto::system::StatusCode code) {
cb(code);
this->SetPhysicalPlanDone(new_pplan, code);
};
state_mgr_->SetPhysicalPlan(*new_pplan, std::move(callback));
}
bool TMaster::UpdateRuntimeConfig(const ComponentConfigMap& _config,
VCallback<proto::system::StatusCode> cb) {
DCHECK(current_pplan_->topology().IsInitialized());
LOG(INFO) << "Update runtime config: ";
LogConfig(_config);
// Parse and set the new configs
auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->CopyFrom(*current_pplan_);
proto::api::Topology* topology = new_pplan->mutable_topology();
if (!UpdateRuntimeConfigInTopology(topology, _config)) {
LOG(ERROR) << "Fail to update runtime config in topology";
return false;
}
auto callback = [new_pplan, this, cb](proto::system::StatusCode code) {
cb(code);
this->SetPhysicalPlanDone(new_pplan, code);
};
state_mgr_->SetPhysicalPlan(*new_pplan, std::move(callback));
return true;
}
void TMaster::CleanAllStatefulCheckpoint() {
ckptmgr_client_->SendCleanStatefulCheckpointRequest("", true);
}
void TMaster::HandleStatefulCheckpointSave(
const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts) {
// broadcast globally consistent checkpoint completion
proto::ckptmgr::StatefulConsistentCheckpointSaved msg;
msg.mutable_consistent_checkpoint()->CopyFrom(new_ckpts.consistent_checkpoints(0));
for (auto & stmgr : stmgrs_) {
stmgr.second->SendCheckpointSavedMessage(msg);
}
// clean oldest checkpoint on save
std::string oldest_ckpt_id =
new_ckpts.consistent_checkpoints(new_ckpts.consistent_checkpoints_size() - 1)
.checkpoint_id();
ckptmgr_client_->SendCleanStatefulCheckpointRequest(oldest_ckpt_id, false);
}
// Called when ckptmgr completes the clean stateful checkpoint request
void TMaster::HandleCleanStatefulCheckpointResponse(proto::system::StatusCode _status) {
tmaster_controller_->HandleCleanStatefulCheckpointResponse(_status);
}
// Update configurations in physical plan.
// Return false if a config doesn't exist, but this shouldn't happen if the config has been
// validated using the ValidateRuntimeConfig() function.
bool TMaster::UpdateRuntimeConfigInTopology(proto::api::Topology* _topology,
const ComponentConfigMap& _config) {
DCHECK(_topology->IsInitialized());
ComponentConfigMap::const_iterator iter;
const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
for (iter = _config.begin(); iter != _config.end(); ++iter) {
// Get config for topology or component.
std::map<std::string, std::string> config;
config::TopologyConfigHelper::ConvertToRuntimeConfigs(iter->second, config);
if (iter->first == topology_key) {
config::TopologyConfigHelper::SetTopologyRuntimeConfig(_topology, config);
} else {
config::TopologyConfigHelper::SetComponentRuntimeConfig(_topology, iter->first, config);
}
}
return true;
}
bool TMaster::ValidateRuntimeConfig(const ComponentConfigMap& _config) const {
return ValidateRuntimeConfigNames(_config);
}
void TMaster::KillContainer(const std::string& host_name,
sp_int32 shell_port, const std::string& stmgr_id) {
LOG(INFO) << "Start killing " << stmgr_id << " on " <<
host_name << ":" << shell_port;
HTTPKeyValuePairs kvs;
kvs.push_back(make_pair("secret", GetTopologyId()));
OutgoingHTTPRequest* request =
new OutgoingHTTPRequest(host_name, shell_port,
"/killexecutor", BaseHTTPRequest::POST, kvs);
auto cb = [host_name, shell_port, stmgr_id](IncomingHTTPResponse* response) {
LOG(INFO) << "Response code of HTTP request of killing " << stmgr_id
<< " on " << host_name << ":" << shell_port << ": "
<< response->response_code();
};
if (http_client_->SendRequest(request, std::move(cb)) != SP_OK) {
LOG(ERROR) << "Failed to kill " << stmgr_id << " on "
<< host_name << ":" << shell_port;
}
LOG(INFO) << "Finish killing " << stmgr_id << " on " <<
host_name << ":" << shell_port;
return;
}
proto::system::Status* TMaster::RegisterStMgr(
const proto::system::StMgr& _stmgr,
const std::vector<shared_ptr<proto::system::Instance>>& _instances,
Connection* _conn, shared_ptr<proto::system::PhysicalPlan>& _pplan) {
const std::string& stmgr_id = _stmgr.id();
LOG(INFO) << "Got a register stream manager request from " << stmgr_id;
// First check if there are any other stream manager present with the same id
if (stmgrs_.find(stmgr_id) != stmgrs_.end()) {
// Some other dude is already present with us.
// First check to see if that other guy has timed out
if (!stmgrs_[stmgr_id]->TimedOut()) {
LOG(ERROR) << "Another stream manager " << stmgr_id << " exists at "
<< stmgrs_[stmgr_id]->get_connection()->getIPAddress() << ":"
<< stmgrs_[stmgr_id]->get_connection()->getPort()
<< " with the same id and it hasn't timed out";
LOG(INFO) << "Potential zombie host exists. Start killing both containers";
std::string zombie_host_name = stmgrs_[stmgr_id]->get_stmgr()->host_name();
sp_int32 zombie_port = stmgrs_[stmgr_id]->get_stmgr()->shell_port();
std::string new_host_name = _stmgr.host_name();
sp_int32 new_port = _stmgr.shell_port();
KillContainer(zombie_host_name, zombie_port, stmgr_id);
KillContainer(new_host_name, new_port, stmgr_id);
proto::system::Status* status = new proto::system::Status();
status->set_status(proto::system::DUPLICATE_STRMGR);
status->set_message("Duplicate StreamManager");
return status;
} else {
// The other guy has timed out
// TODO(kramasamy): Currently, whenever a disconnect happens, we remove it
// from the stmgrs_ list. This means this case will only happen
// if the stmgr maintains its connection but hasn't sent a heartbeat
// in a while.
LOG(ERROR) << "Another stmgr exists at "
<< stmgrs_[stmgr_id]->get_connection()->getIPAddress() << ":"
<< stmgrs_[stmgr_id]->get_connection()->getPort()
<< " with the same id but it has timed out";
stmgrs_[stmgr_id]->UpdateWithNewStMgr(_stmgr, _instances, _conn);
connection_to_stmgr_id_[_conn] = stmgr_id;
}
} else if (absent_stmgrs_.find(stmgr_id) == absent_stmgrs_.end()) {
// This stream manager was not expected (it is not in absent_stmgrs_)
LOG(ERROR) << "We were not expecting stream manager " << stmgr_id;
proto::system::Status* status = new proto::system::Status();
status->set_status(proto::system::INVALID_STMGR);
status->set_message("Invalid StreamManager");
return status;
} else {
// This guy was indeed expected
stmgrs_[stmgr_id] = make_shared<StMgrState>(_conn, _stmgr, _instances, *master_);
connection_to_stmgr_id_[_conn] = stmgr_id;
absent_stmgrs_.erase(stmgr_id);
}
if (absent_stmgrs_.empty()) {
// All stmgrs are ready
if (assignment_in_progress_) {
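// A physical plan computation/write is already underway; flag it to be redone
// with the updated stmgr membership once it finishes.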
do_reassign_ = true;
} else {
assignment_in_progress_ = true;
LOG(INFO) << "All stream managers have connected with us";
auto cb = [this](EventLoop::Status status) { this->DoPhysicalPlan(status); };
CHECK_GE(eventLoop_->registerTimer(std::move(cb), false, 0), 0);
}
}
_pplan = current_pplan_;
proto::system::Status* status = new proto::system::Status();
status->set_status(proto::system::OK);
status->set_message("Welcome StreamManager");
return status;
}
void TMaster::DoPhysicalPlan(EventLoop::Status) {
do_reassign_ = false;
if (!absent_stmgrs_.empty()) {
LOG(INFO) << "Called DoPhysicalPlan when absent_stmgrs_size is " << absent_stmgrs_.size()
<< std::endl;
// Dont do anything.
assignment_in_progress_ = false;
return;
}
// TODO(kramasamy): If current_assignment exists, we need
// to use as many portions from it as possible
shared_ptr<proto::system::PhysicalPlan> pplan = MakePhysicalPlan();
CHECK_NOTNULL(pplan);
DCHECK(pplan->IsInitialized());
if (!ValidateStMgrsWithPackingPlan()) {
// TODO(kramasamy): Do Something better here
LOG(ERROR) << "Packing plan and StMgr mismatch... Dying\n";
::exit(1);
}
auto cb = [pplan, this](proto::system::StatusCode code) {
this->SetPhysicalPlanDone(pplan, code);
};
if (current_pplan_) {
state_mgr_->SetPhysicalPlan(*pplan, std::move(cb));
} else {
// This is the first time a physical plan was made
// TODO(kramasamy): This is probably not the right solution
// because what if do_reassign_ is set to true before
// SetPhysicalPlan returns. Then we will be doing another
// Create which will fail. Fix it later.
state_mgr_->CreatePhysicalPlan(*pplan, std::move(cb));
}
}
void TMaster::SetPhysicalPlanDone(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::system::StatusCode _code) {
if (_code != proto::system::OK) {
LOG(ERROR) << "Error writing assignment to statemgr. Error code is " << _code << std::endl;
::exit(1);
}
LOG(INFO) << "Successfully wrote new assignment to state" << std::endl;
if (do_reassign_) {
// Some other mapping change happened
assignment_in_progress_ = true;
LOG(INFO) << "Doing assignment since physical assignment might have changed" << std::endl;
auto cb = [this](EventLoop::Status status) { this->DoPhysicalPlan(status); };
CHECK_GE(eventLoop_->registerTimer(std::move(cb), false, 0), 0);
} else {
bool first_time_pplan = current_pplan_ == nullptr;
current_pplan_ = _pplan;
assignment_in_progress_ = false;
// We need to pass that on to all stream managers
DistributePhysicalPlan();
if (stateful_controller_) {
stateful_controller_->RegisterNewPhysicalPlan(*current_pplan_);
LOG(INFO) << "Starting Stateful Restore now that all stmgrs have connected";
stateful_controller_->StartRestore(stmgrs_,
first_time_pplan &&
config::TopologyConfigHelper::StatefulTopologyStartClean(current_pplan_->topology()));
}
}
}
bool TMaster::DistributePhysicalPlan() {
if (current_pplan_) {
// Log the new physical plan that will be distributed
LOG(INFO) << "To distribute new physical plan:" << std::endl;
config::PhysicalPlanHelper::LogPhysicalPlan(*current_pplan_);
config::TopologyConfigHelper::LogTopology(current_pplan_->topology());
// Distribute physical plan to all active stmgrs
StMgrMapIter iter;
for (iter = stmgrs_.begin(); iter != stmgrs_.end(); ++iter) {
iter->second->NewPhysicalPlan(*current_pplan_);
}
return true;
}
LOG(ERROR) << "No valid physical plan yet" << std::endl;
return false;
}
std::unique_ptr<proto::tmaster::StmgrsRegistrationSummaryResponse> TMaster::GetStmgrsRegSummary() {
auto response = make_unique<proto::tmaster::StmgrsRegistrationSummaryResponse>();
for (auto it = stmgrs_.begin(); it != stmgrs_.end(); ++it) {
response->add_registered_stmgrs(it->first);
}
for (auto it = absent_stmgrs_.begin(); it != absent_stmgrs_.end(); ++it) {
response->add_absent_stmgrs(*it);
}
return response;
}
shared_ptr<proto::system::PhysicalPlan> TMaster::MakePhysicalPlan() {
// TODO(kramasamy): At some point, we need to talk to our scheduler
// and do this scheduling
if (current_pplan_) {
// There is already an existing assignment. However stmgrs might have
// died and come up on different machines. This means that
// we need to just adjust the stmgrs mapping
// First let's verify that our original pplan and instances
// all match up
CHECK(ValidateStMgrsWithPhysicalPlan(current_pplan_));
auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->mutable_topology()->CopyFrom(current_pplan_->topology());
for (StMgrMapIter iter = stmgrs_.begin(); iter != stmgrs_.end(); ++iter) {
new_pplan->add_stmgrs()->CopyFrom(*(iter->second->get_stmgr()));
}
for (sp_int32 i = 0; i < current_pplan_->instances_size(); ++i) {
new_pplan->add_instances()->CopyFrom(current_pplan_->instances(i));
}
return new_pplan;
}
// TMaster does not really have any control over who does what.
// That has already been decided while launching the jobs.
// TMaster just stitches the info together to pass to everyone
// Build the PhysicalPlan structure
auto new_pplan = make_shared<proto::system::PhysicalPlan>();
new_pplan->mutable_topology()->CopyFrom(*topology_);
// Build the physical assignments
for (StMgrMapIter stmgr_iter = stmgrs_.begin(); stmgr_iter != stmgrs_.end(); ++stmgr_iter) {
new_pplan->add_stmgrs()->CopyFrom(*(stmgr_iter->second->get_stmgr()));
const std::vector<shared_ptr<proto::system::Instance>>& instances =
stmgr_iter->second->get_instances();
for (size_t i = 0; i < instances.size(); ++i) {
new_pplan->add_instances()->CopyFrom(*(instances[i]));
}
}
return new_pplan;
}
proto::system::Status* TMaster::UpdateStMgrHeartbeat(Connection* _conn, sp_int64 _time,
proto::system::StMgrStats* _stats) {
proto::system::Status* retval = new proto::system::Status();
if (connection_to_stmgr_id_.find(_conn) == connection_to_stmgr_id_.end()) {
retval->set_status(proto::system::INVALID_STMGR);
retval->set_message("Unknown connection doing stmgr heartbeat");
return retval;
}
const std::string& stmgr = connection_to_stmgr_id_[_conn];
// TODO(kramasamy): Maybe do more checks?
if (stmgrs_.find(stmgr) == stmgrs_.end()) {
retval->set_status(proto::system::INVALID_STMGR);
retval->set_message("Unknown stream manager id");
return retval;
}
if (stmgrs_[stmgr]->get_connection() != _conn) {
retval->set_status(proto::system::INVALID_STMGR);
retval->set_message("Unknown stream manager connection");
return retval;
}
stmgrs_[stmgr]->heartbeat(_time, _stats);
retval->set_status(proto::system::OK);
return retval;
}
proto::system::StatusCode TMaster::RemoveStMgrConnection(Connection* _conn) {
if (connection_to_stmgr_id_.find(_conn) == connection_to_stmgr_id_.end()) {
return proto::system::INVALID_STMGR;
}
const std::string& stmgr_id = connection_to_stmgr_id_[_conn];
if (stmgrs_.find(stmgr_id) == stmgrs_.end()) {
return proto::system::INVALID_STMGR;
}
auto stmgr = stmgrs_[stmgr_id];
// This guy disconnected from us
LOG(INFO) << "StMgr " << stmgr->get_stmgr()->id() << " disconnected from us" << std::endl;
stmgrs_.erase(stmgr->get_id());
connection_to_stmgr_id_.erase(_conn);
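// Mark the stmgr as absent again so checkpoint markers and physical plan
// computation wait for it to re-register.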
absent_stmgrs_.insert(stmgr->get_id());
return proto::system::OK;
}
////////////////////////////////////////////////////////////////////////////////
// Below are validation functions
////////////////////////////////////////////////////////////////////////////////
bool TMaster::ValidateTopology(const proto::api::Topology& _topology) {
if (tmaster_location_->topology_name() != _topology.name()) {
LOG(ERROR) << "topology name mismatch! Expected topology name is "
<< tmaster_location_->topology_name() << " but found in zk " << _topology.name()
<< std::endl;
return false;
}
if (tmaster_location_->topology_id() != _topology.id()) {
LOG(ERROR) << "topology id mismatch! Expected topology id is "
<< tmaster_location_->topology_id() << " but found in zk " << _topology.id()
<< std::endl;
return false;
}
std::set<std::string> component_names;
for (sp_int32 i = 0; i < _topology.spouts_size(); ++i) {
if (component_names.find(_topology.spouts(i).comp().name()) != component_names.end()) {
LOG(ERROR) << "Component names are not unique " << _topology.spouts(i).comp().name() << "\n";
return false;
}
component_names.insert(_topology.spouts(i).comp().name());
}
for (sp_int32 i = 0; i < _topology.bolts_size(); ++i) {
if (component_names.find(_topology.bolts(i).comp().name()) != component_names.end()) {
LOG(ERROR) << "Component names are not unique " << _topology.bolts(i).comp().name() << "\n";
return false;
}
component_names.insert(_topology.bolts(i).comp().name());
}
return true;
}
bool TMaster::ValidateStMgrsWithPackingPlan() {
// here we check to see if the total number of instances
// across all stmgrs matches the total spout/bolt
// parallelism the packing plan has specified
sp_int32 ntasks = 0;
for (sp_int32 i = 0; i < packing_plan_->container_plans_size(); ++i) {
ntasks += packing_plan_->container_plans(i).instance_plans_size();
}
sp_int32 ninstances = 0;
for (StMgrMapIter iter = stmgrs_.begin(); iter != stmgrs_.end(); ++iter) {
ninstances += iter->second->get_num_instances();
}
return ninstances == ntasks;
}
bool TMaster::ValidateStMgrsWithPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> _pplan) {
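// Group the plan's instances by stmgr id, then check that every connected stmgr's
// reported instances match what the plan assigns to it.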
std::map<std::string, std::vector<proto::system::Instance*> > stmgr_to_instance_map;
for (sp_int32 i = 0; i < _pplan->instances_size(); ++i) {
proto::system::Instance* instance = _pplan->mutable_instances(i);
if (stmgr_to_instance_map.find(instance->stmgr_id()) == stmgr_to_instance_map.end()) {
std::vector<proto::system::Instance*> instances;
instances.push_back(instance);
stmgr_to_instance_map[instance->stmgr_id()] = instances;
} else {
stmgr_to_instance_map[instance->stmgr_id()].push_back(instance);
}
}
for (StMgrMapIter iter = stmgrs_.begin(); iter != stmgrs_.end(); ++iter) {
if (stmgr_to_instance_map.find(iter->first) == stmgr_to_instance_map.end()) {
LOG(ERROR) << "Instances info from " << iter->first
<< " is missing. Bailing out..." << std::endl;
return false;
}
if (!iter->second->VerifyInstances(stmgr_to_instance_map[iter->first])) {
LOG(ERROR) << "Instances verification failed for " << iter->first
<< ". Bailing out..." << std::endl;
return false;
}
}
return true;
}
/**
* Make sure component names exist
*/
bool TMaster::ValidateRuntimeConfigNames(const ComponentConfigMap& _config) const {
LOG(INFO) << "Validating runtime configs.";
const proto::api::Topology& topology = current_pplan_->topology();
DCHECK(topology.IsInitialized());
std::unordered_set<std::string> components;
config::TopologyConfigHelper::GetAllComponentNames(topology, components);
ComponentConfigMap::const_iterator iter;
const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
for (iter = _config.begin(); iter != _config.end(); ++iter) {
if (iter->first != topology_key) {
// It is a component, search for it
if (components.find(iter->first) == components.end()) {
return false;
}
}
}
return true;
}
void TMaster::LogConfig(const ComponentConfigMap& _config) {
for (auto iter = _config.begin(); iter != _config.end(); ++iter) {
LOG(INFO) << iter->first << " =>";
for (auto i = iter->second.begin(); i != iter->second.end(); ++i) {
LOG(INFO) << i->first << " : " << i->second;
}
}
}
} // namespace tmaster
} // namespace heron