STORM-3066: Implement support for using list elements in properties
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 8299c14..50570e1 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -18,12 +18,17 @@
 
 package org.apache.storm.flux.parser;
 
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.storm.flux.model.BoltDef;
 import org.apache.storm.flux.model.IncludeDef;
@@ -40,17 +45,20 @@
  */
 public class FluxParser {
     private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
+    private static final Pattern propertyPattern =
+            Pattern.compile(".*\\$\\{(?<var>ENV-(?<envVar>.+)|(?<list>.+)\\[(?<listIndex>\\d+)]|.+)}.*");
 
     private FluxParser() {
     }
 
     /**
      * Parse a flux topology definition.
-     * @param inputFile source YAML file
-     * @param dumpYaml if true, dump the parsed YAML to stdout
+     *
+     * @param inputFile       source YAML file
+     * @param dumpYaml        if true, dump the parsed YAML to stdout
      * @param processIncludes whether or not to process includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties      properties file for variable substitution
+     * @param envSub          whether or not to perform environment variable substitution
      * @return resulting topologuy definition
      * @throws IOException if there is a problem reading file(s)
      */
@@ -65,11 +73,12 @@
 
     /**
      * Parse a flux topology definition from a classpath resource..
-     * @param resource YAML resource
-     * @param dumpYaml if true, dump the parsed YAML to stdout
+     *
+     * @param resource        YAML resource
+     * @param dumpYaml        if true, dump the parsed YAML to stdout
      * @param processIncludes whether or not to process includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties      properties file for variable substitution
+     * @param envSub          whether or not to perform environment variable substitution
      * @return resulting topologuy definition
      * @throws IOException if there is a problem reading file(s)
      */
@@ -84,11 +93,12 @@
 
     /**
      * Parse a flux topology definition.
-     * @param inputStream InputStream representation of YAML file
-     * @param dumpYaml if true, dump the parsed YAML to stdout
+     *
+     * @param inputStream     InputStream representation of YAML file
+     * @param dumpYaml        if true, dump the parsed YAML to stdout
      * @param processIncludes whether or not to process includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties      properties file for variable substitution
+     * @param envSub          whether or not to perform environment variable substitution
      * @return resulting topology definition
      * @throws IOException if there is a problem reading file(s)
      */
@@ -116,10 +126,11 @@
 
     /**
      * Parse filter properties file.
+     *
      * @param propertiesFile properties file for variable substitution
-     * @param resource whether or not to load properties file from classpath
+     * @param resource       whether or not to load properties file from classpath
      * @return resulting filter properties
-     * @throws IOException  if there is a problem reading file
+     * @throws IOException if there is a problem reading file
      */
     public static Properties parseProperties(String propertiesFile, boolean resource) throws IOException {
         Properties properties = null;
@@ -140,36 +151,43 @@
     }
 
     private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties properties, boolean envSubstitution) throws IOException {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         LOG.info("loading YAML from input stream...");
-        int b = -1;
-        while ((b = in.read()) != -1) {
-            bos.write(b);
-        }
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+            String conf = reader.lines().map(line -> {
+                Matcher m = propertyPattern.matcher(line);
+                return m.find()
+                        ? getPropertyReplacement(properties, m, envSubstitution)
+                        .map(propValue -> line.replace("${" + m.group("var") + "}", propValue))
+                        .orElseGet(() -> {
+                            LOG.warn("Could not find replacement for property: " + m.group("var"));
+                            return line;
+                        })
+                        : line;
+            }).collect(Collectors.joining(System.lineSeparator()));
 
-        // TODO substitution implementation is not exactly efficient or kind to memory...
-        String str = bos.toString();
-        // properties file substitution
-        if (properties != null) {
-            LOG.info("Performing property substitution.");
-            for (Object key : properties.keySet()) {
-                str = str.replace("${" + key + "}", properties.getProperty((String)key));
-            }
-        } else {
-            LOG.info("Not performing property substitution.");
+            return (TopologyDef) yaml.load(conf);
         }
+    }
 
-        // environment variable substitution
-        if (envSubstitution) {
-            LOG.info("Performing environment variable substitution...");
-            Map<String, String> envs = System.getenv();
-            for (String key : envs.keySet()) {
-                str = str.replace("${ENV-" + key + "}", envs.get(key));
-            }
+    private static Optional<String> getPropertyReplacement(Properties properties, Matcher match, boolean envSubstitution) {
+        if (match.group("listIndex") != null) {
+            String prop = properties.getProperty(match.group("list"));
+            return Optional.of(parseListAndExtractElem(prop, match.group("listIndex")));
+        } else if (envSubstitution && match.group("envVar") != null) {
+            String envVar = System.getenv().get(match.group("envVar"));
+            return Optional.ofNullable(envVar);
         } else {
-            LOG.info("Not performing environment variable substitution.");
+            return Optional.ofNullable(properties.getProperty(match.group("var")));
         }
-        return (TopologyDef) yaml.load(str);
+    }
+
+    private static String parseListAndExtractElem(String strList, String index) {
+        String[] listProp = strList.substring(1, strList.length() - 1).split(",");
+        String listElem = listProp[Integer.parseInt(index)];
+
+        // remove whitespaces and double quotes from beginning and end of a given string
+        String trimmed = listElem.trim();
+        return trimmed.substring(1, trimmed.length() - 1);
     }
 
     private static void dumpYaml(TopologyDef topology, Yaml yaml) {
@@ -191,14 +209,15 @@
 
     /**
      * Process includes contained within a yaml file.
+     *
      * @param yaml        the yaml parser for parsing the include file(s)
      * @param topologyDef the topology definition containing (possibly zero) includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties  properties file for variable substitution
+     * @param envSub      whether or not to perform environment variable substitution
      * @return The TopologyDef with includes resolved.
      */
     private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub)
-        throws IOException {
+            throws IOException {
         //TODO support multiple levels of includes
         if (topologyDef.getIncludes() != null) {
             for (IncludeDef include : topologyDef.getIncludes()) {
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 90613c9..275a720 100644
--- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -275,6 +275,11 @@
                Collections.singletonList("A string list"),
                is(context.getTopologyDef().getConfig().get("list.property.target")));
 
+        //Test substitution where the target type is a List element
+        assertThat("List element property is not replaced by the expected value",
+                "A string list",
+                is(context.getTopologyDef().getConfig().get("list.element.property.target")));
+
     }
     
     @Test
diff --git a/flux/flux-core/src/test/resources/configs/substitution-test.yaml b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
index 9707936..67ac92a 100644
--- a/flux/flux-core/src/test/resources/configs/substitution-test.yaml
+++ b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
@@ -45,6 +45,8 @@
   test.env.value: "${ENV-PATH}"
   # test variable substitution for list type
   list.property.target: ${a.list.property}
+  # test variable substitution for list element
+  list.element.property.target: ${a.list.property[0]}
 
 # spout definitions
 spouts: