Consider override.yaml: load it on top of heron_internals.yaml so that its values override the internal config defaults.
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
index d784048..09533c7 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
@@ -23,6 +23,8 @@
#include "network/network.h"
#include "threads/threads.h"
+#include "yaml-cpp/yaml.h"
+
namespace heron {
namespace config {
@@ -30,9 +32,12 @@
HeronInternalsConfigReader* HeronInternalsConfigReader::heron_internals_config_reader_ = 0;
HeronInternalsConfigReader::HeronInternalsConfigReader(EventLoop* eventLoop,
- const sp_string& _defaults_file)
+ const sp_string& _defaults_file,
+ const sp_string& _override_file)
: YamlFileReader(eventLoop, _defaults_file) {
+ override_file_ = _override_file;
LoadConfig();
+ LoadOverrideConfig();
}
HeronInternalsConfigReader::~HeronInternalsConfigReader() { delete heron_internals_config_reader_; }
@@ -49,18 +54,37 @@
return (heron_internals_config_reader_ != NULL); // Return true/false
}
-void HeronInternalsConfigReader::Create(EventLoop* eventLoop, const sp_string& _defaults_file) {
+void HeronInternalsConfigReader::Create(EventLoop* eventLoop,
+ const sp_string& _defaults_file,
+ const sp_string& _override_file) {
if (heron_internals_config_reader_) {
LOG(FATAL) << "Singleton HeronInternalsConfigReader has already been created";
} else {
- heron_internals_config_reader_ = new HeronInternalsConfigReader(eventLoop, _defaults_file);
+ heron_internals_config_reader_ =
+ new HeronInternalsConfigReader(eventLoop, _defaults_file, _override_file);
}
}
-void HeronInternalsConfigReader::Create(const sp_string& _defaults_file) {
- Create(NULL, _defaults_file);
+void HeronInternalsConfigReader::Create(const sp_string& _defaults_file,
+ const sp_string& _override_file) {
+ Create(NULL, _defaults_file, _override_file);
}
+// Applies the values found in override_file_ on top of the defaults already held in config_.
+// Only keys that already exist in config_ and have scalar names are considered; a key that
+// appears solely in the override file is not added.
+void HeronInternalsConfigReader::LoadOverrideConfig() {
+  if (override_file_.empty()) {
+    // No override file was supplied; keep the defaults untouched
+    return;
+  }
+  YAML::Node override_config;
+  try {
+    override_config = YAML::LoadFile(override_file_);
+  } catch (const YAML::Exception& e) {
+    // Without this the failure surfaces as an uncaught exception that does not
+    // say which file could not be read
+    LOG(FATAL) << "Unable to load override config file " << override_file_
+               << ": " << e.what();
+  }
+  for (auto n : config_) {
+    if (n.first.IsScalar()) {
+      const std::string& key = n.first.Scalar();
+      auto override_val = YAML::Node(override_config[key]);
+      // An undefined node means the override file has no entry for this key
+      if (override_val) {
+        config_[n.first] = override_val;
+      }
+    }
+  }
+}
void HeronInternalsConfigReader::OnConfigFileLoad() {
// Nothing really
}
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h
index 37bed01..6728780 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.h
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.h
@@ -42,10 +42,11 @@
static bool Exists();
// Create a singleton reader from a config file,
// which will check and reload the config change
- static void Create(EventLoop* eventLoop, const sp_string& _defaults_file);
+ static void Create(EventLoop* eventLoop,
+ const sp_string& _defaults_file, const sp_string& _override_file);
// Create a singleton reader from a config file,
// which will not check or reload the config change
- static void Create(const sp_string& _defaults_file);
+ static void Create(const sp_string& _defaults_file, const sp_string& _override_file);
virtual void OnConfigFileLoad();
@@ -191,9 +192,12 @@
sp_int32 GetHeronStreammgrNetworkBackpressureLowwatermarkMb();
protected:
- HeronInternalsConfigReader(EventLoop* eventLoop, const sp_string& _defaults_file);
+ HeronInternalsConfigReader(EventLoop* eventLoop,
+ const sp_string& _defaults_file,
+ const sp_string& _override_file);
virtual ~HeronInternalsConfigReader();
-
+ sp_string override_file_;
+ void LoadOverrideConfig();
static HeronInternalsConfigReader* heron_internals_config_reader_;
};
} // namespace config
diff --git a/heron/config/src/yaml/conf/aurora/heron.aurora b/heron/config/src/yaml/conf/aurora/heron.aurora
index b0a90c3..ecd7ce0 100644
--- a/heron/config/src/yaml/conf/aurora/heron.aurora
+++ b/heron/config/src/yaml/conf/aurora/heron.aurora
@@ -37,6 +37,7 @@
'{{thermos.ports[port2]}}' \
'{{thermos.ports[port3]}}' \
'{{SYSTEM_YAML}}' \
+ '{{OVERRIDE_YAML}} ' \
'{{COMPONENT_RAMMAP}}' \
'{{COMPONENT_JVM_OPTS_IN_BASE64}}' \
'{{TOPOLOGY_PACKAGE_TYPE}}' \
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 4595d47..7ca4e2e 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -50,8 +50,8 @@
" <zknode> <zkroot> <tmaster_binary> <stmgr_binary>"
" <metricsmgr_classpath> <instance_jvm_opts_in_base64> <classpath>"
" <master_port> <tmaster_controller_port> <tmaster_stats_port> <heron_internals_config_file>"
- " <component_rammap> <component_jvm_opts_in_base64> <pkg_type> <topology_bin_file>"
- " <heron_java_home> <shell-port> <heron_shell_binary> <metricsmgr_port>"
+ " <override_config_file> <component_rammap> <component_jvm_opts_in_base64> <pkg_type>"
+ " <topology_bin_file> <heron_java_home> <shell-port> <heron_shell_binary> <metricsmgr_port>"
" <cluster> <role> <environ> <instance_classpath> <metrics_sinks_config_file>"
" <scheduler_classpath> <scheduler_port> <python_instance_binary>"
" <metricscachemgr_classpath> <metricscachemgr_masterport> <metricscachemgr_statsport>"
@@ -141,7 +141,7 @@
self.attempts += 1
return self
-# pylint: disable=too-many-instance-attributes
+# pylint: disable=too-many-instance-attributes,too-many-statements
class HeronExecutor(object):
""" Heron executor is a class that is responsible for running each of the process on a given
container. Based on the container id and the instance distribution, it determines if the container
@@ -175,6 +175,7 @@
self.tmaster_controller_port = parsed_args.tmaster_controller_port
self.tmaster_stats_port = parsed_args.tmaster_stats_port
self.heron_internals_config_file = parsed_args.heron_internals_config_file
+ self.override_config_file = parsed_args.override_config_file
self.component_rammap =\
map(lambda x: {x.split(':')[0]:
int(x.split(':')[1])}, parsed_args.component_rammap.split(','))
@@ -264,6 +265,7 @@
parser.add_argument("tmaster_controller_port")
parser.add_argument("tmaster_stats_port")
parser.add_argument("heron_internals_config_file")
+ parser.add_argument("override_config_file")
parser.add_argument("component_rammap")
parser.add_argument("component_jvm_opts_in_base64")
parser.add_argument("pkg_type")
@@ -374,6 +376,7 @@
self.topology_name,
self.topology_id,
self.heron_internals_config_file,
+ self.override_config_file,
sink_config_file]
return metricsmgr_cmd
@@ -412,6 +415,7 @@
"--topology_name", self.topology_name,
"--topology_id", self.topology_id,
"--system_config_file", self.heron_internals_config_file,
+ "--override_config_file", self.override_config_file,
"--sink_config_file", self.metrics_sinks_config_file,
"--cluster", self.cluster,
"--role", self.role,
@@ -467,6 +471,7 @@
self.zknode,
self.zkroot,
self.heron_internals_config_file,
+ self.override_config_file,
self.metrics_sinks_config_file,
self.metricsmgr_port,
self.ckptmgr_port]
@@ -550,7 +555,8 @@
self.stmgr_ids[self.shard],
self.master_port,
self.metricsmgr_port,
- self.heron_internals_config_file])
+ self.heron_internals_config_file,
+ self.override_config_file])
retval[instance_id] = instance_cmd
return retval
@@ -587,6 +593,7 @@
self.master_port,
self.metricsmgr_port,
self.heron_internals_config_file,
+ self.override_config_file,
self.topology_bin_file]
retval[instance_id] = instance_cmd
@@ -624,6 +631,7 @@
self.metricsmgr_port,
self.shell_port,
self.heron_internals_config_file,
+ self.override_config_file,
self.ckptmgr_port,
self.ckptmgr_ids[self.shard]]
retval[self.stmgr_ids[self.shard]] = stmgr_cmd
@@ -680,7 +688,8 @@
'-c' + self.ckptmgr_ids[self.shard],
'-p' + self.ckptmgr_port,
'-f' + self.stateful_config_file,
- '-g' + self.heron_internals_config_file]
+ '-g' + self.heron_internals_config_file,
+ '-o' + self.override_config_file]
retval = {}
retval[self.ckptmgr_ids[self.shard]] = ckptmgr_cmd
diff --git a/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java b/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java
index d1cd912..779771f 100644
--- a/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java
+++ b/heron/metricscachemgr/src/java/com/twitter/heron/metricscachemgr/MetricsCacheManager.java
@@ -178,6 +178,13 @@
.argName("system config file")
.build();
+ Option overrideConfig = Option.builder("o")
+ .desc("Override configuration file path")
+ .longOpt("override_config_file")
+ .hasArgs()
+ .argName("override config file")
+ .build();
+
Option sinkConfig = Option.builder("i")
.desc("Sink configuration file path")
.longOpt("sink_config_file")
@@ -220,6 +227,7 @@
options.addOption(masterPort);
options.addOption(statsPort);
options.addOption(systemConfig);
+ options.addOption(overrideConfig);
options.addOption(sinkConfig);
options.addOption(topologyName);
options.addOption(topologyId);
@@ -279,6 +287,7 @@
int masterPort = Integer.valueOf(cmd.getOptionValue("master_port"));
int statsPort = Integer.valueOf(cmd.getOptionValue("stats_port"));
String systemConfigFilename = cmd.getOptionValue("system_config_file");
+ String overrideConfigFilename = cmd.getOptionValue("override_config_file");
String metricsSinksConfigFilename = cmd.getOptionValue("sink_config_file");
String topologyName = cmd.getOptionValue("topology_name");
String topologyId = cmd.getOptionValue("topology_id");
@@ -287,6 +296,7 @@
// read heron internal config file
SystemConfig systemConfig = SystemConfig.newBuilder(true)
.putAll(systemConfigFilename, true)
+ .putAll(overrideConfigFilename, true)
.build();
// Log to file and sink(exception)
diff --git a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java
index 22f6d18..7cb74c3 100644
--- a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java
+++ b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java
@@ -198,11 +198,11 @@
}
public static void main(String[] args) throws IOException {
- if (args.length != 6) {
+ if (args.length != 7) {
throw new RuntimeException(
"Invalid arguments; Usage: java com.twitter.heron.metricsmgr.MetricsManager "
+ "<id> <port> <topname> <topid> <heron_internals_config_filename> "
- + "<metrics_sinks_config_filename>");
+ + "<override_config_filename> <metrics_sinks_config_filename>");
}
String metricsmgrId = args[0];
@@ -210,10 +210,12 @@
String topologyName = args[2];
String topologyId = args[3];
String systemConfigFilename = args[4];
- String metricsSinksConfigFilename = args[5];
+ String overrideConfigFilename = args[5];
+ String metricsSinksConfigFilename = args[6];
SystemConfig systemConfig = SystemConfig.newBuilder(true)
.putAll(systemConfigFilename, true)
+ .putAll(overrideConfigFilename, true)
.build();
// Add the SystemConfig into SingletonRegistry
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
index 8005d7b..2723446 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/utils/SchedulerUtils.java
@@ -215,6 +215,7 @@
commands.add(tmasterControllerPort);
commands.add(tmasterStatsPort);
commands.add(Context.systemConfigFile(config));
+ commands.add(Context.overrideFile(config));
commands.add(Runtime.componentRamMap(runtime));
commands.add(SchedulerUtils.encodeJavaOpts(TopologyUtils.getComponentJvmOptions(topology)));
commands.add(Context.topologyPackageType(config).name().toLowerCase());
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraField.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraField.java
index dc9bcba..5fc4fa9 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraField.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraField.java
@@ -39,6 +39,7 @@
SHELL_BINARY,
STMGR_BINARY,
SYSTEM_YAML,
+ OVERRIDE_YAML,
TMASTER_BINARY,
STATEMGR_CONNECTION_STRING,
STATEMGR_ROOT_PATH,
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
index 67d6c86..f937f1e 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/aurora/AuroraScheduler.java
@@ -210,6 +210,7 @@
TopologyUtils.makeClassPath(topology, Context.topologyBinaryFile(config)));
auroraProperties.put(AuroraField.SYSTEM_YAML, Context.systemConfigFile(config));
+ auroraProperties.put(AuroraField.OVERRIDE_YAML, Context.overrideFile(config));
auroraProperties.put(AuroraField.COMPONENT_RAMMAP, Runtime.componentRamMap(runtime));
auroraProperties.put(AuroraField.COMPONENT_JVM_OPTS_IN_BASE64,
formatJavaOpts(TopologyUtils.getComponentJvmOptions(topology)));
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraSchedulerTest.java
index d4c7e34..57b3429 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/aurora/AuroraSchedulerTest.java
@@ -338,6 +338,9 @@
case SYSTEM_YAML:
expected = expectedConf + "/heron_internals.yaml";
break;
+ case OVERRIDE_YAML:
+ expected = expectedConf + "/override.yaml";
+ break;
case TOPOLOGY_BINARY_FILE:
case TOPOLOGY_CLASSPATH:
expected = "binaryFile.jar";
diff --git a/heron/stmgr/src/cpp/server/stmgr-main.cpp b/heron/stmgr/src/cpp/server/stmgr-main.cpp
index cc10a6c..aff6334 100644
--- a/heron/stmgr/src/cpp/server/stmgr-main.cpp
+++ b/heron/stmgr/src/cpp/server/stmgr-main.cpp
@@ -27,12 +27,12 @@
#include "config/heron-internals-config-reader.h"
int main(int argc, char* argv[]) {
- if (argc != 15) {
+ if (argc != 16) {
std::cout << "Usage: " << argv[0] << " "
<< "<topname> <topid> <topdefnfile> "
<< "<zknode> <zkroot> <stmgrid> "
<< "<instanceids> <myhost> <myport> <metricsmgrport> "
- << "<shellport> <heron_internals_config_filename> "
+ << "<shellport> <heron_internals_config_filename> <override_config_filename> "
<< "<ckptmgr_port> <ckptmgr_id>"
<< std::endl;
std::cout << "If zknode is empty please say LOCALMODE\n";
@@ -55,14 +55,16 @@
sp_int32 metricsmgr_port = atoi(argv[10]);
sp_int32 shell_port = atoi(argv[11]);
sp_string heron_internals_config_filename = argv[12];
- sp_int32 ckptmgr_port = atoi(argv[13]);
- sp_string ckptmgr_id = argv[14];
+ sp_string override_config_filename = argv[13];
+ sp_int32 ckptmgr_port = atoi(argv[14]);
+ sp_string ckptmgr_id = argv[15];
EventLoopImpl ss;
// Read heron internals config from local file
// Create the heron-internals-config-reader to read the heron internals config
- heron::config::HeronInternalsConfigReader::Create(&ss, heron_internals_config_filename);
+ heron::config::HeronInternalsConfigReader::Create(&ss,
+ heron_internals_config_filename, override_config_filename);
heron::common::Initialize(argv[0], myid.c_str());
diff --git a/heron/stmgr/tests/cpp/server/stateful-restorer_unittest.cpp b/heron/stmgr/tests/cpp/server/stateful-restorer_unittest.cpp
index 336f2a3..e72585a 100644
--- a/heron/stmgr/tests/cpp/server/stateful-restorer_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/stateful-restorer_unittest.cpp
@@ -472,7 +472,7 @@
}
// Create the sington for heron_internals_config_reader, if it does not exist
if (!heron::config::HeronInternalsConfigReader::Exists()) {
- heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename);
+ heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename, "");
}
return RUN_ALL_TESTS();
}
diff --git a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
index e0d33cc..c6917f6 100644
--- a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
@@ -441,7 +441,7 @@
// Create the sington for heron_internals_config_reader
// if it does not exist
if (!heron::config::HeronInternalsConfigReader::Exists()) {
- heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename);
+ heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename, "");
}
// Create a temporary directory to write out the state
diff --git a/heron/stmgr/tests/cpp/util/tuple-cache_unittest.cpp b/heron/stmgr/tests/cpp/util/tuple-cache_unittest.cpp
index 80af79c..26e2a57 100644
--- a/heron/stmgr/tests/cpp/util/tuple-cache_unittest.cpp
+++ b/heron/stmgr/tests/cpp/util/tuple-cache_unittest.cpp
@@ -307,7 +307,7 @@
// Create the sington for heron_internals_config_reader, if it does not exist
if (!heron::config::HeronInternalsConfigReader::Exists()) {
- heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename);
+ heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename, "");
}
return RUN_ALL_TESTS();
}
diff --git a/heron/stmgr/tests/cpp/util/xor-manager_unittest.cpp b/heron/stmgr/tests/cpp/util/xor-manager_unittest.cpp
index 6a58d55..acf26e8 100644
--- a/heron/stmgr/tests/cpp/util/xor-manager_unittest.cpp
+++ b/heron/stmgr/tests/cpp/util/xor-manager_unittest.cpp
@@ -145,7 +145,7 @@
// Create the sington for heron_internals_config_reader, if it does not exist
if (!heron::config::HeronInternalsConfigReader::Exists()) {
- heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename);
+ heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename, "");
}
return RUN_ALL_TESTS();
}
diff --git a/heron/tmaster/src/cpp/server/tmaster-main.cpp b/heron/tmaster/src/cpp/server/tmaster-main.cpp
index 1b8bb45..dc07151 100644
--- a/heron/tmaster/src/cpp/server/tmaster-main.cpp
+++ b/heron/tmaster/src/cpp/server/tmaster-main.cpp
@@ -26,11 +26,11 @@
#include "config/heron-internals-config-reader.h"
int main(int argc, char* argv[]) {
- if (argc != 13) {
+ if (argc != 14) {
std::cout << "Usage: " << argv[0] << " "
<< "<master-host> <master-port> <controller-port> <stats-port> "
<< "<topology_name> <topology_id> <zk_hostportlist> "
- << "<topdir> <heron_internals_config_filename> "
+ << "<topdir> <heron_internals_config_filename> <override_config_filename> "
<< "<metrics_sinks_filename> <metrics-manager-port> <ckptmgr-port>" << std::endl;
std::cout << "If zk_hostportlist is empty please say LOCALMODE\n";
::exit(1);
@@ -48,15 +48,17 @@
}
sp_string topdir = argv[8];
sp_string heron_internals_config_filename = argv[9];
- sp_string metrics_sinks_yaml = argv[10];
- sp_int32 metrics_manager_port = atoi(argv[11]);
- sp_int32 ckptmgr_port = atoi(argv[12]);
+ sp_string override_config_filename = argv[10];
+ sp_string metrics_sinks_yaml = argv[11];
+ sp_int32 metrics_manager_port = atoi(argv[12]);
+ sp_int32 ckptmgr_port = atoi(argv[13]);
EventLoopImpl ss;
// Read heron internals config from local file
// Create the heron-internals-config-reader to read the heron internals config
- heron::config::HeronInternalsConfigReader::Create(&ss, heron_internals_config_filename);
+ heron::config::HeronInternalsConfigReader::Create(&ss,
+ heron_internals_config_filename, override_config_filename);
heron::common::Initialize(argv[0], topology_id.c_str());
diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
index aee840b..7f45b63 100644
--- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
+++ b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
@@ -47,6 +47,8 @@
const sp_string LOCALHOST = "127.0.0.1";
sp_string heron_internals_config_filename =
"../../../../../../../../heron/config/heron_internals.yaml";
+sp_string override_config_filename =
+ "../../../../../../../../heron/config/heron_internals.yaml";
sp_string metrics_sinks_config_filename = "../../../../../../../../heron/config/metrics_sinks.yaml";
// Generate a dummy topology
@@ -335,7 +337,7 @@
// Create the sington for heron_internals_config_reader
// if it does not exist
if (!heron::config::HeronInternalsConfigReader::Exists()) {
- heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename);
+ heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename, "");
}
}
};
@@ -629,7 +631,7 @@
// Create the sington for heron_internals_config_reader, if it does not exist
if (!heron::config::HeronInternalsConfigReader::Exists()) {
- heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename);
+ heron::config::HeronInternalsConfigReader::Create(heron_internals_config_filename, "");
}
return RUN_ALL_TESTS();
}