STORM-3506 prevent topo conf from overriding some system properties (#3125)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 69b2bb1..d2abeb0 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1086,6 +1086,13 @@
}
@SuppressWarnings("unchecked")
+ /**
+ * Create a normalized topology conf.
+ *
+ * @param conf the nimbus conf
+ * @param topoConf initial topology conf
+ * @param topology the Storm topology
+ */
private static Map<String, Object> normalizeConf(Map<String, Object> conf, Map<String, Object> topoConf, StormTopology topology) {
//ensure that serializations are same for all tasks no matter what's on
// the supervisors. this also allows you to declare the serializations as a sequence
@@ -1114,6 +1121,20 @@
ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+ // Don't allow topoConf to override various cluster-specific properties.
+ // Specifically adding the cluster settings to the topoConf here will make sure these settings
+ // also override the subsequently generated conf picked up locally on the classpath.
+ //
+ // We will be dealing with 3 confs:
+ // 1) the submitted topoConf created here
+ // 2) the combined classpath conf with the topoConf added on top
+ // 3) the nimbus conf with conf 2 above added on top.
+ //
+ // By first forcing the topology conf to contain the nimbus settings, we guarantee all three confs
+ // will have the correct settings that cannot be overriden by the submitter.
+ ret.put(Config.STORM_CGROUP_HIERARCHY_DIR, conf.get(Config.STORM_CGROUP_HIERARCHY_DIR));
+ ret.put(Config.WORKER_METRICS, conf.get(Config.WORKER_METRICS));
+
if (mergedConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
int workerTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
int workerMaxTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.WORKER_MAX_TIMEOUT_SECS));
@@ -3100,6 +3121,7 @@
if (!(Boolean) conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false)) {
topoConf.remove(Config.TOPOLOGY_CLASSPATH_BEGINNING);
}
+
String topoVersionString = topology.get_storm_version();
if (topoVersionString == null) {
topoVersionString = (String) conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());