Add helper methods in Config Helper Functions that will be used by CPP Instance (#2445)

diff --git a/heron/common/src/cpp/config/physical-plan-helper.cpp b/heron/common/src/cpp/config/physical-plan-helper.cpp
index 9e611e8..91cfa5a 100644
--- a/heron/common/src/cpp/config/physical-plan-helper.cpp
+++ b/heron/common/src/cpp/config/physical-plan-helper.cpp
@@ -59,6 +59,33 @@
   return;
 }
 
+const std::string PhysicalPlanHelper::GetComponentName(const proto::system::PhysicalPlan& _pplan,
+                                                       int _task_id) {
+  for (auto instance : _pplan.instances()) {
+    if (instance.info().task_id() == _task_id) {
+      return instance.info().component_name();
+    }
+  }
+  return EMPTY_STRING;
+}
+
+void PhysicalPlanHelper::GetComponentTaskIds(const proto::system::PhysicalPlan& _pplan,
+                                             const std::string& _component,
+                                             std::list<int>& _retval) {
+  for (auto instance : _pplan.instances()) {
+    if (instance.info().component_name() == _component) {
+      _retval.push_back(instance.info().task_id());
+    }
+  }
+}
+
+void PhysicalPlanHelper::GetTaskIdToComponentName(const proto::system::PhysicalPlan& _pplan,
+                                                  std::map<int, std::string>& retval) {
+  for (auto instance : _pplan.instances()) {
+    retval[instance.info().task_id()] = instance.info().component_name();
+  }
+}
+
 void PhysicalPlanHelper::GetTasks(const proto::system::PhysicalPlan& _pplan,
                                   const sp_string& _stmgr,
                                   std::unordered_set<sp_int32>& _return) {
diff --git a/heron/common/src/cpp/config/physical-plan-helper.h b/heron/common/src/cpp/config/physical-plan-helper.h
index b7c0f6f..eefea5e 100644
--- a/heron/common/src/cpp/config/physical-plan-helper.h
+++ b/heron/common/src/cpp/config/physical-plan-helper.h
@@ -25,7 +25,9 @@
 #ifndef SVCS_COMMON_CONFIG_PHYSICAL_PLAN_HELPER_H_
 #define SVCS_COMMON_CONFIG_PHYSICAL_PLAN_HELPER_H_
 
+#include <list>
 #include <map>
+#include <string>
 #include <unordered_set>
 #include "basics/basics.h"
 #include "proto/messages.h"
@@ -64,6 +66,19 @@
                           std::unordered_set<sp_int32>& _return);
 
   static void LogPhysicalPlan(const proto::system::PhysicalPlan& _pplan);
+
+  // Returns the component name for the specified _task_id
+  // If the _task_id is not part of the _pplan, return empty string
+  static const std::string GetComponentName(const proto::system::PhysicalPlan& _pplan,
+                                            int _task_id);
+
+  // For a particular _component, returns all the task_ids
+  static void GetComponentTaskIds(const proto::system::PhysicalPlan& _pplan,
+                                  const std::string& _component, std::list<int>& _retval);
+
+  // Return a mapping from task id -> component name
+  static void GetTaskIdToComponentName(const proto::system::PhysicalPlan& _pplan,
+                                       std::map<int, std::string>& retval);
 };
 }  // namespace config
 }  // namespace heron
diff --git a/heron/common/src/cpp/config/topology-config-helper.cpp b/heron/common/src/cpp/config/topology-config-helper.cpp
index e45afb5..6e4df52 100644
--- a/heron/common/src/cpp/config/topology-config-helper.cpp
+++ b/heron/common/src/cpp/config/topology-config-helper.cpp
@@ -69,6 +69,20 @@
   return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE;
 }
 
+bool TopologyConfigHelper::EnableMessageTimeouts(const proto::api::Topology& _topology) {
+  sp_string value_true_ = "true";
+  std::set<sp_string> topology_config;
+  if (_topology.has_topology_config()) {
+    const proto::api::Config& cfg = _topology.topology_config();
+    for (sp_int32 i = 0; i < cfg.kvs_size(); ++i) {
+      if (cfg.kvs(i).key() == TopologyConfigVars::TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS) {
+        return value_true_.compare(cfg.kvs(i).value().c_str()) == 0;
+      }
+    }
+  }
+  return false;
+}
+
 sp_int32 TopologyConfigHelper::GetNumStMgrs(const proto::api::Topology& _topology) {
   std::set<sp_string> topology_config;
   if (_topology.has_topology_config()) {
@@ -223,6 +237,138 @@
   return max_components_per_container * 1073741824l;
 }
 
+void TopologyConfigHelper::GetComponentStreams(const proto::api::Topology& _topology,
+                                               const std::string& _component,
+                                               std::unordered_set<std::string>& retval) {
+  for (auto spout : _topology.spouts()) {
+    if (spout.comp().name() == _component) {
+      for (auto output : spout.outputs()) {
+        retval.insert(output.stream().id());
+      }
+    }
+  }
+  for (auto bolt : _topology.bolts()) {
+    if (bolt.comp().name() == _component) {
+      for (auto output : bolt.outputs()) {
+        retval.insert(output.stream().id());
+      }
+    }
+  }
+}
+
+proto::api::StreamSchema*
+TopologyConfigHelper::GetStreamSchema(proto::api::Topology& _topology,
+                                      const std::string& _component,
+                                      const std::string& _stream) {
+  for (int i = 0; i < _topology.spouts_size(); ++i) {
+    auto spout = _topology.mutable_spouts(i);
+    if (spout->comp().name() == _component) {
+      for (int j = 0; j < spout->outputs_size(); ++j) {
+        proto::api::OutputStream* output = spout->mutable_outputs(j);
+        if (output->stream().id() == _stream) {
+          return output->mutable_schema();
+        }
+      }
+      return NULL;
+    }
+  }
+  for (int i = 0; i < _topology.bolts_size(); ++i) {
+    auto bolt = _topology.mutable_bolts(i);
+    if (bolt->comp().name() == _component) {
+      for (int j = 0; j < bolt->outputs_size(); ++j) {
+        proto::api::OutputStream* output = bolt->mutable_outputs(j);
+        if (output->stream().id() == _stream) {
+          return output->mutable_schema();
+        }
+      }
+      return NULL;
+    }
+  }
+  return NULL;
+}
+
+void TopologyConfigHelper::GetComponentSources(const proto::api::Topology& _topology,
+                                               const std::string& _component,
+                                               std::map<std::pair<std::string, std::string>,
+                                                        proto::api::Grouping>& retval) {
+  // only bolts have sources
+  for (auto bolt : _topology.bolts()) {
+    if (bolt.comp().name() == _component) {
+      for (auto ins : bolt.inputs()) {
+        retval[std::make_pair(ins.stream().component_name(), ins.stream().id())] = ins.gtype();
+      }
+    }
+  }
+}
+
+void TopologyConfigHelper::GetComponentTargets(const proto::api::Topology& _topology,
+                                               const std::string& _component,
+                        std::map<std::string,
+                                 std::map<std::string, proto::api::Grouping>>& retval) {
+  // only bolts have inputs
+  for (auto bolt : _topology.bolts()) {
+    for (auto ins : bolt.inputs()) {
+      if (ins.stream().component_name() == _component) {
+        if (retval.find(_component) == retval.end()) {
+          retval[_component] = std::map<std::string, proto::api::Grouping>();
+        }
+        retval[_component][ins.stream().id()] = ins.gtype();
+      }
+    }
+  }
+}
+
+void TopologyConfigHelper::GetAllComponentNames(const proto::api::Topology& _topology,
+                                                std::unordered_set<std::string>& retval) {
+  for (auto spout : _topology.spouts()) {
+    retval.insert(spout.comp().name());
+  }
+  for (auto bolt : _topology.bolts()) {
+    retval.insert(bolt.comp().name());
+  }
+}
+
+bool TopologyConfigHelper::IsComponentSpout(const proto::api::Topology& _topology,
+                                            const std::string& _component) {
+  for (auto spout : _topology.spouts()) {
+    if (spout.comp().name() == _component) return true;
+  }
+  return false;
+}
+
+void TopologyConfigHelper::LogTopology(const proto::api::Topology& _topology) {
+  LOG(INFO) << "Printing Topology";
+  LOG(INFO) << "Topology Name: " << _topology.name();
+  LOG(INFO) << "Topology Id: " << _topology.id();
+  LOG(INFO) << "Topology State: " << _topology.state();
+  LOG(INFO) << "Topology Config:";
+  LogConfig(_topology.topology_config());
+  for (int i = 0; i < _topology.spouts_size(); ++i) {
+    LOG(INFO) << "Spout Info: ";
+    LOG(INFO) << "\tName: " << _topology.spouts(i).comp().name();
+    LOG(INFO) << "\tSpec: " << _topology.spouts(i).comp().spec();
+    LOG(INFO) << "\tConfig: ";
+    LogConfig(_topology.spouts(i).comp().config());
+  }
+  for (int i = 0; i < _topology.bolts_size(); ++i) {
+    LOG(INFO) << "Bolt Info: ";
+    LOG(INFO) << "\tName: " << _topology.bolts(i).comp().name();
+    LOG(INFO) << "\tSpec: " << _topology.bolts(i).comp().spec();
+    LOG(INFO) << "\tConfig: ";
+    LogConfig(_topology.bolts(i).comp().config());
+  }
+}
+
+void TopologyConfigHelper::LogConfig(const proto::api::Config& _config) {
+  for (int i = 0; i < _config.kvs_size(); ++i) {
+    if (_config.kvs(i).type() == proto::api::ConfigValueType::STRING_VALUE) {
+      LOG(INFO) << "\t" << _config.kvs(i).key() << ": " << _config.kvs(i).value();
+    } else {
+      LOG(INFO) << "\t" << _config.kvs(i).key();
+    }
+  }
+}
+
 bool TopologyConfigHelper::StatefulTopologyStartClean(const proto::api::Topology& _topology) {
   return GetBooleanConfigValue(_topology,
                                TopologyConfigVars::TOPOLOGY_STATEFUL_START_CLEAN, false);
diff --git a/heron/common/src/cpp/config/topology-config-helper.h b/heron/common/src/cpp/config/topology-config-helper.h
index 4048f2b..4b0f6b2 100644
--- a/heron/common/src/cpp/config/topology-config-helper.h
+++ b/heron/common/src/cpp/config/topology-config-helper.h
@@ -28,6 +28,7 @@
 #include <map>
 #include <unordered_set>
 #include <string>
+#include <utility>
 #include "basics/basics.h"
 #include "proto/messages.h"
 #include "config/topology-config-vars.h"
@@ -40,6 +41,9 @@
   static TopologyConfigVars::TopologyReliabilityMode
           GetReliabilityMode(const proto::api::Topology& _topology);
 
+  // Are message timeouts enabled for this topology
+  static bool EnableMessageTimeouts(const proto::api::Topology& _topology);
+
   // This returns the value of TOPOLOGY_STMGRS from the config
   static sp_int32 GetNumStMgrs(const proto::api::Topology& _topology);
 
@@ -78,6 +82,44 @@
   // Gets the per container ram requested by this topology
   static sp_int64 GetContainerRamRequested(const proto::api::Topology& _topology);
 
+  // Get all the streams emitted by a component
+  static void GetComponentStreams(const proto::api::Topology& _topology,
+                                  const std::string& component,
+                                  std::unordered_set<std::string>& retval);
+
+  // Get the schema of the stream of a component
+  static proto::api::StreamSchema* GetStreamSchema(proto::api::Topology& _topology,
+                                                   const std::string& _component,
+                                                   const std::string& _stream);
+
+  // Get the sources for this component. The return value is a map of <componentName, streamId> to
+  // Grouping map
+  static void GetComponentSources(const proto::api::Topology& _topology,
+                                  const std::string& _component,
+                                  std::map<std::pair<std::string, std::string>,
+                                           proto::api::Grouping>& retval);
+
+  // Get the targets for this component. The return value is a map of <componentName, streamId> to
+  // Grouping map
+  static void GetComponentTargets(const proto::api::Topology& _topology,
+                                  const std::string& _component,
+                                  std::map<std::string,
+                                           std::map<std::string, proto::api::Grouping>>& retval);
+
+  // Get all the component names
+  static void GetAllComponentNames(const proto::api::Topology& _topology,
+                                   std::unordered_set<std::string>& retval);
+
+  // Is this a spout
+  static bool IsComponentSpout(const proto::api::Topology& _topology,
+                               const std::string& _component);
+
+  // Log this topology
+  static void LogTopology(const proto::api::Topology& _topology);
+
+  // Log this config
+  static void LogConfig(const proto::api::Config& _config);
+
   // Should this stateful topology start from clean state
   static bool StatefulTopologyStartClean(const proto::api::Topology& _topology);