blob: caa2882e49b4153dad13a136efd57c2e7f06b7b4 [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config/topology-config-helper.h"
#include <map>
#include <set>
#include <string>
#include "basics/basics.h"
#include "config/operational-config-vars.h"
#include "config/topology-config-vars.h"
#include "errors/errors.h"
#include "network/network.h"
#include "proto/messages.h"
#include "threads/threads.h"
namespace heron {
namespace config {
static const char TOPOLOGY_CONFIG_KEY[] = "_topology_";
static const char RUNTIME_CONFIG_POSTFIX[] = ":runtime";
TopologyConfigVars::TopologyReliabilityMode StringToReliabilityMode(const std::string& _mode) {
if (_mode == "ATMOST_ONCE") {
return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE;
} else if (_mode == "ATLEAST_ONCE") {
return TopologyConfigVars::TopologyReliabilityMode::ATLEAST_ONCE;
} else if (_mode == "EFFECTIVELY_ONCE") {
return TopologyConfigVars::TopologyReliabilityMode::EFFECTIVELY_ONCE;
} else {
LOG(FATAL) << "Unknown Topology Reliability Mode " << _mode;
return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE;
}
}
TopologyConfigVars::TopologyReliabilityMode
TopologyConfigHelper::GetReliabilityMode(const proto::api::Topology& _topology) {
static const std::string value_true_ = "true";
if (_topology.has_topology_config()) {
const proto::api::Config& cfg = _topology.topology_config();
// First search for reliabiliy mode
for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
if (cfg.kvs(i).key() == TopologyConfigVars::TOPOLOGY_RELIABILITY_MODE) {
return StringToReliabilityMode(cfg.kvs(i).value());
}
}
// Nothing was found wrt reliability mode.
// The following is strictly for backwards compat
for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
if (cfg.kvs(i).key() == TopologyConfigVars::TOPOLOGY_ENABLE_ACKING) {
if (value_true_.compare(cfg.kvs(i).value().c_str()) == 0) {
return TopologyConfigVars::TopologyReliabilityMode::ATLEAST_ONCE;
} else {
return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE;
}
}
}
}
return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE;
}
bool TopologyConfigHelper::EnableMessageTimeouts(const proto::api::Topology& _topology) {
static const std::string value_true_ = "true";
if (_topology.has_topology_config()) {
const proto::api::Config& cfg = _topology.topology_config();
for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
if (cfg.kvs(i).key() == TopologyConfigVars::TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS) {
return value_true_.compare(cfg.kvs(i).value().c_str()) == 0;
}
}
}
return false;
}
sp_int32 TopologyConfigHelper::GetNumStMgrs(const proto::api::Topology& _topology) {
if (_topology.has_topology_config()) {
const proto::api::Config& cfg = _topology.topology_config();
for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
if (cfg.kvs(i).key() == TopologyConfigVars::TOPOLOGY_STMGRS) {
return atoi(cfg.kvs(i).value().c_str());
}
}
}
LOG(ERROR) << "No TOPOLOGY_STMGRS defined" << std::endl;
::exit(1);
return -1; // keep compiler happy
}
sp_int32 TopologyConfigHelper::GetComponentParallelism(const proto::api::Config& _config) {
std::string parallelism = GetConfigValue(_config,
TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM, "");
if (!parallelism.empty()) {
return atoi(parallelism.c_str());
}
CHECK(false) << "Topology config - no component parallelism hints";
return -1; // keep compiler happy
}
sp_int32 TopologyConfigHelper::GetComponentParallelism(
const proto::api::Topology& _topology, const std::string& _component) {
std::string parallelism = GetComponentConfigValue(_topology, _component,
TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM, "");
if (!parallelism.empty()) {
return atoi(parallelism.c_str());
}
CHECK(false) << "Topology config - no component parallelism hints";
return -1; // keep compiler happy
}
sp_int32 TopologyConfigHelper::GetTotalParallelism(const proto::api::Topology& _topology) {
sp_int32 nworkers = 0;
for (int i = 0; i < _topology.spouts_size(); ++i) {
const proto::api::Config& spout_config = _topology.spouts(i).comp().config();
nworkers += GetComponentParallelism(spout_config);
}
for (int i = 0; i < _topology.bolts_size(); ++i) {
const proto::api::Config& bolt_config = _topology.bolts(i).comp().config();
nworkers += GetComponentParallelism(bolt_config);
}
return nworkers;
}
void TopologyConfigHelper::GetComponentParallelismMap(const proto::api::Topology& _topology,
std::map<std::string, sp_int32>& pmap) {
for (int i = 0; i < _topology.spouts_size(); ++i) {
const proto::api::Config& spout_config = _topology.spouts(i).comp().config();
pmap[_topology.spouts(i).comp().name()] = GetComponentParallelism(spout_config);
}
for (int i = 0; i < _topology.bolts_size(); ++i) {
const proto::api::Config& bolt_config = _topology.bolts(i).comp().config();
pmap[_topology.bolts(i).comp().name()] = GetComponentParallelism(bolt_config);
}
}
void TopologyConfigHelper::SetComponentParallelism(proto::api::Config* _config,
sp_int32 _parallelism) {
proto::api::Config::KeyValue* kv = _config->add_kvs();
kv->set_key(TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM);
kv->set_value(std::to_string(_parallelism));
}
sp_string TopologyConfigHelper::GetWorkerChildOpts(const proto::api::Topology& _topology) {
const proto::api::Config& cfg = _topology.topology_config();
for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
if (cfg.kvs(i).key() == TopologyConfigVars::TOPOLOGY_WORKER_CHILDOPTS) {
return cfg.kvs(i).value();
}
}
return "";
}
sp_string TopologyConfigHelper::GetTopologyReleaseOverrides(const proto::api::Topology& _topology) {
const proto::api::Config& cfg = _topology.topology_config();
for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
if (cfg.kvs(i).key() == OperationalConfigVars::TOPOLOGY_RELEASE_OVERRIDES) {
return cfg.kvs(i).value();
}
}
return "";
}
bool TopologyConfigHelper::IsTopologySane(const proto::api::Topology& _topology) {
std::set<std::string> component_names;
for (sp_int32 i = 0; i < _topology.spouts_size(); ++i) {
if (component_names.find(_topology.spouts(i).comp().name()) != component_names.end()) {
LOG(ERROR) << "Component names are not unique " << _topology.spouts(i).comp().name() << "\n";
return false;
}
component_names.insert(_topology.spouts(i).comp().name());
}
for (sp_int32 i = 0; i < _topology.bolts_size(); ++i) {
if (component_names.find(_topology.bolts(i).comp().name()) != component_names.end()) {
LOG(ERROR) << "Component names are not unique " << _topology.bolts(i).comp().name() << "\n";
return false;
}
component_names.insert(_topology.bolts(i).comp().name());
}
sp_int32 total_parallelism = GetTotalParallelism(_topology);
if (GetNumStMgrs(_topology) > total_parallelism) {
LOG(ERROR) << "Number of stmgrs are greater than "
<< "total parallelism"
<< "\n";
return false;
}
return true;
}
proto::api::Topology* TopologyConfigHelper::StripComponentObjects(
const proto::api::Topology& _topology) {
proto::api::Topology* ret = new proto::api::Topology();
ret->CopyFrom(_topology);
for (sp_int32 i = 0; i < ret->spouts_size(); ++i) {
if (ret->mutable_spouts(i)->mutable_comp()->has_serialized_object()) {
ret->mutable_spouts(i)->mutable_comp()->clear_serialized_object();
}
}
for (sp_int32 i = 0; i < ret->bolts_size(); ++i) {
if (ret->mutable_bolts(i)->mutable_comp()->has_serialized_object()) {
ret->mutable_bolts(i)->mutable_comp()->clear_serialized_object();
}
}
return ret;
}
sp_double64 TopologyConfigHelper::GetContainerCpuRequested(const proto::api::Topology& _topology) {
const proto::api::Config& cfg = _topology.topology_config();
for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
if (cfg.kvs(i).key() == TopologyConfigVars::TOPOLOGY_CONTAINER_CPU_REQUESTED) {
return atof(cfg.kvs(i).value().c_str());
}
}
// Hmmm.. There was no value specified. The default is to allocate one cpu
// per component on a stmgr
sp_int32 total_parallelism = TopologyConfigHelper::GetTotalParallelism(_topology);
sp_int32 nstmgrs = TopologyConfigHelper::GetNumStMgrs(_topology);
return (total_parallelism / nstmgrs) + (total_parallelism % nstmgrs);
}
sp_int64 TopologyConfigHelper::GetContainerRamRequested(const proto::api::Topology& _topology) {
const proto::api::Config& cfg = _topology.topology_config();
for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
if (cfg.kvs(i).key() == TopologyConfigVars::TOPOLOGY_CONTAINER_RAM_REQUESTED) {
return atol(cfg.kvs(i).value().c_str());
}
}
// Hmmm.. There was no value specified. The default is to allocate 1G
// per component on a stmgr
sp_int32 total_parallelism = TopologyConfigHelper::GetTotalParallelism(_topology);
sp_int32 nstmgrs = TopologyConfigHelper::GetNumStMgrs(_topology);
sp_int64 max_components_per_container =
(total_parallelism / nstmgrs) + (total_parallelism % nstmgrs);
return max_components_per_container * 1073741824l;
}
void TopologyConfigHelper::GetComponentStreams(const proto::api::Topology& _topology,
const std::string& _component,
std::unordered_set<std::string>& retval) {
for (auto spout : _topology.spouts()) {
if (spout.comp().name() == _component) {
for (auto output : spout.outputs()) {
retval.insert(output.stream().id());
}
}
}
for (auto bolt : _topology.bolts()) {
if (bolt.comp().name() == _component) {
for (auto output : bolt.outputs()) {
retval.insert(output.stream().id());
}
}
}
}
proto::api::StreamSchema*
TopologyConfigHelper::GetStreamSchema(proto::api::Topology& _topology,
const std::string& _component,
const std::string& _stream) {
for (int i = 0; i < _topology.spouts_size(); ++i) {
auto spout = _topology.mutable_spouts(i);
if (spout->comp().name() == _component) {
for (int j = 0; j < spout->outputs_size(); ++j) {
proto::api::OutputStream* output = spout->mutable_outputs(j);
if (output->stream().id() == _stream) {
return output->mutable_schema();
}
}
return NULL;
}
}
for (int i = 0; i < _topology.bolts_size(); ++i) {
auto bolt = _topology.mutable_bolts(i);
if (bolt->comp().name() == _component) {
for (int j = 0; j < bolt->outputs_size(); ++j) {
proto::api::OutputStream* output = bolt->mutable_outputs(j);
if (output->stream().id() == _stream) {
return output->mutable_schema();
}
}
return NULL;
}
}
return NULL;
}
void TopologyConfigHelper::GetComponentSources(const proto::api::Topology& _topology,
const std::string& _component,
std::map<std::pair<std::string, std::string>,
proto::api::Grouping>& retval) {
// only bolts have sources
for (auto bolt : _topology.bolts()) {
if (bolt.comp().name() == _component) {
for (auto ins : bolt.inputs()) {
retval[std::make_pair(ins.stream().component_name(), ins.stream().id())] = ins.gtype();
}
}
}
}
void TopologyConfigHelper::GetComponentTargets(const proto::api::Topology& _topology,
const std::string& _component,
std::map<std::string,
std::map<std::string, proto::api::Grouping>>& retval) {
// only bolts have inputs
for (auto bolt : _topology.bolts()) {
for (auto ins : bolt.inputs()) {
if (ins.stream().component_name() == _component) {
if (retval.find(_component) == retval.end()) {
retval[_component] = std::map<std::string, proto::api::Grouping>();
}
retval[_component][ins.stream().id()] = ins.gtype();
}
}
}
}
void TopologyConfigHelper::GetAllComponentNames(const proto::api::Topology& _topology,
std::unordered_set<std::string>& retval) {
for (auto spout : _topology.spouts()) {
retval.insert(spout.comp().name());
}
for (auto bolt : _topology.bolts()) {
retval.insert(bolt.comp().name());
}
}
bool TopologyConfigHelper::IsComponentSpout(const proto::api::Topology& _topology,
const std::string& _component) {
for (auto spout : _topology.spouts()) {
if (spout.comp().name() == _component) return true;
}
return false;
}
void TopologyConfigHelper::LogTopology(const proto::api::Topology& _topology) {
LOG(INFO) << "Printing Topology";
LOG(INFO) << "Topology Name: " << _topology.name();
LOG(INFO) << "Topology Id: " << _topology.id();
LOG(INFO) << "Topology State: " << _topology.state();
LOG(INFO) << "Topology Config:";
LogConfig(_topology.topology_config());
for (int i = 0; i < _topology.spouts_size(); ++i) {
LOG(INFO) << "Spout Info: ";
LOG(INFO) << "\tName: " << _topology.spouts(i).comp().name();
LOG(INFO) << "\tSpec: " << _topology.spouts(i).comp().spec();
LOG(INFO) << "\tConfig: ";
LogConfig(_topology.spouts(i).comp().config());
}
for (int i = 0; i < _topology.bolts_size(); ++i) {
LOG(INFO) << "Bolt Info: ";
LOG(INFO) << "\tName: " << _topology.bolts(i).comp().name();
LOG(INFO) << "\tSpec: " << _topology.bolts(i).comp().spec();
LOG(INFO) << "\tConfig: ";
LogConfig(_topology.bolts(i).comp().config());
}
}
void TopologyConfigHelper::LogConfig(const proto::api::Config& _config) {
for (int i = 0; i < _config.kvs_size(); ++i) {
if (_config.kvs(i).type() == proto::api::ConfigValueType::STRING_VALUE) {
LOG(INFO) << "\t" << _config.kvs(i).key() << ": " << _config.kvs(i).value();
} else {
LOG(INFO) << "\t" << _config.kvs(i).key();
}
}
}
bool TopologyConfigHelper::StatefulTopologyStartClean(const proto::api::Topology& _topology) {
return GetBooleanConfigValue(_topology,
TopologyConfigVars::TOPOLOGY_STATEFUL_START_CLEAN, false);
}
sp_int64 TopologyConfigHelper::GetStatefulCheckpointIntervalSecsWithDefault(
const proto::api::Topology& _topology,
sp_int64 _default) {
const proto::api::Config& cfg = _topology.topology_config();
const std::string value = GetConfigValue(cfg,
TopologyConfigVars::TOPOLOGY_STATEFUL_CHECKPOINT_INTERVAL_SECONDS, "");
if (!value.empty()) {
return atol(value.c_str());
}
// There was no value specified. Return the default
return _default;
}
void TopologyConfigHelper::GetSpoutComponentNames(const proto::api::Topology& _topology,
std::unordered_set<std::string>& spouts) {
for (int i = 0; i < _topology.spouts_size(); ++i) {
spouts.insert(_topology.spouts(i).comp().name());
}
}
bool TopologyConfigHelper::DropTuplesUponBackpressure(const proto::api::Topology& _topology) {
return GetBooleanConfigValue(_topology,
TopologyConfigVars::TOPOLOGY_DROPTUPLES_UPON_BACKPRESSURE, false);
}
std::string TopologyConfigHelper::GetRuntimeConfigKey(const std::string& _key) {
return _key + RUNTIME_CONFIG_POSTFIX;
}
// Convert configs in map to runtime configs (append runtime postfix)
void TopologyConfigHelper::ConvertToRuntimeConfigs(
const std::map<std::string, std::string>& _origin,
std::map<std::string, std::string>& _retval) {
std::map<std::string, std::string>::const_iterator it;
for (it = _origin.begin(); it != _origin.end(); ++it) {
_retval[GetRuntimeConfigKey(it->first)] = it->second;
}
}
// Return topology level runtime config
// Note that all runtime configs are pure string so there is no need to worry about serialized_value
void TopologyConfigHelper::GetTopologyRuntimeConfig(
const proto::api::Topology& _topology,
std::map<std::string, std::string>& retval) {
if (_topology.has_topology_config()) {
const proto::api::Config& config = _topology.topology_config();
ConvertRuntimeConfigToKVMap(config, retval);
}
}
// Update topology level runtime config
// Note that all runtime configs are pure string so there is no need to worry about serialized_value
void TopologyConfigHelper::SetTopologyRuntimeConfig(
proto::api::Topology* _topology,
const std::map<std::string, std::string>& _update) {
if (_topology->has_topology_config()) {
proto::api::Config* config = _topology->mutable_topology_config();
UpdateRuntimeConfigFromKVMap(config, _update);
}
}
// Return component level runtime config
// Note that all runtime configs are pure string so there is no need to worry about serialized_value
void TopologyConfigHelper::GetComponentRuntimeConfig(
const proto::api::Topology& _topology,
const std::string& _component_name,
std::map<std::string, std::string>& retval) {
// We are assuming component names are unique and returning the config
// of the first spout or bolt found with the name.
for (sp_int32 i = 0; i < _topology.spouts_size(); ++i) {
if (_topology.spouts(i).comp().name() == _component_name) {
const proto::api::Config& config = _topology.spouts(i).comp().config();
ConvertRuntimeConfigToKVMap(config, retval);
return;
}
}
for (sp_int32 i = 0; i < _topology.bolts_size(); ++i) {
if (_topology.bolts(i).comp().name() == _component_name) {
const proto::api::Config& config = _topology.bolts(i).comp().config();
ConvertRuntimeConfigToKVMap(config, retval);
return;
}
}
}
// Update component level runtime config
// Note that all runtime configs are pure string so there is no need to worry about serialized_value
void TopologyConfigHelper::SetComponentRuntimeConfig(
proto::api::Topology* _topology,
const std::string& _component_name,
const std::map<std::string, std::string>& _update) {
// We are assuming component names are unique and updating config for all instances
// with the specific component name.
for (sp_int32 i = 0; i < _topology->spouts_size(); ++i) {
proto::api::Component* comp = _topology->mutable_spouts(i)->mutable_comp();
if (comp->name() == _component_name) {
proto::api::Config* config = comp->mutable_config();
UpdateRuntimeConfigFromKVMap(config, _update);
}
}
for (sp_int32 i = 0; i < _topology->bolts_size(); ++i) {
proto::api::Component* comp = _topology->mutable_bolts(i)->mutable_comp();
if (comp->name() == _component_name) {
proto::api::Config* config = comp->mutable_config();
UpdateRuntimeConfigFromKVMap(config, _update);
}
}
}
// For every existing config, update the value; for every non-existing config, add it.
void TopologyConfigHelper::UpdateRuntimeConfigFromKVMap(proto::api::Config* _config,
const std::map<std::string, std::string>& _kv_map) {
std::set<std::string> updated;
for (sp_int32 i = 0; i < _config->kvs_size(); ++i) {
// Runtime config has a postfix
const std::string key = _config->mutable_kvs(i)->key();
const std::map<std::string, std::string>::const_iterator it = _kv_map.find(key);
if (it != _kv_map.end()) { // If the runtime config exists, update value
_config->mutable_kvs(i)->set_value(it->second);
updated.insert(key);
}
}
// After existing configs are updated, append the newly added ones
std::map<std::string, std::string>::const_iterator it;
for (it = _kv_map.begin(); it != _kv_map.end(); ++it) {
if (updated.find(it->first) == updated.end()) {
proto::api::Config::KeyValue* kv = _config->add_kvs();
kv->set_key(it->first);
kv->set_value(it->second);
}
}
}
const std::string TopologyConfigHelper::GetConfigValue(const proto::api::Config& _config,
const std::string& _key,
const std::string& _default) {
for (sp_int32 i = 0; i < _config.kvs_size(); ++i) {
if (_config.kvs(i).key() == _key) {
return _config.kvs(i).value();
}
}
return _default;
}
bool TopologyConfigHelper::GetBooleanConfigValue(const proto::api::Topology& _topology,
const std::string& _config_name,
bool _default_value) {
static const std::string value_true_ = "true";
const std::string value = GetTopologyConfigValue(_topology, _config_name, "");
return value_true_.compare(value.c_str()) == 0;
}
// Convert topology config to a key value map
void TopologyConfigHelper::ConvertRuntimeConfigToKVMap(
const proto::api::Config& _config,
std::map<std::string, std::string>& retval) {
for (sp_int32 i = 0; i < _config.kvs_size(); ++i) {
const std::string& key = _config.kvs(i).key();
// Assuming RUNTIME_CONFIG_POSTFIX is unique and doesn't happen in normal cases. When it is
// found in a config key, the config is considered a runtime config and extracted into the
// result
if (key.find(RUNTIME_CONFIG_POSTFIX) != std::string::npos) {
retval[_config.kvs(i).key()] = _config.kvs(i).value();
}
}
}
const std::string TopologyConfigHelper::GetTopologyConfigValue(
const proto::api::Topology& _topology,
const std::string& _key,
const std::string& _default) {
const proto::api::Config& cfg = _topology.topology_config();
return GetConfigValue(cfg, _key, _default);
}
const std::string TopologyConfigHelper::GetComponentConfigValue(
const proto::api::Topology& _topology,
const std::string& _component,
const std::string& _key,
const std::string& _default) {
for (auto spout : _topology.spouts()) {
if (spout.comp().name() == _component) { // Found the component
const proto::api::Config& config = spout.comp().config();
return GetConfigValue(config, _key, _default);
}
}
for (auto bolt : _topology.bolts()) { // Found the component
if (bolt.comp().name() == _component) {
const proto::api::Config& config = bolt.comp().config();
return GetConfigValue(config, _key, _default);
}
}
return _default;
}
sp_int64 TopologyConfigHelper::GetComponentOutputBPS(const proto::api::Topology& _topology,
const std::string& _component) {
const std::string init_value = GetComponentConfigValue(_topology, _component,
TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS, "");
const std::string value = GetComponentConfigValue(_topology, _component,
GetRuntimeConfigKey(TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS), init_value);
if (!value.empty()) {
return atol(value.c_str());
}
return -1; // default to -1 (no rate limit)
}
const char* TopologyConfigHelper::GetReservedTopologyConfigKey() {
return TOPOLOGY_CONFIG_KEY;
}
} // namespace config
} // namespace heron