Add helper methods in Config Helper Functions that will be used by CPP Instance (#2445)
diff --git a/heron/common/src/cpp/config/physical-plan-helper.cpp b/heron/common/src/cpp/config/physical-plan-helper.cpp
index 9e611e8..91cfa5a 100644
--- a/heron/common/src/cpp/config/physical-plan-helper.cpp
+++ b/heron/common/src/cpp/config/physical-plan-helper.cpp
@@ -59,6 +59,33 @@
return;
}
+const std::string PhysicalPlanHelper::GetComponentName(const proto::system::PhysicalPlan& _pplan,
+ int _task_id) {
+ for (auto instance : _pplan.instances()) {
+ if (instance.info().task_id() == _task_id) {
+ return instance.info().component_name();
+ }
+ }
+ return EMPTY_STRING;
+}
+
+void PhysicalPlanHelper::GetComponentTaskIds(const proto::system::PhysicalPlan& _pplan,
+ const std::string& _component,
+ std::list<int>& _retval) {
+ for (auto instance : _pplan.instances()) {
+ if (instance.info().component_name() == _component) {
+ _retval.push_back(instance.info().task_id());
+ }
+ }
+}
+
+void PhysicalPlanHelper::GetTaskIdToComponentName(const proto::system::PhysicalPlan& _pplan,
+ std::map<int, std::string>& retval) {
+ for (auto instance : _pplan.instances()) {
+ retval[instance.info().task_id()] = instance.info().component_name();
+ }
+}
+
void PhysicalPlanHelper::GetTasks(const proto::system::PhysicalPlan& _pplan,
const sp_string& _stmgr,
std::unordered_set<sp_int32>& _return) {
diff --git a/heron/common/src/cpp/config/physical-plan-helper.h b/heron/common/src/cpp/config/physical-plan-helper.h
index b7c0f6f..eefea5e 100644
--- a/heron/common/src/cpp/config/physical-plan-helper.h
+++ b/heron/common/src/cpp/config/physical-plan-helper.h
@@ -25,7 +25,9 @@
#ifndef SVCS_COMMON_CONFIG_PHYSICAL_PLAN_HELPER_H_
#define SVCS_COMMON_CONFIG_PHYSICAL_PLAN_HELPER_H_
+#include <list>
#include <map>
+#include <string>
#include <unordered_set>
#include "basics/basics.h"
#include "proto/messages.h"
@@ -64,6 +66,19 @@
std::unordered_set<sp_int32>& _return);
static void LogPhysicalPlan(const proto::system::PhysicalPlan& _pplan);
+
+ // Returns the component name for the specified _task_id
+ // If the _task_id is not part of the _pplan, return empty string
+ static const std::string GetComponentName(const proto::system::PhysicalPlan& _pplan,
+ int _task_id);
+
+ // For a particular _component, returns all the task_ids
+ static void GetComponentTaskIds(const proto::system::PhysicalPlan& _pplan,
+ const std::string& _component, std::list<int>& _retval);
+
+ // Return a mapping from task id -> component name
+ static void GetTaskIdToComponentName(const proto::system::PhysicalPlan& _pplan,
+ std::map<int, std::string>& retval);
};
} // namespace config
} // namespace heron
diff --git a/heron/common/src/cpp/config/topology-config-helper.cpp b/heron/common/src/cpp/config/topology-config-helper.cpp
index e45afb5..6e4df52 100644
--- a/heron/common/src/cpp/config/topology-config-helper.cpp
+++ b/heron/common/src/cpp/config/topology-config-helper.cpp
@@ -69,6 +69,20 @@
return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE;
}
+bool TopologyConfigHelper::EnableMessageTimeouts(const proto::api::Topology& _topology) {
+ sp_string value_true_ = "true";
+ std::set<sp_string> topology_config;
+ if (_topology.has_topology_config()) {
+ const proto::api::Config& cfg = _topology.topology_config();
+ for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
+ if (cfg.kvs(i).key() == TopologyConfigVars::TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS) {
+ return value_true_.compare(cfg.kvs(i).value().c_str()) == 0;
+ }
+ }
+ }
+ return false;
+}
+
sp_int32 TopologyConfigHelper::GetNumStMgrs(const proto::api::Topology& _topology) {
std::set<sp_string> topology_config;
if (_topology.has_topology_config()) {
@@ -223,6 +237,138 @@
return max_components_per_container * 1073741824l;
}
+void TopologyConfigHelper::GetComponentStreams(const proto::api::Topology& _topology,
+ const std::string& _component,
+ std::unordered_set<std::string>& retval) {
+ for (auto spout : _topology.spouts()) {
+ if (spout.comp().name() == _component) {
+ for (auto output : spout.outputs()) {
+ retval.insert(output.stream().id());
+ }
+ }
+ }
+ for (auto bolt : _topology.bolts()) {
+ if (bolt.comp().name() == _component) {
+ for (auto output : bolt.outputs()) {
+ retval.insert(output.stream().id());
+ }
+ }
+ }
+}
+
+proto::api::StreamSchema*
+TopologyConfigHelper::GetStreamSchema(proto::api::Topology& _topology,
+ const std::string& _component,
+ const std::string& _stream) {
+ for (int i = 0; i < _topology.spouts_size(); ++i) {
+ auto spout = _topology.mutable_spouts(i);
+ if (spout->comp().name() == _component) {
+ for (int j = 0; j < spout->outputs_size(); ++j) {
+ proto::api::OutputStream* output = spout->mutable_outputs(j);
+ if (output->stream().id() == _stream) {
+ return output->mutable_schema();
+ }
+ }
+ return NULL;
+ }
+ }
+ for (int i = 0; i < _topology.bolts_size(); ++i) {
+ auto bolt = _topology.mutable_bolts(i);
+ if (bolt->comp().name() == _component) {
+ for (int j = 0; j < bolt->outputs_size(); ++j) {
+ proto::api::OutputStream* output = bolt->mutable_outputs(j);
+ if (output->stream().id() == _stream) {
+ return output->mutable_schema();
+ }
+ }
+ return NULL;
+ }
+ }
+ return NULL;
+}
+
+void TopologyConfigHelper::GetComponentSources(const proto::api::Topology& _topology,
+ const std::string& _component,
+ std::map<std::pair<std::string, std::string>,
+ proto::api::Grouping>& retval) {
+ // only bolts have sources
+ for (auto bolt : _topology.bolts()) {
+ if (bolt.comp().name() == _component) {
+ for (auto ins : bolt.inputs()) {
+ retval[std::make_pair(ins.stream().component_name(), ins.stream().id())] = ins.gtype();
+ }
+ }
+ }
+}
+
+void TopologyConfigHelper::GetComponentTargets(const proto::api::Topology& _topology,
+ const std::string& _component,
+ std::map<std::string,
+ std::map<std::string, proto::api::Grouping>>& retval) {
+ // only bolts have inputs
+ for (auto bolt : _topology.bolts()) {
+ for (auto ins : bolt.inputs()) {
+ if (ins.stream().component_name() == _component) {
+ if (retval.find(_component) == retval.end()) {
+ retval[_component] = std::map<std::string, proto::api::Grouping>();
+ }
+ retval[_component][ins.stream().id()] = ins.gtype();
+ }
+ }
+ }
+}
+
+void TopologyConfigHelper::GetAllComponentNames(const proto::api::Topology& _topology,
+ std::unordered_set<std::string>& retval) {
+ for (auto spout : _topology.spouts()) {
+ retval.insert(spout.comp().name());
+ }
+ for (auto bolt : _topology.bolts()) {
+ retval.insert(bolt.comp().name());
+ }
+}
+
+bool TopologyConfigHelper::IsComponentSpout(const proto::api::Topology& _topology,
+ const std::string& _component) {
+ for (auto spout : _topology.spouts()) {
+ if (spout.comp().name() == _component) return true;
+ }
+ return false;
+}
+
+void TopologyConfigHelper::LogTopology(const proto::api::Topology& _topology) {
+ LOG(INFO) << "Printing Topology";
+ LOG(INFO) << "Topology Name: " << _topology.name();
+ LOG(INFO) << "Topology Id: " << _topology.id();
+ LOG(INFO) << "Topology State: " << _topology.state();
+ LOG(INFO) << "Topology Config:";
+ LogConfig(_topology.topology_config());
+ for (int i = 0; i < _topology.spouts_size(); ++i) {
+ LOG(INFO) << "Spout Info: ";
+ LOG(INFO) << "\tName: " << _topology.spouts(i).comp().name();
+ LOG(INFO) << "\tSpec: " << _topology.spouts(i).comp().spec();
+ LOG(INFO) << "\tConfig: ";
+ LogConfig(_topology.spouts(i).comp().config());
+ }
+ for (int i = 0; i < _topology.bolts_size(); ++i) {
+ LOG(INFO) << "Bolt Info: ";
+ LOG(INFO) << "\tName: " << _topology.bolts(i).comp().name();
+ LOG(INFO) << "\tSpec: " << _topology.bolts(i).comp().spec();
+ LOG(INFO) << "\tConfig: ";
+ LogConfig(_topology.bolts(i).comp().config());
+ }
+}
+
+void TopologyConfigHelper::LogConfig(const proto::api::Config& _config) {
+ for (int i = 0; i < _config.kvs_size(); ++i) {
+ if (_config.kvs(i).type() == proto::api::ConfigValueType::STRING_VALUE) {
+ LOG(INFO) << "\t" << _config.kvs(i).key() << ": " << _config.kvs(i).value();
+ } else {
+ LOG(INFO) << "\t" << _config.kvs(i).key();
+ }
+ }
+}
+
bool TopologyConfigHelper::StatefulTopologyStartClean(const proto::api::Topology& _topology) {
return GetBooleanConfigValue(_topology,
TopologyConfigVars::TOPOLOGY_STATEFUL_START_CLEAN, false);
diff --git a/heron/common/src/cpp/config/topology-config-helper.h b/heron/common/src/cpp/config/topology-config-helper.h
index 4048f2b..4b0f6b2 100644
--- a/heron/common/src/cpp/config/topology-config-helper.h
+++ b/heron/common/src/cpp/config/topology-config-helper.h
@@ -28,6 +28,7 @@
#include <map>
#include <unordered_set>
#include <string>
+#include <utility>
#include "basics/basics.h"
#include "proto/messages.h"
#include "config/topology-config-vars.h"
@@ -40,6 +41,9 @@
static TopologyConfigVars::TopologyReliabilityMode
GetReliabilityMode(const proto::api::Topology& _topology);
+ // Are message timeouts enabled for this topology
+ static bool EnableMessageTimeouts(const proto::api::Topology& _topology);
+
// This returns the value of TOPOLOGY_STMGRS from the config
static sp_int32 GetNumStMgrs(const proto::api::Topology& _topology);
@@ -78,6 +82,44 @@
// Gets the per container ram requested by this topology
static sp_int64 GetContainerRamRequested(const proto::api::Topology& _topology);
+ // Get all the streams emitted by a component
+ static void GetComponentStreams(const proto::api::Topology& _topology,
+ const std::string& component,
+ std::unordered_set<std::string>& retval);
+
+ // Get the schema of the stream of a component
+ static proto::api::StreamSchema* GetStreamSchema(proto::api::Topology& _topology,
+ const std::string& _component,
+ const std::string& _stream);
+
+ // Get the sources for this component. The return value is a map of <componentName, streamId> to
+ // Grouping map
+ static void GetComponentSources(const proto::api::Topology& _topology,
+ const std::string& _component,
+ std::map<std::pair<std::string, std::string>,
+ proto::api::Grouping>& retval);
+
+ // Get the targets for this component. The return value is a map of <componentName, streamId> to
+ // Grouping map
+ static void GetComponentTargets(const proto::api::Topology& _topology,
+ const std::string& _component,
+ std::map<std::string,
+ std::map<std::string, proto::api::Grouping>>& retval);
+
+ // Get all the component names
+ static void GetAllComponentNames(const proto::api::Topology& _topology,
+ std::unordered_set<std::string>& retval);
+
+ // Is this a spout
+ static bool IsComponentSpout(const proto::api::Topology& _topology,
+ const std::string& _component);
+
+ // Log this topology
+ static void LogTopology(const proto::api::Topology& _topology);
+
+ // Log this config
+ static void LogConfig(const proto::api::Config& _config);
+
// Should this stateful topology start from clean state
static bool StatefulTopologyStartClean(const proto::api::Topology& _topology);