Make stmgr and tmaster use gflags. This removes the ugly positional arguments (#2490)
* Make stmgr use gflags. This removes the ugly positional arguments
* Convert tmaster from using positional arguments to using gflags
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index e7aeb4a..f6baa29 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -464,19 +464,19 @@
retval = {}
tmaster_cmd = [
self.tmaster_binary,
- self.master_host,
- self.master_port,
- self.tmaster_controller_port,
- self.tmaster_stats_port,
- self.topology_name,
- self.topology_id,
- self.state_manager_connection,
- self.state_manager_root,
- self.heron_internals_config_file,
- self.override_config_file,
- self.metrics_sinks_config_file,
- self.metrics_manager_port,
- self.checkpoint_manager_port]
+ '--topology_name=%s' % self.topology_name,
+ '--topology_id=%s' % self.topology_id,
+ '--zkhostportlist=%s' % self.state_manager_connection,
+ '--zkroot=%s' % self.state_manager_root,
+ '--myhost=%s' % self.master_host,
+ '--master_port=%s' % str(self.master_port),
+ '--controller_port=%s' % str(self.tmaster_controller_port),
+ '--stats_port=%s' % str(self.tmaster_stats_port),
+ '--config_file=%s' % self.heron_internals_config_file,
+ '--override_config_file=%s' % self.override_config_file,
+ '--metrics_sinks_yaml=%s' % self.metrics_sinks_config_file,
+ '--metricsmgr_port=%s' % str(self.metrics_manager_port),
+ '--ckptmgr_port=%s' % str(self.checkpoint_manager_port)]
retval["heron-tmaster"] = tmaster_cmd
retval["heron-metricscache"] = self._get_metrics_cache_cmd()
@@ -652,22 +652,22 @@
stmgr_cmd = [
self.stmgr_binary,
- self.topology_name,
- self.topology_id,
- self.topology_defn_file,
- self.state_manager_connection,
- self.state_manager_root,
- self.stmgr_ids[self.shard],
- ','.join(map(lambda x: x[0], instance_info)),
- self.master_host,
- self.master_port,
- self.tmaster_controller_port,
- self.metrics_manager_port,
- self.shell_port,
- self.heron_internals_config_file,
- self.override_config_file,
- self.checkpoint_manager_port,
- self.ckptmgr_ids[self.shard]]
+ '--topology_name=%s' % self.topology_name,
+ '--topology_id=%s' % self.topology_id,
+ '--topologydefn_file=%s' % self.topology_defn_file,
+ '--zkhostportlist=%s' % self.state_manager_connection,
+ '--zkroot=%s' % self.state_manager_root,
+ '--stmgr_id=%s' % self.stmgr_ids[self.shard],
+ '--instance_ids=%s' % ','.join(map(lambda x: x[0], instance_info)),
+ '--myhost=%s' % self.master_host,
+ '--data_port=%s' % str(self.master_port),
+ '--local_data_port=%s' % str(self.tmaster_controller_port),
+ '--metricsmgr_port=%s' % str(self.metrics_manager_port),
+ '--shell_port=%s' % str(self.shell_port),
+ '--config_file=%s' % self.heron_internals_config_file,
+ '--override_config_file=%s' % self.override_config_file,
+ '--ckptmgr_port=%s' % str(self.checkpoint_manager_port),
+ '--ckptmgr_id=%s' % self.ckptmgr_ids[self.shard]]
retval[self.stmgr_ids[self.shard]] = stmgr_cmd
# metricsmgr_metrics_sink_config_file = 'metrics_sinks.yaml'
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index 7c2f379..3b359e1 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -147,11 +147,13 @@
MockPOpen.set_next_pid(37)
expected_processes_container_0 = [
ProcessInfo(MockPOpen(), 'heron-tmaster',
- 'tmaster_binary %s master_port '
- 'tmaster_controller_port tmaster_stats_port '
- 'topname topid zknode zkroot '
- '%s %s metrics_sinks_config_file metricsmgr_port '
- 'ckptmgr-port' % (HOSTNAME, INTERNAL_CONF_PATH, OVERRIDE_PATH)),
+ 'tmaster_binary --topology_name=topname --topology_id=topid '
+ '--zkhostportlist=zknode --zkroot=zkroot --myhost=%s --master_port=master_port '
+ '--controller_port=tmaster_controller_port --stats_port=tmaster_stats_port '
+ '--config_file=%s --override_config_file=%s '
+ '--metrics_sinks_yaml=metrics_sinks_config_file '
+ '--metricsmgr_port=metricsmgr_port '
+ '--ckptmgr_port=ckptmgr-port' % (HOSTNAME, INTERNAL_CONF_PATH, OVERRIDE_PATH)),
ProcessInfo(MockPOpen(), 'heron-shell-0', get_expected_shell_command(0)),
ProcessInfo(MockPOpen(), 'metricsmgr-0', get_expected_metricsmgr_command(0)),
ProcessInfo(MockPOpen(), 'heron-metricscache', get_expected_metricscachemgr_command()),
@@ -161,9 +163,14 @@
MockPOpen.set_next_pid(37)
expected_processes_container_1 = [
ProcessInfo(MockPOpen(), 'stmgr-1',
- 'stmgr_binary topname topid topdefnfile zknode zkroot stmgr-1 '
- 'container_1_word_3,container_1_exclaim1_2,container_1_exclaim1_1 %s master_port '
- 'tmaster_controller_port metricsmgr_port shell-port %s %s ckptmgr-port ckptmgr-1'
+ 'stmgr_binary --topology_name=topname --topology_id=topid '
+ '--topologydefn_file=topdefnfile --zkhostportlist=zknode --zkroot=zkroot '
+ '--stmgr_id=stmgr-1 '
+ '--instance_ids=container_1_word_3,container_1_exclaim1_2,container_1_exclaim1_1 '
+ '--myhost=%s --data_port=master_port '
+ '--local_data_port=tmaster_controller_port --metricsmgr_port=metricsmgr_port '
+ '--shell_port=shell-port --config_file=%s --override_config_file=%s '
+ '--ckptmgr_port=ckptmgr-port --ckptmgr_id=ckptmgr-1'
% (HOSTNAME, INTERNAL_CONF_PATH, OVERRIDE_PATH)),
ProcessInfo(MockPOpen(), 'container_1_word_3', get_expected_instance_command('word', 3, 1)),
ProcessInfo(MockPOpen(), 'container_1_exclaim1_1',
@@ -180,9 +187,14 @@
ProcessInfo(MockPOpen(), 'container_7_exclaim1_210',
get_expected_instance_command('exclaim1', 210, 7)),
ProcessInfo(MockPOpen(), 'stmgr-7',
- 'stmgr_binary topname topid topdefnfile zknode zkroot stmgr-7 '
- 'container_7_word_11,container_7_exclaim1_210 %s master_port '
- 'tmaster_controller_port metricsmgr_port shell-port %s %s ckptmgr-port ckptmgr-7'
+ 'stmgr_binary --topology_name=topname --topology_id=topid '
+ '--topologydefn_file=topdefnfile --zkhostportlist=zknode --zkroot=zkroot '
+ '--stmgr_id=stmgr-7 '
+ '--instance_ids=container_7_word_11,container_7_exclaim1_210 --myhost=%s '
+ '--data_port=master_port '
+ '--local_data_port=tmaster_controller_port --metricsmgr_port=metricsmgr_port '
+ '--shell_port=shell-port --config_file=%s --override_config_file=%s '
+ '--ckptmgr_port=ckptmgr-port --ckptmgr_id=ckptmgr-7'
% (HOSTNAME, INTERNAL_CONF_PATH, OVERRIDE_PATH)),
ProcessInfo(MockPOpen(), 'metricsmgr-7', get_expected_metricsmgr_command(7)),
ProcessInfo(MockPOpen(), 'heron-shell-7', get_expected_shell_command(7)),
diff --git a/heron/stmgr/src/cpp/server/stmgr-main.cpp b/heron/stmgr/src/cpp/server/stmgr-main.cpp
index 7016487..d4703bb 100644
--- a/heron/stmgr/src/cpp/server/stmgr-main.cpp
+++ b/heron/stmgr/src/cpp/server/stmgr-main.cpp
@@ -17,6 +17,7 @@
#include <iostream>
#include <string>
#include <vector>
+#include "gflags/gflags.h"
#include "manager/stmgr.h"
#include "statemgr/heron-statemgr.h"
#include "proto/messages.h"
@@ -26,55 +27,46 @@
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
-int main(int argc, char* argv[]) {
- if (argc != 17) {
- std::cout << "Usage: " << argv[0] << " "
- << "<topname> <topid> <topdefnfile> "
- << "<zknode> <zkroot> <stmgrid> "
- << "<instanceids> <myhost> <data_port> <local_data_port> <metricsmgrport> "
- << "<shellport> <heron_internals_config_filename> <override_config_filename "
- << "<ckptmgr_port> <ckptmgr_id>"
- << std::endl;
- std::cout << "If zknode is empty please say LOCALMODE\n";
- ::exit(1);
- }
+DEFINE_string(topology_name, "", "Name of the topology");
+DEFINE_string(topology_id, "", "Id of the topology");
+DEFINE_string(topologydefn_file, "", "Name of the topology defn file");
+DEFINE_string(zkhostportlist, "", "Location of the zk");
+DEFINE_string(zkroot, "", "Root of the zk");
+DEFINE_string(stmgr_id, "", "My Id");
+DEFINE_string(instance_ids, "", "Comma seperated list of instance ids in my container");
+DEFINE_string(myhost, "", "The hostname that I'm running");
+DEFINE_int32(data_port, 0, "The port used for inter-container traffic");
+DEFINE_int32(local_data_port, 0, "The port used for intra-container traffic");
+DEFINE_int32(metricsmgr_port, 0, "The port of the local metricsmgr");
+DEFINE_int32(shell_port, 0, "The port of the local heron shell");
+DEFINE_string(config_file, "", "The heron internals config file");
+DEFINE_string(override_config_file, "", "The override heron internals config file");
+DEFINE_string(ckptmgr_id, "", "The id of the local ckptmgr");
+DEFINE_int32(ckptmgr_port, 0, "The port of the local ckptmgr");
- std::string topology_name = argv[1];
- std::string topology_id = argv[2];
- std::string topdefn_file = argv[3];
- std::string zkhostportlist = argv[4];
- if (zkhostportlist == "LOCALMODE") {
- zkhostportlist = "";
+int main(int argc, char* argv[]) {
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ if (FLAGS_zkhostportlist == "LOCALMODE") {
+ FLAGS_zkhostportlist = "";
}
- std::string topdir = argv[5];
- std::string myid = argv[6];
- std::string instanceids = argv[7];
- std::vector<std::string> instances = StrUtils::split(instanceids, ",");
- std::string myhost = argv[8];
- sp_int32 data_port = atoi(argv[9]);
- sp_int32 local_data_port = atoi(argv[10]);
- sp_int32 metricsmgr_port = atoi(argv[11]);
- sp_int32 shell_port = atoi(argv[12]);
- sp_string heron_internals_config_filename = argv[13];
- sp_string override_config_filename = argv[14];
- sp_int32 ckptmgr_port = atoi(argv[15]);
- sp_string ckptmgr_id = argv[16];
+ std::vector<std::string> instances = StrUtils::split(FLAGS_instance_ids, ",");
EventLoopImpl ss;
// Read heron internals config from local file
// Create the heron-internals-config-reader to read the heron internals config
heron::config::HeronInternalsConfigReader::Create(&ss,
- heron_internals_config_filename, override_config_filename);
+ FLAGS_config_file, FLAGS_override_config_file);
- heron::common::Initialize(argv[0], myid.c_str());
+ heron::common::Initialize(argv[0], FLAGS_stmgr_id.c_str());
// Lets first read the top defn file
heron::proto::api::Topology* topology = new heron::proto::api::Topology();
- sp_string contents = FileUtils::readAll(topdefn_file);
+ sp_string contents = FileUtils::readAll(FLAGS_topologydefn_file);
topology->ParseFromString(contents);
if (!topology->IsInitialized()) {
- LOG(FATAL) << "Corrupt topology defn file" << std::endl;
+ LOG(FATAL) << "Corrupt topology defn file " << FLAGS_topologydefn_file;
}
sp_int64 high_watermark = heron::config::HeronInternalsConfigReader::Instance()
@@ -83,10 +75,11 @@
sp_int64 low_watermark = heron::config::HeronInternalsConfigReader::Instance()
->GetHeronStreammgrNetworkBackpressureLowwatermarkMb() *
1_MB;
- heron::stmgr::StMgr mgr(&ss, myhost, data_port, local_data_port, topology_name, topology_id,
- topology, myid,
- instances, zkhostportlist, topdir, metricsmgr_port, shell_port,
- ckptmgr_port, ckptmgr_id, high_watermark, low_watermark);
+ heron::stmgr::StMgr mgr(&ss, FLAGS_myhost, FLAGS_data_port, FLAGS_local_data_port,
+ FLAGS_topology_name, FLAGS_topology_id, topology, FLAGS_stmgr_id,
+ instances, FLAGS_zkhostportlist, FLAGS_zkroot, FLAGS_metricsmgr_port,
+ FLAGS_shell_port, FLAGS_ckptmgr_port, FLAGS_ckptmgr_id,
+ high_watermark, low_watermark);
mgr.Init();
ss.loop();
return 0;
diff --git a/heron/tmaster/src/cpp/server/tmaster-main.cpp b/heron/tmaster/src/cpp/server/tmaster-main.cpp
index dc07151..e951f3f 100644
--- a/heron/tmaster/src/cpp/server/tmaster-main.cpp
+++ b/heron/tmaster/src/cpp/server/tmaster-main.cpp
@@ -17,6 +17,7 @@
#include <iostream>
#include <string>
#include <vector>
+#include "gflags/gflags.h"
#include "manager/tmaster.h"
#include "proto/messages.h"
#include "basics/basics.h"
@@ -25,50 +26,45 @@
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
-int main(int argc, char* argv[]) {
- if (argc != 14) {
- std::cout << "Usage: " << argv[0] << " "
- << "<master-host> <master-port> <controller-port> <stats-port> "
- << "<topology_name> <topology_id> <zk_hostportlist> "
- << "<topdir> <heron_internals_config_filename> <override_config_filename>"
- << "<metrics_sinks_filename> <metrics-manager-port> <ckptmgr-port>" << std::endl;
- std::cout << "If zk_hostportlist is empty please say LOCALMODE\n";
- ::exit(1);
- }
+DEFINE_string(topology_name, "", "Name of the topology");
+DEFINE_string(topology_id, "", "Id of the topology");
+DEFINE_string(zkhostportlist, "", "Location of the zk");
+DEFINE_string(zkroot, "", "Root of the zk");
+DEFINE_string(myhost, "", "The hostname that I'm running");
+DEFINE_int32(master_port, 0, "The port used for communication with stmgrs");
+DEFINE_int32(controller_port, 0, "The port used to activate/deactivate");
+DEFINE_int32(stats_port, 0, "The port of the getting stats");
+DEFINE_string(config_file, "", "The heron internals config file");
+DEFINE_string(override_config_file, "", "The override heron internals config file");
+DEFINE_string(metrics_sinks_yaml, "", "The file that defines which sinks to send metrics");
+DEFINE_int32(metricsmgr_port, 0, "The port of the local metrics manager");
+DEFINE_int32(ckptmgr_port, 0, "The port of the local ckptmgr");
- sp_string myhost = argv[1];
- sp_int32 master_port = atoi(argv[2]);
- sp_int32 controller_port = atoi(argv[3]);
- sp_int32 stats_port = atoi(argv[4]);
- sp_string topology_name = argv[5];
- sp_string topology_id = argv[6];
- sp_string zkhostportlist = argv[7];
- if (zkhostportlist == "LOCALMODE") {
- zkhostportlist = "";
+
+int main(int argc, char* argv[]) {
+ gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+ if (FLAGS_zkhostportlist == "LOCALMODE") {
+ FLAGS_zkhostportlist = "";
}
- sp_string topdir = argv[8];
- sp_string heron_internals_config_filename = argv[9];
- sp_string override_config_filename = argv[10];
- sp_string metrics_sinks_yaml = argv[11];
- sp_int32 metrics_manager_port = atoi(argv[12]);
- sp_int32 ckptmgr_port = atoi(argv[13]);
EventLoopImpl ss;
// Read heron internals config from local file
// Create the heron-internals-config-reader to read the heron internals config
heron::config::HeronInternalsConfigReader::Create(&ss,
- heron_internals_config_filename, override_config_filename);
+ FLAGS_config_file, FLAGS_override_config_file);
- heron::common::Initialize(argv[0], topology_id.c_str());
+ heron::common::Initialize(argv[0], FLAGS_topology_id.c_str());
- LOG(INFO) << "Starting tmaster for topology " << topology_name << " with topology id "
- << topology_id << " zkhostport " << zkhostportlist << " and zkroot " << topdir
- << std::endl;
+ LOG(INFO) << "Starting tmaster for topology " << FLAGS_topology_name << " with topology id "
+ << FLAGS_topology_id << " zkhostport " << FLAGS_zkhostportlist
+ << " and zkroot " << FLAGS_zkroot;
- heron::tmaster::TMaster tmaster(zkhostportlist, topology_name, topology_id, topdir,
- controller_port, master_port, stats_port, metrics_manager_port,
- ckptmgr_port, metrics_sinks_yaml, myhost, &ss);
+ heron::tmaster::TMaster tmaster(FLAGS_zkhostportlist, FLAGS_topology_name, FLAGS_topology_id,
+ FLAGS_zkroot, FLAGS_controller_port, FLAGS_master_port,
+ FLAGS_stats_port, FLAGS_metricsmgr_port,
+ FLAGS_ckptmgr_port, FLAGS_metrics_sinks_yaml, FLAGS_myhost, &ss);
ss.loop();
return 0;
}