Support overriden configuration in ckpmanager (#3215)
* correct the url to download zookeeper 3.4.10
temporarily turn off squashing when building docker image
pass overridden yaml configuration to checkpoint manager
* revert back to squashed docker images
diff --git a/docker/base/Dockerfile.base.debian9 b/docker/base/Dockerfile.base.debian9
index f35b235..d45d592 100644
--- a/docker/base/Dockerfile.base.debian9
+++ b/docker/base/Dockerfile.base.debian9
@@ -29,7 +29,7 @@
# install zookeeper
ARG ZK_DIST=zookeeper-3.4.10
-RUN curl -O "http://www.apache.org/dist/zookeeper/$ZK_DIST/$ZK_DIST.tar.gz" \
+RUN curl -O "https://archive.apache.org/dist/zookeeper/$ZK_DIST/$ZK_DIST.tar.gz" \
&& tar -xzf $ZK_DIST.tar.gz -C /opt \
&& rm -r $ZK_DIST.tar.gz \
&& mv /opt/$ZK_DIST /opt/zookeeper \
diff --git a/docker/dist/Dockerfile.dist.debian9 b/docker/dist/Dockerfile.dist.debian9
index a10aeb8..616ff49 100644
--- a/docker/dist/Dockerfile.dist.debian9
+++ b/docker/dist/Dockerfile.dist.debian9
@@ -54,7 +54,7 @@
# install zookeeper
ARG ZK_DIST=zookeeper-3.4.10
-RUN curl -O "http://www.apache.org/dist/zookeeper/$ZK_DIST/$ZK_DIST.tar.gz" \
+RUN curl -O "https://archive.apache.org/dist/zookeeper/$ZK_DIST/$ZK_DIST.tar.gz" \
&& tar -xzf /heron/$ZK_DIST.tar.gz -C /opt \
&& rm -r /heron/$ZK_DIST.tar.gz \
&& mv /opt/$ZK_DIST /opt/zookeeper \
diff --git a/docker/dist/Dockerfile.dist.ubuntu14.04 b/docker/dist/Dockerfile.dist.ubuntu14.04
index 1a04e1b..689aceb 100644
--- a/docker/dist/Dockerfile.dist.ubuntu14.04
+++ b/docker/dist/Dockerfile.dist.ubuntu14.04
@@ -53,7 +53,7 @@
# install zookeeper
ARG ZK_DIST=zookeeper-3.4.10
-RUN curl -O "http://www.apache.org/dist/zookeeper/$ZK_DIST/$ZK_DIST.tar.gz" \
+RUN curl -O "https://archive.apache.org/dist/zookeeper/$ZK_DIST/$ZK_DIST.tar.gz" \
&& tar -xzf /heron/$ZK_DIST.tar.gz -C /opt \
&& rm -r /heron/$ZK_DIST.tar.gz \
&& mv /opt/$ZK_DIST /opt/zookeeper \
diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
index 4798386..e50b59e 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
@@ -103,6 +103,13 @@
.required()
.build();
+ Option ckptMgrOverridenConfig = Option.builder("o")
+ .desc("Config name of the checkpoint manager")
+ .longOpt("ckptMgrOverridenConfig")
+ .hasArgs()
+ .argName("ckptMgrOverridenConfig")
+ .build();
+
Option heronInternalConfig = Option.builder("g")
.desc("Heron internal config filename")
.longOpt("heroninternalconfig")
@@ -116,6 +123,7 @@
options.addOption(ckptMgrId);
options.addOption(ckptMgrPort);
options.addOption(ckptMgrConfig);
+ options.addOption(ckptMgrOverridenConfig);
options.addOption(heronInternalConfig);
return options;
@@ -228,11 +236,15 @@
String ckptmgrId = cmd.getOptionValue("ckptmgrid");
int port = Integer.parseInt(cmd.getOptionValue("ckptmgrport"));
String stateConfigFilename = cmd.getOptionValue("ckptmgrconfig");
+ String overriddenConfigFilename = cmd.getOptionValue("ckptMgrOverridenConfig");
String heronInternalConfig = cmd.getOptionValue("heroninternalconfig");
SystemConfig systemConfig = SystemConfig.newBuilder(true).putAll(heronInternalConfig,
true).build();
- CheckpointManagerConfig ckptmgrConfig =
- CheckpointManagerConfig.newBuilder(true).putAll(stateConfigFilename, true).build();
+ CheckpointManagerConfig ckptmgrConfig = CheckpointManagerConfig
+ .newBuilder(true)
+ .putAll(stateConfigFilename, true)
+ .override(overriddenConfigFilename)
+ .build();
// Add the SystemConfig into SingletonRegistry
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
@@ -256,6 +268,7 @@
topologyName, topologyId, ckptmgrId, port));
LOG.info("System Config: " + systemConfig);
+ LOG.info(() -> "Checkpoint Manager Config: " + ckptmgrConfig);
CheckpointManager checkpointManager = new CheckpointManager();
checkpointManager.init(topologyName, topologyId, ckptmgrId,
diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerConfig.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerConfig.java
index 758301c..5111246 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerConfig.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManagerConfig.java
@@ -167,6 +167,21 @@
return this;
}
+ public Builder override(String fileName) {
+ File file = new File(fileName);
+ if (file.exists()) {
+ Map<String, Object> overridden = ConfigReader.loadFile(fileName);
+ //overridden yaml always has flattened key value pair
+ keyValues.putAll(overridden);
+ Object storageConfigMap = keyValues.get(CheckpointManagerConfigKey.STORAGE_CONFIG.value());
+ if (storageConfigMap instanceof Map) {
+ ((Map) storageConfigMap).putAll(overridden);
+ }
+ }
+ return this;
+ }
+
+
private static void convertAndAdd(Map<String, Object> config,
CheckpointManagerConfigKey key,
Object value) {
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 63939ce..e727fdd 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -874,6 +874,7 @@
'-c' + self.ckptmgr_ids[self.shard],
'-p' + self.checkpoint_manager_port,
'-f' + self.stateful_config_file,
+ '-o' + self.override_config_file,
'-g' + self.heron_internals_config_file]
retval = {}
retval[self.ckptmgr_ids[self.shard]] = Command(ckptmgr_cmd, self.shell_env)