/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "manager/stmgr.h"
#include <sys/resource.h>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <list>
#include <map>
#include <string>
#include <vector>
#include <utility>
#include "manager/stmgr-clientmgr.h"
#include "manager/stmgr-server.h"
#include "manager/instance-server.h"
#include "manager/stream-consumers.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "basics/mempool.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
#include "config/helper.h"
#include "statemgr/heron-statemgr.h"
#include "metrics/metrics.h"
#include "metrics/metrics-mgr-st.h"
#include "util/xor-manager.h"
#include "util/neighbour-calculator.h"
#include "manager/stateful-restorer.h"
#include "manager/tmaster-client.h"
#include "util/tuple-cache.h"
#include "manager/ckptmgr-client.h"
namespace heron {
namespace stmgr {
using std::make_shared;
// Stats for the process
const sp_string METRIC_CPU_USER = "__cpu_user_usec";
const sp_string METRIC_CPU_SYSTEM = "__cpu_system_usec";
const sp_string METRIC_UPTIME = "__uptime_sec";
const sp_string METRIC_MEM_USED = "__mem_used_bytes";
const sp_string METRIC_PROCESS = "__process";
const sp_string METRIC_RESTORE_INITIALIZED = "__restore_initiated";
const sp_string RESTORE_DROPPED_STMGR_BYTES = "__stmgr_dropped_bytes";
const sp_string RESTORE_DROPPED_INSTANCE_TUPLES = "__instance_dropped_tuples";
const sp_string METRIC_INSTANCE_BYTES_RECEIVED = "__instance_bytes_received";
const sp_string METRIC_DROPPED_DURING_RESTORE = "__dropped_during_restore";
// Time spent in back pressure because of local instance connections;
// we initiated this backpressure
const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_INIT =
"__server/__time_spent_back_pressure_initiated";
const sp_int64 PROCESS_METRICS_FREQUENCY = 10_s;
const sp_int64 UPTIME_METRIC_FREQUENCY = 1_s;
const sp_int64 TMASTER_RETRY_FREQUENCY = 10_s;
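// Note: the _s and _MB user-defined literals convert to the event loop's base
// units; judging by the rusage math in UpdateProcessMetrics() below
// (tv_sec * 1_s + tv_usec), 1_s is one million, i.e. timers are in microseconds.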
StMgr::StMgr(shared_ptr<EventLoop> eventLoop, const sp_string& _myhost, sp_int32 _data_port,
sp_int32 _local_data_port,
const sp_string& _topology_name, const sp_string& _topology_id,
shared_ptr<proto::api::Topology> _hydrated_topology, const sp_string& _stmgr_id,
const std::vector<sp_string>& _instances, const sp_string& _zkhostport,
const sp_string& _zkroot, sp_int32 _metricsmgr_port, sp_int32 _shell_port,
sp_int32 _ckptmgr_port, const sp_string& _ckptmgr_id,
sp_int64 _high_watermark, sp_int64 _low_watermark,
const sp_string& _metricscachemgr_mode)
: topology_name_(_topology_name),
topology_id_(_topology_id),
stmgr_id_(_stmgr_id),
stmgr_host_(_myhost),
data_port_(_data_port),
local_data_port_(_local_data_port),
instances_(_instances),
eventLoop_(eventLoop),
hydrated_topology_(_hydrated_topology),
start_time_(std::chrono::high_resolution_clock::now()),
zkhostport_(_zkhostport),
zkroot_(_zkroot),
metricsmgr_port_(_metricsmgr_port),
shell_port_(_shell_port),
ckptmgr_port_(_ckptmgr_port),
ckptmgr_id_(_ckptmgr_id),
high_watermark_(_high_watermark),
low_watermark_(_low_watermark),
metricscachemgr_mode_(_metricscachemgr_mode) {}
void StMgr::Init() {
LOG(INFO) << "Init Stmgr" << std::endl;
sp_int32 metrics_export_interval_sec =
config::HeronInternalsConfigReader::Instance()->GetHeronMetricsExportIntervalSec();
__global_protobuf_pool_set_pool_max_number_of_messages__(
heron::config::HeronInternalsConfigReader::Instance()
->GetHeronStreammgrMempoolMaxMessageNumber());
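// Bound the global protobuf message pool up front so that the pooled tuple
// sets acquired/released in the data path below cannot grow without limit.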
state_mgr_ = heron::common::HeronStateMgr::MakeStateMgr(zkhostport_, zkroot_, eventLoop_, false);
metrics_manager_client_ = make_shared<heron::common::MetricsMgrSt>(
metricsmgr_port_, metrics_export_interval_sec, eventLoop_);
stmgr_process_metrics_ = make_shared<heron::common::MultiAssignableMetric>();
metrics_manager_client_->register_metric(METRIC_PROCESS, stmgr_process_metrics_);
restore_initiated_metrics_ = make_shared<heron::common::CountMetric>();
metrics_manager_client_->register_metric(METRIC_RESTORE_INITIALIZED, restore_initiated_metrics_);
dropped_during_restore_metrics_ = make_shared<heron::common::MultiCountMetric>();
metrics_manager_client_->register_metric(METRIC_DROPPED_DURING_RESTORE,
dropped_during_restore_metrics_);
instance_bytes_received_metrics_ = make_shared<heron::common::MultiCountMetric>();
metrics_manager_client_->register_metric(METRIC_INSTANCE_BYTES_RECEIVED,
instance_bytes_received_metrics_);
back_pressure_metric_initiated_ = make_shared<heron::common::TimeSpentMetric>();
metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT,
back_pressure_metric_initiated_);
state_mgr_->SetTMasterLocationWatch(topology_name_, [this]() { this->FetchTMasterLocation(); });
if (0 != metricscachemgr_mode_.compare("disabled")) {
state_mgr_->SetMetricsCacheLocationWatch(
topology_name_, [this]() { this->FetchMetricsCacheLocation(); });
}
reliability_mode_ = heron::config::TopologyConfigHelper::GetReliabilityMode(*hydrated_topology_);
// The ckptmgr-client and stateful-restorer are created and started once the pplan is received
ckptmgr_client_ = nullptr;
stateful_restorer_ = nullptr;
// Create the client manager
clientmgr_ = make_shared<StMgrClientMgr>(eventLoop_, topology_name_, topology_id_, stmgr_id_,
this, metrics_manager_client_, high_watermark_, low_watermark_,
config::TopologyConfigHelper::DropTuplesUponBackpressure(*hydrated_topology_));
// Create and Register Tuple cache
CreateTupleCache();
CHECK_GT(
eventLoop_->registerTimer(
[this](EventLoop::Status status) { this->CheckTMasterLocation(status); }, false,
config::HeronInternalsConfigReader::Instance()->GetCheckTMasterLocationIntervalSec() *
1_s),
0); // fire only once
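// The one-shot timer above acts as a startup watchdog: CheckTMasterLocation()
// aborts the process if no tmaster location has been fetched by the time it fires.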
// Instantiate neighbour calculator. Required by stmgr server
neighbour_calculator_ = make_shared<NeighbourCalculator>();
// Create and start StmgrServer. The actual stmgr server port is assigned here.
StartStmgrServer();
// Create and start InstanceServer
StartInstanceServer();
// FetchTMasterLocation() triggers StMgr::CreateTMasterClient(), and the
// TMasterClient constructor needs the actual stmgr ports, so FetchTMasterLocation()
// has to come after StartStmgrServer() and StartInstanceServer()
FetchTMasterLocation();
if (0 != metricscachemgr_mode_.compare("disabled")) {
FetchMetricsCacheLocation();
}
// Check for log pruning every 5 minutes
CHECK_GT(eventLoop_->registerTimer(
[](EventLoop::Status) { ::heron::common::PruneLogs(); }, true,
config::HeronInternalsConfigReader::Instance()->GetHeronLoggingPruneIntervalSec() *
1_s),
0);
// Check for log flushing every 10 seconds
CHECK_GT(eventLoop_->registerTimer(
[](EventLoop::Status) { ::heron::common::FlushLogs(); }, true,
config::HeronInternalsConfigReader::Instance()->GetHeronLoggingFlushIntervalSec() *
1_s),
0);
// Update uptime metric every 1 second
CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) {
this->UpdateUptimeMetric();
}, true, UPTIME_METRIC_FREQUENCY), 0);
// Update Process related metrics every 10 seconds
CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) {
this->UpdateProcessMetrics(status);
}, true, PROCESS_METRICS_FREQUENCY), 0);
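// Acking is only needed for at-least-once semantics; HandleStreamManagerData()
// uses this flag to skip tuple-set deserialization on the fast path otherwise.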
is_acking_enabled =
reliability_mode_ == config::TopologyConfigVars::TopologyReliabilityMode::ATLEAST_ONCE;
}
StMgr::~StMgr() {
metrics_manager_client_->unregister_metric(METRIC_PROCESS);
metrics_manager_client_->unregister_metric(METRIC_RESTORE_INITIALIZED);
metrics_manager_client_->unregister_metric(METRIC_DROPPED_DURING_RESTORE);
metrics_manager_client_->unregister_metric(METRIC_INSTANCE_BYTES_RECEIVED);
metrics_manager_client_->unregister_metric(METRIC_TIME_SPENT_BACK_PRESSURE_INIT);
CleanupStreamConsumers();
CleanupXorManagers();
}
bool StMgr::DidAnnounceBackPressure() {
return stmgr_server_->DidAnnounceBackPressure()
|| instance_server_->DidAnnounceBackPressure();
}
bool StMgr::DidOthersAnnounceBackPressure() {
return stmgr_server_->DidOthersAnnounceBackPressure();
}
const NetworkOptions& StMgr::GetStmgrServerNetworkOptions() const {
return stmgr_server_->get_serveroptions();
}
const NetworkOptions& StMgr::GetInstanceServerNetworkOptions() const {
return instance_server_->get_serveroptions();
}
void StMgr::CheckTMasterLocation(EventLoop::Status) {
if (!tmaster_client_) {
LOG(FATAL) << "Could not fetch the TMaster location in time. Exiting. ";
}
}
void StMgr::UpdateUptimeMetric() {
auto end_time = std::chrono::high_resolution_clock::now();
auto uptime = std::chrono::duration_cast<std::chrono::seconds>(end_time - start_time_).count();
stmgr_process_metrics_->scope(METRIC_UPTIME)->SetValue(uptime);
}
void StMgr::UpdateProcessMetrics(EventLoop::Status) {
// CPU
struct rusage usage;
ProcessUtils::getResourceUsage(&usage);
stmgr_process_metrics_->scope(METRIC_CPU_USER)
->SetValue((usage.ru_utime.tv_sec * 1_s) + usage.ru_utime.tv_usec);
stmgr_process_metrics_->scope(METRIC_CPU_SYSTEM)
->SetValue((usage.ru_stime.tv_sec * 1_s) + usage.ru_stime.tv_usec);
// Memory
size_t totalmemory = ProcessUtils::getTotalMemoryUsed();
stmgr_process_metrics_->scope(METRIC_MEM_USED)->SetValue(totalmemory);
}
void StMgr::FetchTMasterLocation() {
LOG(INFO) << "Fetching TMaster Location";
auto tmaster = make_shared<proto::tmaster::TMasterLocation>();
auto cb = [tmaster, this](proto::system::StatusCode status) {
this->OnTMasterLocationFetch(tmaster, status);
};
state_mgr_->GetTMasterLocation(topology_name_, tmaster, std::move(cb));
}
void StMgr::FetchMetricsCacheLocation() {
LOG(INFO) << "Fetching MetricsCache Location";
auto metricscache = make_shared<proto::tmaster::MetricsCacheLocation>();
auto cb = [metricscache, this](proto::system::StatusCode status) {
this->OnMetricsCacheLocationFetch(metricscache, status);
};
state_mgr_->GetMetricsCacheLocation(topology_name_, metricscache, std::move(cb));
}
void StMgr::StartStmgrServer() {
CHECK(!stmgr_server_);
LOG(INFO) << "Creating StmgrServer";
NetworkOptions sops;
sops.set_host(IpUtils::getHostName());
sops.set_port(data_port_);
sops.set_socket_family(PF_INET);
sops.set_max_packet_size(
config::HeronInternalsConfigReader::Instance()
->GetHeronStreammgrNetworkOptionsMaximumPacketMb() *
1_MB);
sops.set_high_watermark(high_watermark_);
sops.set_low_watermark(low_watermark_);
stmgr_server_ = make_shared<StMgrServer>(eventLoop_, sops, topology_name_, topology_id_,
stmgr_id_, this, metrics_manager_client_);
// start the server
CHECK_EQ(stmgr_server_->Start(), 0);
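// If data_port_ was 0, the OS picked an ephemeral port during Start(); record
// the actual port, since TMasterClient later registers with it.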
data_port_ = stmgr_server_->get_serveroptions().get_port();
}
void StMgr::StartInstanceServer() {
CHECK(!instance_server_);
LOG(INFO) << "Creating InstanceServer";
NetworkOptions sops;
sops.set_host(IpUtils::getHostName());
sops.set_port(local_data_port_);
sops.set_socket_family(PF_INET);
sops.set_max_packet_size(
config::HeronInternalsConfigReader::Instance()
->GetHeronStreammgrNetworkOptionsMaximumPacketMb() *
1_MB);
sops.set_high_watermark(high_watermark_);
sops.set_low_watermark(low_watermark_);
instance_server_ = make_shared<InstanceServer>(eventLoop_, sops, topology_name_, topology_id_,
stmgr_id_, instances_, this, metrics_manager_client_,
neighbour_calculator_,
config::TopologyConfigHelper::DropTuplesUponBackpressure(*hydrated_topology_));
// start the server
CHECK_EQ(instance_server_->Start(), 0);
local_data_port_ = instance_server_->get_serveroptions().get_port();
// metrics_manager_client_ starts with the actual (possibly OS-assigned) local_data_port_
metrics_manager_client_->Start(stmgr_host_, local_data_port_, "__stmgr__", stmgr_id_);
}
void StMgr::CreateCheckpointMgrClient() {
LOG(INFO) << "Creating CheckpointMgr Client at " << stmgr_host_ << ":" << ckptmgr_port_;
NetworkOptions client_options;
client_options.set_host("127.0.0.1");
client_options.set_port(ckptmgr_port_);
client_options.set_socket_family(PF_INET);
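// Checkpoint messages can carry an instance's entire state, so allow the
// largest representable packet size rather than the usual configured limit.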
client_options.set_max_packet_size(std::numeric_limits<sp_uint32>::max() - 1);
auto save_watcher = std::bind(&StMgr::HandleSavedInstanceState, this,
std::placeholders::_1, std::placeholders::_2);
auto get_watcher = std::bind(&StMgr::HandleGetInstanceState, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4);
auto ckpt_watcher = std::bind(&StMgr::HandleCkptMgrRegistration, this);
ckptmgr_client_ = make_shared<CkptMgrClient>(eventLoop_, client_options,
topology_name_, topology_id_,
ckptmgr_id_, stmgr_id_,
save_watcher, get_watcher, ckpt_watcher);
}
void StMgr::CreateTMasterClient(shared_ptr<proto::tmaster::TMasterLocation> tmasterLocation) {
CHECK(!tmaster_client_);
LOG(INFO) << "Creating Tmaster Client at " << tmasterLocation->host() << ":"
<< tmasterLocation->master_port();
NetworkOptions master_options;
master_options.set_host(tmasterLocation->host());
master_options.set_port(tmasterLocation->master_port());
master_options.set_socket_family(PF_INET);
master_options.set_max_packet_size(
config::HeronInternalsConfigReader::Instance()
->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
1_MB);
master_options.set_high_watermark(high_watermark_);
master_options.set_low_watermark(low_watermark_);
auto pplan_watch = [this](shared_ptr<proto::system::PhysicalPlan> pplan) {
this->NewPhysicalPlan(pplan);
};
auto stateful_checkpoint_watch =
[this](sp_string checkpoint_id) {
this->InitiateStatefulCheckpoint(checkpoint_id);
};
auto restore_topology_watch =
[this](sp_string checkpoint_id, sp_int64 restore_txid) {
this->RestoreTopologyState(checkpoint_id, restore_txid);
};
auto start_stateful_watch =
[this](sp_string checkpoint_id) {
this->StartStatefulProcessing(checkpoint_id);
};
tmaster_client_ = make_shared<TMasterClient>(eventLoop_, master_options, stmgr_id_, stmgr_host_,
data_port_, local_data_port_, shell_port_,
std::move(pplan_watch),
std::move(stateful_checkpoint_watch),
std::move(restore_topology_watch),
std::move(start_stateful_watch));
}
void StMgr::CreateTupleCache() {
CHECK(!tuple_cache_);
LOG(INFO) << "Creating tuple cache ";
sp_uint32 drain_threshold_bytes_ =
config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrCacheDrainSizeMb() * 1_MB;
tuple_cache_ = make_shared<TupleCache>(eventLoop_, drain_threshold_bytes_);
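// Tuples buffered in the cache are drained in batches: regular data through
// DrainInstanceData() and checkpoint markers through DrainDownstreamCheckpoint().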
tuple_cache_->RegisterDrainer(&StMgr::DrainInstanceData, this);
tuple_cache_->RegisterCheckpointDrainer(&StMgr::DrainDownstreamCheckpoint, this);
}
void StMgr::HandleNewTmaster(shared_ptr<proto::tmaster::TMasterLocation> newTmasterLocation) {
// Let's delete the existing tmaster client if we have one.
if (tmaster_client_) {
LOG(INFO) << "Destroying existing tmasterClient";
tmaster_client_->Die();
tmaster_client_ = nullptr;
}
// Create the tmaster and the servers/clients but don't start the tmaster
// connection as yet. We'll do that once we connect to all the instances.
CreateTMasterClient(newTmasterLocation);
// In the case where we are doing a tmaster refresh we may have already
// connected to all of the instances
if (instance_server_ && instance_server_->HaveAllInstancesConnectedToUs()) {
StartTMasterClient();
}
}
void StMgr::BroadcastTmasterLocation(shared_ptr<proto::tmaster::TMasterLocation> tmasterLocation) {
// Notify metrics manager of the tmaster location changes
// TODO(vikasr): What if the refresh fails?
metrics_manager_client_->RefreshTMasterLocation(*tmasterLocation);
}
void StMgr::BroadcastMetricsCacheLocation(
shared_ptr<proto::tmaster::MetricsCacheLocation> tmasterLocation) {
// Notify metrics manager of the metricscache location changes
// TODO(huijun): What if the refresh fails?
LOG(INFO) << "BroadcastMetricsCacheLocation";
metrics_manager_client_->RefreshMetricsCacheLocation(*tmasterLocation);
}
void StMgr::OnTMasterLocationFetch(shared_ptr<proto::tmaster::TMasterLocation> newTmasterLocation,
proto::system::StatusCode _status) {
if (_status != proto::system::OK) {
LOG(INFO) << "TMaster Location Fetch failed with status " << _status;
LOG(INFO) << "Retrying after " << TMASTER_RETRY_FREQUENCY << " micro seconds ";
CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status) {
this->FetchTMasterLocation();
}, false, TMASTER_RETRY_FREQUENCY), 0);
} else {
// We got a new tmaster location.
// Just verify that we are talking to the right entity
if (newTmasterLocation->topology_name() != topology_name_ ||
newTmasterLocation->topology_id() != topology_id_) {
LOG(FATAL) << "Topology name/id mismatch between stmgr and TMaster "
<< "We expected " << topology_name_ << " : " << topology_id_ << " but tmaster had "
<< newTmasterLocation->topology_name() << " : "
<< newTmasterLocation->topology_id();
}
LOG(INFO) << "Fetched TMasterLocation to be " << newTmasterLocation->host() << ":"
<< newTmasterLocation->master_port();
bool isNewTmaster = true;
if (tmaster_client_) {
sp_string currentTmasterHostPort = tmaster_client_->getTmasterHostPort();
std::string newTmasterHostPort =
newTmasterLocation->host() + ":" + std::to_string(newTmasterLocation->master_port());
if (currentTmasterHostPort == newTmasterHostPort) {
LOG(INFO) << "New tmaster location same as the current one. "
<< "Nothing to do here... ";
isNewTmaster = false;
} else {
LOG(INFO) << "New tmaster location different from the current one."
<< " Current one at " << currentTmasterHostPort << " and New one at "
<< newTmasterHostPort;
isNewTmaster = true;
}
}
if (isNewTmaster) {
HandleNewTmaster(newTmasterLocation);
}
// Stmgr doesn't know what other things might have changed, so it is important
// to broadcast the location, even though we know it's the same tmaster.
BroadcastTmasterLocation(newTmasterLocation);
}
}
void StMgr::OnMetricsCacheLocationFetch(
shared_ptr<proto::tmaster::MetricsCacheLocation> newTmasterLocation,
proto::system::StatusCode _status) {
if (_status != proto::system::OK) {
LOG(INFO) << "MetricsCache Location Fetch failed with status " << _status;
LOG(INFO) << "Retrying after " << TMASTER_RETRY_FREQUENCY << " micro seconds ";
CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status) {
this->FetchMetricsCacheLocation();
}, false, TMASTER_RETRY_FREQUENCY), 0);
} else {
// We got a new metricscache location.
// Just verify that we are talking to the right entity
if (newTmasterLocation->topology_name() != topology_name_ ||
newTmasterLocation->topology_id() != topology_id_) {
LOG(FATAL) << "Topology name/id mismatch between stmgr and MetricsCache "
<< "We expected " << topology_name_ << " : " << topology_id_
<< " but MetricsCache had "
<< newTmasterLocation->topology_name() << " : "
<< newTmasterLocation->topology_id() << std::endl;
}
LOG(INFO) << "Fetched MetricsCacheLocation to be " << newTmasterLocation->host() << ":"
<< newTmasterLocation->master_port();
// Stmgr doesn't know what other things might have changed, so it is important
// to broadcast the location, even though we know it's the same metricscache.
BroadcastMetricsCacheLocation(newTmasterLocation);
}
}
// Start the tmaster client
void StMgr::StartTMasterClient() {
if (!tmaster_client_) {
LOG(INFO) << "We haven't received tmaster location yet"
<< ", so tmaster_client_ hasn't been created"
<< "Once we get the location, it will be started";
// Nothing else to do here
} else {
std::vector<proto::system::Instance*> all_instance_info;
instance_server_->GetInstanceInfo(all_instance_info);
tmaster_client_->SetInstanceInfo(all_instance_info);
if (!tmaster_client_->IsConnected()) {
LOG(INFO) << "Connecting to the TMaster as all the instances have connected to us";
tmaster_client_->Start();
}
}
}
void StMgr::NewPhysicalPlan(shared_ptr<proto::system::PhysicalPlan> _pplan) {
LOG(INFO) << "Received a new physical plan from tmaster";
heron::config::TopologyConfigHelper::LogTopology(_pplan->topology());
// first make sure that we are part of the plan ;)
bool found = false;
for (sp_int32 i = 0; i < _pplan->stmgrs_size(); ++i) {
if (_pplan->stmgrs(i).id() == stmgr_id_) {
found = true;
break;
}
}
if (!found) {
LOG(FATAL) << "We have no role in this topology!!";
}
// The Topology structure here is not hydrated.
// We need to hydrate it now.
// It's possible that the topology's state might have changed.
// So we need to handle it separately.
if (!pplan_) {
LOG(INFO) << "This is the first time we received the physical plan";
} else if (_pplan->topology().state() != pplan_->topology().state()) {
LOG(INFO) << "Topology state changed from " << pplan_->topology().state() << " to "
<< _pplan->topology().state();
}
PatchPhysicalPlanWithHydratedTopology(_pplan, *hydrated_topology_);
LOG(INFO) << "Patched with hydrated topology";
heron::config::TopologyConfigHelper::LogTopology(_pplan->topology());
// TODO(vikasr) Currently we don't check if our role has changed
// Build out data structures
std::map<sp_string, std::vector<sp_int32> > component_to_task_ids;
task_id_to_stmgr_.clear();
for (sp_int32 i = 0; i < _pplan->instances_size(); ++i) {
sp_int32 task_id = _pplan->instances(i).info().task_id();
task_id_to_stmgr_[task_id] = _pplan->instances(i).stmgr_id();
const sp_string& component_name = _pplan->instances(i).info().component_name();
if (component_to_task_ids.find(component_name) == component_to_task_ids.end()) {
component_to_task_ids[component_name] = std::vector<sp_int32>();
}
component_to_task_ids[component_name].push_back(task_id);
}
if (!pplan_) {
PopulateStreamConsumers(_pplan->mutable_topology(), component_to_task_ids);
PopulateXorManagers(_pplan->topology(), ExtractTopologyTimeout(_pplan->topology()),
component_to_task_ids);
}
pplan_ = _pplan;
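// Rebuild the up/downstream neighbour sets from the new pplan; they are
// consulted when routing checkpoint markers in HandleStoreInstanceStateCheckpoint().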
neighbour_calculator_->Reconstruct(*pplan_);
// For effectively once topologies, we only start connecting after we have recovered
// from a globally consistent checkpoint. The act of starting connections is initiated
// by the restorer
if (!stateful_restorer_) {
clientmgr_->StartConnections(*pplan_);
}
instance_server_->BroadcastNewPhysicalPlan(*pplan_);
// create ckptmgr-client and stateful-restorer after receiving pplan
if (reliability_mode_ == config::TopologyConfigVars::EFFECTIVELY_ONCE
&& ckptmgr_client_ == nullptr && stateful_restorer_ == nullptr) {
CreateCheckpointMgrClient();
stateful_restorer_ = make_shared<StatefulRestorer>(ckptmgr_client_, clientmgr_,
tuple_cache_, instance_server_, metrics_manager_client_,
std::bind(&StMgr::HandleStatefulRestoreDone, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
ckptmgr_client_->SetPhysicalPlan(*pplan_);
ckptmgr_client_->Start();
}
}
void StMgr::CleanupStreamConsumers() {
stream_consumers_.clear();
}
void StMgr::CleanupXorManagers() {
xor_mgrs_.reset();
}
sp_int32 StMgr::ExtractTopologyTimeout(const proto::api::Topology& _topology) {
for (sp_int32 i = 0; i < _topology.topology_config().kvs_size(); ++i) {
if (_topology.topology_config().kvs(i).key() == "topology.message.timeout.secs") {
return atoi(_topology.topology_config().kvs(i).value().c_str());
}
}
LOG(FATAL) << "topology.message.timeout.secs does not exist";
return 0;
}
void StMgr::PopulateStreamConsumers(
proto::api::Topology* _topology,
const std::map<sp_string, std::vector<sp_int32> >& _component_to_task_ids) {
// First get a map of <component, stream> -> Schema
std::map<std::pair<sp_string, sp_string>, proto::api::StreamSchema*> schema_map;
for (sp_int32 i = 0; i < _topology->spouts_size(); ++i) {
for (sp_int32 j = 0; j < _topology->spouts(i).outputs_size(); ++j) {
proto::api::OutputStream* os = _topology->mutable_spouts(i)->mutable_outputs(j);
std::pair<sp_string, sp_string> p =
make_pair(os->stream().component_name(), os->stream().id());
schema_map[p] = os->mutable_schema();
}
}
for (sp_int32 i = 0; i < _topology->bolts_size(); ++i) {
for (sp_int32 j = 0; j < _topology->bolts(i).outputs_size(); ++j) {
proto::api::OutputStream* os = _topology->mutable_bolts(i)->mutable_outputs(j);
std::pair<sp_string, sp_string> p =
make_pair(os->stream().component_name(), os->stream().id());
schema_map[p] = os->mutable_schema();
}
}
// Only bolts can consume
for (sp_int32 i = 0; i < _topology->bolts_size(); ++i) {
for (sp_int32 j = 0; j < _topology->bolts(i).inputs_size(); ++j) {
const proto::api::InputStream& is = _topology->bolts(i).inputs(j);
std::pair<sp_string, sp_string> p = make_pair(is.stream().component_name(), is.stream().id());
proto::api::StreamSchema* schema = schema_map[p];
const sp_string& component_name = _topology->bolts(i).comp().name();
auto iter = _component_to_task_ids.find(component_name);
CHECK(iter != _component_to_task_ids.end());
const std::vector<sp_int32>& component_task_ids = iter->second;
if (stream_consumers_.find(p) == stream_consumers_.end()) {
stream_consumers_[p] = make_unique<StreamConsumers>(is, *schema, component_task_ids);
} else {
stream_consumers_[p]->NewConsumer(is, *schema, component_task_ids);
}
}
}
}
void StMgr::PopulateXorManagers(
const proto::api::Topology& _topology, sp_int32 _message_timeout,
const std::map<sp_string, std::vector<sp_int32> >& _component_to_task_ids) {
// Only spouts need xor maintenance
// TODO(vikasr) Do only for the spouts that we have.
std::vector<sp_int32> all_spout_tasks;
for (sp_int32 i = 0; i < _topology.spouts_size(); ++i) {
for (sp_int32 j = 0; j < _topology.spouts(i).outputs_size(); ++j) {
const proto::api::OutputStream os = _topology.spouts(i).outputs(j);
const std::vector<sp_int32>& component_task_ids =
_component_to_task_ids.find(os.stream().component_name())->second;
all_spout_tasks.insert(all_spout_tasks.end(), component_task_ids.begin(),
component_task_ids.end());
}
}
xor_mgrs_ = make_shared<XorManager>(eventLoop_, _message_timeout, all_spout_tasks);
}
const shared_ptr<proto::system::PhysicalPlan> StMgr::GetPhysicalPlan() const { return pplan_; }
void StMgr::HandleStreamManagerData(const sp_string&,
pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
if (stateful_restorer_ && stateful_restorer_->InProgress()) {
LOG(INFO) << "Dropping data received from stmgr because we are in Restore";
dropped_during_restore_metrics_->scope(RESTORE_DROPPED_STMGR_BYTES)
->incr_by(_message->set().size());
return;
}
// We received message from another stream manager
sp_int32 _task_id = _message->task_id();
// We have a shortcut for non-acking case
if (!is_acking_enabled) {
instance_server_->SendToInstance2(std::move(_message));
} else {
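// Acking path: deserialize into a pooled message; SendInBound() either
// forwards it or releases it back to the global protobuf pool.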
proto::system::HeronTupleSet2* tuple_set = nullptr;
tuple_set = __global_protobuf_pool_acquire__(tuple_set);
tuple_set->ParsePartialFromString(_message->set());
SendInBound(_task_id, tuple_set);
}
}
void StMgr::SendInBound(sp_int32 _task_id, proto::system::HeronTupleSet2* _message) {
if (_message->has_data()) {
instance_server_->SendToInstance2(_task_id, _message);
}
if (_message->has_control()) {
// We got a bunch of acks/fails
ProcessAcksAndFails(_message->src_task_id(), _task_id, _message->control());
__global_protobuf_pool_release__(_message);
}
}
void StMgr::ProcessAcksAndFails(sp_int32 _src_task_id, sp_int32 _task_id,
const proto::system::HeronControlTupleSet& _control) {
proto::system::HeronTupleSet2* current_control_tuple_set = nullptr;
current_control_tuple_set = __global_protobuf_pool_acquire__(current_control_tuple_set);
current_control_tuple_set->set_src_task_id(_src_task_id);
// First go over emits. This makes sure that new emits keep
// a tuple tree alive before we process its acks
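// anchor() XORs the tuple key into the root's entry and reports whether the
// entry became zero (tuple tree complete). A fresh emit must never complete
// the tree, hence the CHECK(!...) below; completion is detected in the acks
// loop that follows.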
for (sp_int32 i = 0; i < _control.emits_size(); ++i) {
const proto::system::AckTuple& ack_tuple = _control.emits(i);
for (sp_int32 j = 0; j < ack_tuple.roots_size(); ++j) {
CHECK_EQ(_task_id, ack_tuple.roots(j).taskid());
CHECK(!xor_mgrs_->anchor(_task_id, ack_tuple.roots(j).key(), ack_tuple.ackedtuple()));
}
}
// Then go over acks
for (sp_int32 i = 0; i < _control.acks_size(); ++i) {
const proto::system::AckTuple& ack_tuple = _control.acks(i);
for (sp_int32 j = 0; j < ack_tuple.roots_size(); ++j) {
CHECK_EQ(_task_id, ack_tuple.roots(j).taskid());
if (xor_mgrs_->anchor(_task_id, ack_tuple.roots(j).key(), ack_tuple.ackedtuple())) {
// This tuple tree is all over
proto::system::AckTuple* a;
a = current_control_tuple_set->mutable_control()->add_acks();
proto::system::RootId* r = a->add_roots();
r->set_key(ack_tuple.roots(j).key());
r->set_taskid(_task_id);
a->set_ackedtuple(0); // this is ignored
CHECK(xor_mgrs_->remove(_task_id, ack_tuple.roots(j).key()));
}
}
}
// Now go over the fails
for (sp_int32 i = 0; i < _control.fails_size(); ++i) {
const proto::system::AckTuple& fail_tuple = _control.fails(i);
for (sp_int32 j = 0; j < fail_tuple.roots_size(); ++j) {
CHECK_EQ(_task_id, fail_tuple.roots(j).taskid());
if (xor_mgrs_->remove(_task_id, fail_tuple.roots(j).key())) {
// This tuple tree is failed
proto::system::AckTuple* f;
f = current_control_tuple_set->mutable_control()->add_fails();
proto::system::RootId* r = f->add_roots();
r->set_key(fail_tuple.roots(j).key());
r->set_taskid(_task_id);
f->set_ackedtuple(0); // this is ignored
}
}
}
// Check if we need to send this out
if (current_control_tuple_set->has_control()) {
instance_server_->SendToInstance2(_task_id, current_control_tuple_set);
} else {
__global_protobuf_pool_release__(current_control_tuple_set);
}
}
// Called when local tasks generate data
void StMgr::HandleInstanceData(const sp_int32 _src_task_id, bool _local_spout,
pool_unique_ptr<proto::system::HeronTupleSet> _message) {
instance_bytes_received_metrics_->scope(std::to_string(_src_task_id))
->incr_by(_message->ByteSize());
if (stateful_restorer_ && stateful_restorer_->InProgress()) {
LOG(INFO) << "Dropping data received from instance " << _src_task_id
<< " because we are in Restore";
dropped_during_restore_metrics_->scope(RESTORE_DROPPED_INSTANCE_TUPLES)
->incr_by(_message->data().tuples_size());
return;
}
// Note:- Process data before control
// This is to make sure that anchored emits are sent out
// before any acks/fails
if (_message->has_data()) {
proto::system::HeronDataTupleSet* d = _message->mutable_data();
std::pair<sp_string, sp_string> stream =
make_pair(d->stream().component_name(), d->stream().id());
auto s = stream_consumers_.find(stream);
if (s != stream_consumers_.end()) {
StreamConsumers& s_consumer = *(s->second);
for (sp_int32 i = 0; i < d->tuples_size(); ++i) {
proto::system::HeronDataTuple* _tuple = d->mutable_tuples(i);
// just to make sure that instances do not set any key
CHECK_EQ(_tuple->key(), 0);
out_tasks_.clear();
s_consumer.GetListToSend(*_tuple, out_tasks_);
// In addition to out_tasks_, the instance might have asked
// us to send the tuple to some more tasks
for (sp_int32 j = 0; j < _tuple->dest_task_ids_size(); ++j) {
out_tasks_.push_back(_tuple->dest_task_ids(j));
}
if (out_tasks_.empty()) {
LOG(ERROR) << "Nobody to send the tuple to";
}
// TODO(vikasr) Do a fast path that does not involve copying
CopyDataOutBound(_src_task_id, _local_spout, d->stream(), _tuple, out_tasks_);
}
} else {
LOG(ERROR) << "Nobody consumes stream " << stream.second << " from component "
<< stream.first;
}
}
if (_message->has_control()) {
proto::system::HeronControlTupleSet* c = _message->mutable_control();
CHECK_EQ(c->emits_size(), 0);
for (sp_int32 i = 0; i < c->acks_size(); ++i) {
CopyControlOutBound(_src_task_id, c->acks(i), false);
}
for (sp_int32 i = 0; i < c->fails_size(); ++i) {
CopyControlOutBound(_src_task_id, c->fails(i), true);
}
}
}
// Called to drain cached instance data
void StMgr::DrainInstanceData(sp_int32 _task_id, proto::system::HeronTupleSet2* _tuple) {
const sp_string& dest_stmgr_id = task_id_to_stmgr_[_task_id];
if (dest_stmgr_id == stmgr_id_) {
// Our own loopback
SendInBound(_task_id, _tuple);
} else {
bool dropped = !(clientmgr_->SendTupleStreamMessage(_task_id, dest_stmgr_id, *_tuple));
if (dropped && stateful_restorer_ && !stateful_restorer_->InProgress()) {
LOG(INFO) << "We dropped some messages because we are not yet connected with stmgr "
<< dest_stmgr_id << " and we are not in restore. Hence sending Reset "
<< "message to TMaster";
tmaster_client_->SendResetTopologyState("", _task_id, "Dropped Instance Tuples");
restore_initiated_metrics_->incr();
}
__global_protobuf_pool_release__(_tuple);
}
}
void StMgr::CopyControlOutBound(sp_int32 _src_task_id,
const proto::system::AckTuple& _control, bool _is_fail) {
for (sp_int32 i = 0; i < _control.roots_size(); ++i) {
proto::system::AckTuple t;
t.add_roots()->CopyFrom(_control.roots(i));
t.set_ackedtuple(_control.ackedtuple());
if (!_is_fail) {
tuple_cache_->add_ack_tuple(_src_task_id, _control.roots(i).taskid(), t);
} else {
tuple_cache_->add_fail_tuple(_src_task_id, _control.roots(i).taskid(), t);
}
}
}
void StMgr::CopyDataOutBound(sp_int32 _src_task_id, bool _local_spout,
const proto::api::StreamId& _streamid,
proto::system::HeronDataTuple* _tuple,
const std::vector<sp_int32>& _out_tasks) {
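// For a local spout the first destination creates the XOR entry for the root
// and every further destination XORs the same tuple key in again, so each copy
// must be acked before the tree can complete.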
bool first_iteration = true;
for (auto& i : _out_tasks) {
sp_int64 tuple_key = tuple_cache_->add_data_tuple(_src_task_id, i, _streamid, _tuple);
if (_tuple->roots_size() > 0) {
// Anchored tuple
if (_local_spout) {
// This is a local spout. We need to maintain xors
CHECK_EQ(_tuple->roots_size(), 1);
if (first_iteration) {
xor_mgrs_->create(_src_task_id, _tuple->roots(0).key(), tuple_key);
} else {
CHECK(!xor_mgrs_->anchor(_src_task_id, _tuple->roots(0).key(), tuple_key));
}
} else {
// Anchored emits from local bolt
for (sp_int32 j = 0; j < _tuple->roots_size(); ++j) {
proto::system::AckTuple t;
t.add_roots()->CopyFrom(_tuple->roots(j));
t.set_ackedtuple(tuple_key);
tuple_cache_->add_emit_tuple(_src_task_id, _tuple->roots(j).taskid(), t);
}
}
}
first_iteration = false;
}
}
void StMgr::StartBackPressureOnServer(const sp_string& _other_stmgr_id) {
// Ask the StMgrServer to stop consuming. The client does
// not consume anything
stmgr_server_->StartBackPressureClientCb(_other_stmgr_id);
}
void StMgr::StartBackPressureOnSpouts() {
instance_server_->StartBackPressureOnSpouts();
}
void StMgr::StopBackPressureOnServer(const sp_string& _other_stmgr_id) {
// Call the StMgrServers removeBackPressure method
stmgr_server_->StopBackPressureClientCb(_other_stmgr_id);
}
void StMgr::AttemptStopBackPressureFromSpouts() {
instance_server_->AttemptStopBackPressureFromSpouts();
}
void StMgr::SendStartBackPressureToOtherStMgrs() {
clientmgr_->SendStartBackPressureToOtherStMgrs();
back_pressure_metric_initiated_->Start();
}
void StMgr::SendStopBackPressureToOtherStMgrs() {
clientmgr_->SendStopBackPressureToOtherStMgrs();
back_pressure_metric_initiated_->Stop();
}
// Do any actions if a stmgr client connection dies
void StMgr::HandleDeadStMgrConnection(const sp_string& _stmgr_id) {
// If we are stateful topology, we need to send a resetTopology message
// in case we are not in 2pc
if (stateful_restorer_) {
if (!stateful_restorer_->InProgress() && tmaster_client_) {
LOG(INFO) << "We lost connection with stmgr " << _stmgr_id
<< " and hence sending ResetTopology message to tmaster";
tmaster_client_->SendResetTopologyState(_stmgr_id, -1, "Dead Stmgr");
restore_initiated_metrics_->incr();
} else {
// We are in restore
stateful_restorer_->HandleDeadStMgrConnection();
}
}
}
void StMgr::HandleAllStMgrClientsRegistered() {
// If we are stateful topology, we might want to continue our restore process
if (stateful_restorer_) {
stateful_restorer_->HandleAllStMgrClientsConnected();
}
}
void StMgr::HandleAllInstancesConnected() {
// StmgrServer told us that all instances are connected to us
if (stateful_restorer_) {
if (stateful_restorer_->InProgress()) {
// We are in the middle of a restore
stateful_restorer_->HandleAllInstancesConnected();
} else if (tmaster_client_ && tmaster_client_->IsConnected()) {
LOG(INFO) << "We are already connected to tmaster(which means we are not in"
<< " initial startup), and we are not in the middle of restore."
<< " This means that while running normally, some instances"
<< " got reconnected to us and thus we might have lost some tuples in middle"
<< " We must reset the topology";
tmaster_client_->SendResetTopologyState("", -1, "All Instances connected");
restore_initiated_metrics_->incr();
} else {
// This is the first time we came up when we haven't even connected to tmaster
// Now that all instances are connected to us, we should connect to tmaster
StartTMasterClient();
}
} else {
// Now we can connect to the tmaster
StartTMasterClient();
}
}
void StMgr::HandleDeadInstance(sp_int32 _task_id) {
if (stateful_restorer_) {
if (stateful_restorer_->InProgress()) {
stateful_restorer_->HandleDeadInstanceConnection(_task_id);
} else {
LOG(INFO) << "An instance " << _task_id << " died while we are not "
<< "in restore. Sending ResetMessage to tmaster";
tmaster_client_->SendResetTopologyState("", _task_id, "Dead Instance");
restore_initiated_metrics_->incr();
}
}
}
// Invoked by the CheckpointMgr Client when it gets registered to
// the ckptmgr.
void StMgr::HandleCkptMgrRegistration() {
if (stateful_restorer_) {
stateful_restorer_->HandleCkptMgrRestart();
}
}
void StMgr::InitiateStatefulCheckpoint(sp_string _checkpoint_id) {
// Initiate the process of stateful checkpointing by sending a request
// to checkpoint to all local spouts
instance_server_->InitiateStatefulCheckpoint(_checkpoint_id);
}
// We just received a InstanceStateCheckpoint message from one of our instances
// We need to propagate it to all downstream tasks
// We also need to send the checkpoint to ckptmgr
void StMgr::HandleStoreInstanceStateCheckpoint(
const proto::ckptmgr::InstanceStateCheckpoint& _message,
const proto::system::Instance& _instance) {
CHECK(stateful_restorer_);
int32_t task_id = _instance.info().task_id();
LOG(INFO) << "Got a checkpoint state message from " << task_id
<< " for checkpoint " << _message.checkpoint_id();
if (stateful_restorer_->InProgress()) {
LOG(INFO) << "Ignoring the message because we are in progress";
return;
}
std::unordered_set<sp_int32> downstream_receivers =
neighbour_calculator_->get_downstreamers(task_id);
for (auto downstream_receiver : downstream_receivers) {
LOG(INFO) << "Adding a DownstreamCheckpointMessage triplet "
<< _message.checkpoint_id() << " "
<< task_id << " " << downstream_receiver;
proto::ckptmgr::DownstreamStatefulCheckpoint* message =
new proto::ckptmgr::DownstreamStatefulCheckpoint();
message->set_origin_task_id(task_id);
message->set_destination_task_id(downstream_receiver);
message->set_checkpoint_id(_message.checkpoint_id());
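// The tuple cache takes ownership of this raw message; it is handed back in
// DrainDownstreamCheckpoint(), which frees it on loopback or forwards it.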
tuple_cache_->add_checkpoint_tuple(downstream_receiver, message);
}
// save the checkpoint
auto message = make_unique<proto::ckptmgr::SaveInstanceStateRequest>();
message->mutable_instance()->CopyFrom(_instance);
message->mutable_checkpoint()->CopyFrom(_message);
ckptmgr_client_->SaveInstanceState(std::move(message));
}
// Invoked by CheckpointMgr Client when it finds out that the ckptmgr
// saved the state of an instance
void StMgr::HandleSavedInstanceState(const proto::system::Instance& _instance,
const std::string& _checkpoint_id) {
LOG(INFO) << "Got notification from ckptmgr that we saved instance state for task "
<< _instance.info().task_id() << " for checkpoint "
<< _checkpoint_id;
tmaster_client_->SavedInstanceState(_instance, _checkpoint_id);
}
// Invoked by CheckpointMgr Client when it retrieves the state of an instance
void StMgr::HandleGetInstanceState(proto::system::StatusCode _status, sp_int32 _task_id,
sp_string _checkpoint_id,
const proto::ckptmgr::InstanceStateCheckpoint& _msg) {
if (stateful_restorer_) {
stateful_restorer_->HandleCheckpointState(_status, _task_id, _checkpoint_id, _msg);
}
}
// Send checkpoint message to this task_id
void StMgr::DrainDownstreamCheckpoint(sp_int32 _task_id,
proto::ckptmgr::DownstreamStatefulCheckpoint* _message) {
sp_string stmgr = task_id_to_stmgr_[_task_id];
if (stmgr == stmgr_id_) {
HandleDownStreamStatefulCheckpoint(*_message);
delete _message;
} else {
clientmgr_->SendDownstreamStatefulCheckpoint(stmgr, _message);
}
}
void StMgr::HandleDownStreamStatefulCheckpoint(
const proto::ckptmgr::DownstreamStatefulCheckpoint& _message) {
instance_server_->HandleCheckpointMarker(_message.origin_task_id(),
_message.destination_task_id(),
_message.checkpoint_id());
}
// Called by TmasterClient when it receives directive from tmaster
// to restore the topology to _checkpoint_id checkpoint
void StMgr::RestoreTopologyState(sp_string _checkpoint_id, sp_int64 _restore_txid) {
LOG(INFO) << "Got a Restore Topology State message from Tmaster for checkpoint "
<< _checkpoint_id << " and txid " << _restore_txid;
CHECK(stateful_restorer_);
// Start the restore process
std::unordered_set<sp_int32> local_taskids;
config::PhysicalPlanHelper::GetTasks(*pplan_, stmgr_id_, local_taskids);
stateful_restorer_->StartRestore(_checkpoint_id, _restore_txid, local_taskids, *pplan_);
}
// Called by TmasterClient when it receives directive from tmaster
// to start processing after having previously recovered the state at _checkpoint_id
void StMgr::StartStatefulProcessing(sp_string _checkpoint_id) {
LOG(INFO) << "Received StartProcessing message from tmaster for "
<< _checkpoint_id;
CHECK(stateful_restorer_);
if (stateful_restorer_->InProgress()) {
LOG(FATAL) << "StartProcessing received from Tmaster for "
<< _checkpoint_id << " when we are still in Restore";
}
instance_server_->SendStartInstanceStatefulProcessing(_checkpoint_id);
}
void StMgr::HandleRestoreInstanceStateResponse(sp_int32 _task_id,
const proto::system::Status& _status,
const std::string& _checkpoint_id) {
// If we are stateful topology, we might want to see how the restore went
// and if it was successful and all other local instances have recovered
// send back a success response to tmaster saying that we have recovered
CHECK(stateful_restorer_);
stateful_restorer_->HandleInstanceRestoredState(_task_id, _status.status(), _checkpoint_id);
}
// Called after we have recovered our state (either successfully or unsuccessfully).
// We need to let our tmaster know
void StMgr::HandleStatefulRestoreDone(proto::system::StatusCode _status,
std::string _checkpoint_id, sp_int64 _restore_txid) {
tmaster_client_->SendRestoreTopologyStateResponse(_status, _checkpoint_id, _restore_txid);
}
// Patch new physical plan with internal hydrated topology but keep new topology data:
// - new topology state
// - new topology/component config
void StMgr::PatchPhysicalPlanWithHydratedTopology(shared_ptr<proto::system::PhysicalPlan> _pplan,
proto::api::Topology const& _topology) {
// Back up new topology data (state and configs)
proto::api::TopologyState st = _pplan->topology().state();
std::map<std::string, std::string> topology_config;
config::TopologyConfigHelper::GetTopologyRuntimeConfig(_pplan->topology(), topology_config);
std::unordered_set<std::string> components;
std::map<std::string, std::map<std::string, std::string>> component_config;
config::TopologyConfigHelper::GetAllComponentNames(_pplan->topology(), components);
for (auto iter = components.begin(); iter != components.end(); ++iter) {
std::map<std::string, std::string> config;
config::TopologyConfigHelper::GetComponentRuntimeConfig(_pplan->topology(), *iter, config);
component_config[*iter] = config;
}
// Copy hydrated topology into pplan
_pplan->clear_topology();
_pplan->mutable_topology()->CopyFrom(_topology);
// Restore new topology data
_pplan->mutable_topology()->set_state(st);
config::TopologyConfigHelper::SetTopologyRuntimeConfig(_pplan->mutable_topology(),
topology_config);
for (auto iter = components.begin(); iter != components.end(); ++iter) {
config::TopologyConfigHelper::SetComponentRuntimeConfig(_pplan->mutable_topology(), *iter,
component_config[*iter]);
}
}
} // namespace stmgr
} // namespace heron