Merge pull request #3155 from efgpinto/STORM-3066
STORM-3066: Implement support for using list elements in properties in FluxParser
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 8299c14..50570e1 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -18,12 +18,17 @@
package org.apache.storm.flux.parser;
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.storm.flux.model.BoltDef;
import org.apache.storm.flux.model.IncludeDef;
@@ -40,17 +45,20 @@
*/
public class FluxParser {
private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
+ private static final Pattern propertyPattern =
+ Pattern.compile(".*\\$\\{(?<var>ENV-(?<envVar>.+)|(?<list>.+)\\[(?<listIndex>\\d+)]|.+)}.*");
private FluxParser() {
}
/**
* Parse a flux topology definition.
- * @param inputFile source YAML file
- * @param dumpYaml if true, dump the parsed YAML to stdout
+ *
+ * @param inputFile source YAML file
+ * @param dumpYaml if true, dump the parsed YAML to stdout
* @param processIncludes whether or not to process includes
- * @param properties properties file for variable substitution
- * @param envSub whether or not to perform environment variable substitution
+ * @param properties properties file for variable substitution
+ * @param envSub whether or not to perform environment variable substitution
* @return resulting topology definition
* @throws IOException if there is a problem reading file(s)
*/
@@ -65,11 +73,12 @@
/**
* Parse a flux topology definition from a classpath resource.
- * @param resource YAML resource
- * @param dumpYaml if true, dump the parsed YAML to stdout
+ *
+ * @param resource YAML resource
+ * @param dumpYaml if true, dump the parsed YAML to stdout
* @param processIncludes whether or not to process includes
- * @param properties properties file for variable substitution
- * @param envSub whether or not to perform environment variable substitution
+ * @param properties properties file for variable substitution
+ * @param envSub whether or not to perform environment variable substitution
* @return resulting topology definition
* @throws IOException if there is a problem reading file(s)
*/
@@ -84,11 +93,12 @@
/**
* Parse a flux topology definition.
- * @param inputStream InputStream representation of YAML file
- * @param dumpYaml if true, dump the parsed YAML to stdout
+ *
+ * @param inputStream InputStream representation of YAML file
+ * @param dumpYaml if true, dump the parsed YAML to stdout
* @param processIncludes whether or not to process includes
- * @param properties properties file for variable substitution
- * @param envSub whether or not to perform environment variable substitution
+ * @param properties properties file for variable substitution
+ * @param envSub whether or not to perform environment variable substitution
* @return resulting topology definition
* @throws IOException if there is a problem reading file(s)
*/
@@ -116,10 +126,11 @@
/**
* Parse filter properties file.
+ *
* @param propertiesFile properties file for variable substitution
- * @param resource whether or not to load properties file from classpath
+ * @param resource whether or not to load properties file from classpath
* @return resulting filter properties
- * @throws IOException if there is a problem reading file
+ * @throws IOException if there is a problem reading file
*/
public static Properties parseProperties(String propertiesFile, boolean resource) throws IOException {
Properties properties = null;
@@ -140,36 +151,43 @@
}
private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties properties, boolean envSubstitution) throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
LOG.info("loading YAML from input stream...");
- int b = -1;
- while ((b = in.read()) != -1) {
- bos.write(b);
- }
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+ String conf = reader.lines().map(line -> {
+ Matcher m = propertyPattern.matcher(line);
+ return m.find()
+ ? getPropertyReplacement(properties, m, envSubstitution)
+ .map(propValue -> line.replace("${" + m.group("var") + "}", propValue))
+ .orElseGet(() -> {
+ LOG.warn("Could not find replacement for property: " + m.group("var"));
+ return line;
+ })
+ : line;
+ }).collect(Collectors.joining(System.lineSeparator()));
- // TODO substitution implementation is not exactly efficient or kind to memory...
- String str = bos.toString();
- // properties file substitution
- if (properties != null) {
- LOG.info("Performing property substitution.");
- for (Object key : properties.keySet()) {
- str = str.replace("${" + key + "}", properties.getProperty((String)key));
- }
- } else {
- LOG.info("Not performing property substitution.");
+ return (TopologyDef) yaml.load(conf);
}
+ }
- // environment variable substitution
- if (envSubstitution) {
- LOG.info("Performing environment variable substitution...");
- Map<String, String> envs = System.getenv();
- for (String key : envs.keySet()) {
- str = str.replace("${ENV-" + key + "}", envs.get(key));
- }
+ private static Optional<String> getPropertyReplacement(Properties properties, Matcher match, boolean envSubstitution) {
+ if (match.group("listIndex") != null) {
+ String prop = properties.getProperty(match.group("list"));
+ return Optional.of(parseListAndExtractElem(prop, match.group("listIndex")));
+ } else if (envSubstitution && match.group("envVar") != null) {
+ String envVar = System.getenv().get(match.group("envVar"));
+ return Optional.ofNullable(envVar);
} else {
- LOG.info("Not performing environment variable substitution.");
+ return Optional.ofNullable(properties.getProperty(match.group("var")));
}
- return (TopologyDef) yaml.load(str);
+ }
+
+ private static String parseListAndExtractElem(String strList, String index) {
+ String[] listProp = strList.substring(1, strList.length() - 1).split(",");
+ String listElem = listProp[Integer.parseInt(index)];
+
+ // trim surrounding whitespace, then drop the first and last characters (expected to be the enclosing double quotes)
+ String trimmed = listElem.trim();
+ return trimmed.substring(1, trimmed.length() - 1);
}
private static void dumpYaml(TopologyDef topology, Yaml yaml) {
@@ -191,14 +209,15 @@
/**
* Process includes contained within a yaml file.
+ *
* @param yaml the yaml parser for parsing the include file(s)
* @param topologyDef the topology definition containing (possibly zero) includes
- * @param properties properties file for variable substitution
- * @param envSub whether or not to perform environment variable substitution
+ * @param properties properties file for variable substitution
+ * @param envSub whether or not to perform environment variable substitution
* @return The TopologyDef with includes resolved.
*/
private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub)
- throws IOException {
+ throws IOException {
//TODO support multiple levels of includes
if (topologyDef.getIncludes() != null) {
for (IncludeDef include : topologyDef.getIncludes()) {
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 90613c9..275a720 100644
--- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -275,6 +275,11 @@
Collections.singletonList("A string list"),
is(context.getTopologyDef().getConfig().get("list.property.target")));
+ //Test substitution where the target type is a List element
+ assertThat("List element property is not replaced by the expected value",
+ "A string list",
+ is(context.getTopologyDef().getConfig().get("list.element.property.target")));
+
}
@Test
diff --git a/flux/flux-core/src/test/resources/configs/substitution-test.yaml b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
index 9707936..67ac92a 100644
--- a/flux/flux-core/src/test/resources/configs/substitution-test.yaml
+++ b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
@@ -45,6 +45,8 @@
test.env.value: "${ENV-PATH}"
# test variable substitution for list type
list.property.target: ${a.list.property}
+ # test variable substitution for list element
+ list.element.property.target: ${a.list.property[0]}
# spout definitions
spouts: