Merge pull request #3156 from efgpinto/STORM-3211

STORM-3211: Fix NPE in WindowedBoltExecutor on getComponentConfiguration
diff --git a/RELEASING.md b/RELEASING.md
index 4e6d8b2..6affa3a 100644
--- a/RELEASING.md
+++ b/RELEASING.md
@@ -1,10 +1,16 @@
-# Committer documentation
+# Release
 
-This document summarizes information relevant to Storm committers.  It includes information about
-the Storm release process.
+This document includes information about the Storm release process.
 
 ---
 
+# Release Policy
+
+Apache Storm follows the basic idea of [Semantic Versioning](https://semver.org/). Given a version number MAJOR.MINOR.PATCH, increment the:
+ 1. MAJOR version when you make incompatible API changes,
+ 2. MINOR version when you add functionality in a backwards compatible manner, and
+ 3. PATCH version when you make backwards compatible bug fixes.
+ 
 # Release process
 
 ## Preparation
@@ -15,19 +21,39 @@
 
 Ensure you have a signed GPG key, and that the GPG key is listed in the Storm KEYS file at https://dist.apache.org/repos/dist/release/storm/KEYS. The key should be hooked into the Apache web of trust. You should read the [Apache release signing page](http://www.apache.org/dev/release-signing.html), the [release distribution page](http://www.apache.org/dev/release-distribution.html#sigs-and-sums), as well as the [release publishing](http://www.apache.org/dev/release-publishing) and [release policy](http://www.apache.org/legal/release-policy.html) pages.
 
+If you are setting up a new MINOR version release, create a new branch based on the `master` branch, e.g. `2.2.x-branch`. Then, on the `master` branch, set the version to the next MINOR version (with SNAPSHOT), e.g. `mvn versions:set -DnewVersion=2.3.0-SNAPSHOT -P dist,rat,externals,examples`.
+This way you create a new release line, from which you can then cut releases such as `2.2.0` and subsequent PATCH releases.
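+
+The branch setup above can be sketched as follows (branch and version numbers are examples only):
+```bash
+# create the new release line from master and publish it
+git checkout master
+git checkout -b 2.2.x-branch
+git push origin 2.2.x-branch
+
+# back on master, bump to the next MINOR SNAPSHOT version
+git checkout master
+mvn versions:set -DnewVersion=2.3.0-SNAPSHOT -P dist,rat,externals,examples
+```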
+
 ## Setting up a vote
 
-1. Run `mvn release:prepare` followed `mvn release:perform` on the branch to be released. This will create all the artifacts that will eventually be available in maven central. This step may seem simple, but a lot can go wrong (mainly flaky tests).
+0. Check out the branch to be released.
+
+1. Run `mvn release:prepare -P dist,rat,externals,examples` followed by `mvn release:perform -P dist,rat,externals,examples`. This will create all the artifacts that will eventually be available in Maven Central. This step may seem simple, but a lot can go wrong (mainly flaky tests).
+Note that this will create and push two commits with commit messages starting with "[maven-release-plugin]", and it will also create and publish a git tag, e.g. `v2.2.0`.
 
 2. Once you get a successful maven release, a “staging repository” will be created at http://repository.apache.org in the “open” state, meaning it is still writable. You will need to close it, making it read-only. You can find more information on this step [here](www.apache.org/dev/publishing-maven-artifacts.html).
 
-3. Run `mvn package` for `storm-dist/binary` and `storm-dist/source` to create the actual distributions.
+3. Check out the git tag that was published by Step 1 above, e.g. `git checkout tags/v2.2.0 -b v2.2.0`. Run `mvn package` for `storm-dist/binary` and `storm-dist/source` to create the actual distributions.
 
-4. Sign and generate checksums for the *.tar.gz and *.zip distribution files. 
+4. Generate checksums for the *.tar.gz and *.zip distribution files, e.g.
+```bash
+cd storm-dist/source/target
+gpg --print-md SHA512 apache-storm-2.2.0-src.zip > apache-storm-2.2.0-src.zip.sha512
+gpg --print-md SHA512 apache-storm-2.2.0-src.tar.gz > apache-storm-2.2.0-src.tar.gz.sha512
+
+cd ../../../storm-dist/binary/final-package/target
+gpg --print-md SHA512 apache-storm-2.2.0.zip > apache-storm-2.2.0.zip.sha512
+gpg --print-md SHA512 apache-storm-2.2.0.tar.gz > apache-storm-2.2.0.tar.gz.sha512
+```
 
 5. Create a directory in the dist svn repo for the release candidate: https://dist.apache.org/repos/dist/dev/storm/apache-storm-x.x.x-rcx
 
-6. Run `dev-tools/release_notes.py` for the release version, piping the output to a RELEASE_NOTES.html file. Move that file to the svn release directory, sign it, and generate checksums.
+6. Run `dev-tools/release_notes.py` for the release version, piping the output to a RELEASE_NOTES.html file. Move that file to the svn release directory, sign it, and generate checksums, e.g.
+```bash
+python dev-tools/release_notes.py 2.2.0 > RELEASE_NOTES.html
+gpg --armor --output RELEASE_NOTES.html.asc --detach-sig RELEASE_NOTES.html
+gpg --print-md SHA512 RELEASE_NOTES.html > RELEASE_NOTES.html.sha512
+```
 
 7. Move the release files from Step 4 and 6 to the svn directory from Step 5. Add and commit the files. This makes them available in the Apache staging repo.
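+
+The svn steps (5 and 7) can be sketched as follows (directory names and version are examples only):
+```bash
+svn checkout https://dist.apache.org/repos/dist/dev/storm storm-dev-dist
+cd storm-dev-dist
+mkdir apache-storm-2.2.0-rc1
+# copy the distribution files, signatures, checksums and RELEASE_NOTES.html here
+svn add apache-storm-2.2.0-rc1
+svn commit -m "Add Apache Storm 2.2.0 RC1"
+```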
 
@@ -56,3 +82,26 @@
 1. Go to http://repository.apache.org and drop the staging repository.
 
 2. Delete the staged distribution files from https://dist.apache.org/repos/dist/dev/storm/
+
+3. Delete the git tag.
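+
+Deleting the tag can be sketched as follows (tag name is an example only):
+```bash
+git tag -d v2.2.0
+git push --delete origin v2.2.0
+```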
+
+# How to vote on a release candidate
+
+We encourage everyone to review and vote on a release candidate to make an Apache Storm release more reliable and trustworthy.
+
+Below is a checklist one can follow when reviewing a release candidate.
+Please note this list is not exhaustive and only covers some of the common steps. Feel free to add your own tests.
+
+1. Verify files such as *.asc and *.sha512; some scripts are available under `dev-tools/rc` to help with this;
+2. Build Apache Storm source code and run unit tests, create an Apache Storm distribution;
+3. Set up a standalone cluster using each of apache-storm-xxx.zip, apache-storm-xxx.tar.gz, and the Apache Storm distribution created in step 2, separately;
+4. Launch the WordCountTopology and ThroughputVsLatency topologies and check logs, UI metrics, etc.;
+5. Test basic UI functionalities such as jstack, heap dump, deactivate, activate, rebalance, change log level, log search, kill topology;
+6. Test basic CLI commands such as kill, list, deactivate, activate, rebalance, etc.
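+
+For example, the verification in item 1 can be sketched as follows (file names are examples only):
+```bash
+# verify the GPG signature against the key in the Storm KEYS file
+gpg --verify apache-storm-2.2.0-src.tar.gz.asc apache-storm-2.2.0-src.tar.gz
+
+# compare the published checksum with a locally computed one
+gpg --print-md SHA512 apache-storm-2.2.0-src.tar.gz | diff - apache-storm-2.2.0-src.tar.gz.sha512
+```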
+
+It's also preferable to set up a standalone secure Apache Storm cluster and test basic functionalities on it.
+
+Don't feel pressured to do everything listed above. After you finish your review, reply to the corresponding email thread with your vote, summarize the work you performed, and describe any issues you found. Also, please feel free to update the checklist if you think anything important is missing.
+
+Your contribution is very much appreciated.  
\ No newline at end of file
diff --git a/dev-tools/rc/verify-release-file.sh b/dev-tools/rc/verify-release-file.sh
index 2e33965..3e6a869 100755
--- a/dev-tools/rc/verify-release-file.sh
+++ b/dev-tools/rc/verify-release-file.sh
@@ -34,21 +34,6 @@
   echo 'Signature seems not correct'
 fi
 
-# checking MD5
-GPG_MD5_FILE="/tmp/${TARGET_FILE}_GPG.md5"
-gpg --print-md MD5 ${TARGET_FILE} > ${GPG_MD5_FILE}
-MD5_TARGET_FILE="${TARGET_FILE}.md5"
-
-echo ">> checking MD5 file... (${MD5_TARGET_FILE})"
-diff ${GPG_MD5_FILE} ${MD5_TARGET_FILE}
-
-if [ $? -eq 0 ];
-then
-  echo 'MD5 file is correct'
-else
-  echo 'MD5 file is not correct'
-fi
-
 # checking SHA
 GPG_SHA_FILE="/tmp/${TARGET_FILE}_GPG.sha512"
 gpg --print-md SHA512 ${TARGET_FILE} > ${GPG_SHA_FILE}
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 8299c14..50570e1 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -18,12 +18,17 @@
 
 package org.apache.storm.flux.parser;
 
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.storm.flux.model.BoltDef;
 import org.apache.storm.flux.model.IncludeDef;
@@ -40,17 +45,20 @@
  */
 public class FluxParser {
     private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
+    private static final Pattern propertyPattern =
+            Pattern.compile(".*\\$\\{(?<var>ENV-(?<envVar>.+)|(?<list>.+)\\[(?<listIndex>\\d+)]|.+)}.*");
 
     private FluxParser() {
     }
 
     /**
      * Parse a flux topology definition.
-     * @param inputFile source YAML file
-     * @param dumpYaml if true, dump the parsed YAML to stdout
+     *
+     * @param inputFile       source YAML file
+     * @param dumpYaml        if true, dump the parsed YAML to stdout
      * @param processIncludes whether or not to process includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties      properties file for variable substitution
+     * @param envSub          whether or not to perform environment variable substitution
      * @return resulting topologuy definition
      * @throws IOException if there is a problem reading file(s)
      */
@@ -65,11 +73,12 @@
 
     /**
      * Parse a flux topology definition from a classpath resource..
-     * @param resource YAML resource
-     * @param dumpYaml if true, dump the parsed YAML to stdout
+     *
+     * @param resource        YAML resource
+     * @param dumpYaml        if true, dump the parsed YAML to stdout
      * @param processIncludes whether or not to process includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties      properties file for variable substitution
+     * @param envSub          whether or not to perform environment variable substitution
      * @return resulting topologuy definition
      * @throws IOException if there is a problem reading file(s)
      */
@@ -84,11 +93,12 @@
 
     /**
      * Parse a flux topology definition.
-     * @param inputStream InputStream representation of YAML file
-     * @param dumpYaml if true, dump the parsed YAML to stdout
+     *
+     * @param inputStream     InputStream representation of YAML file
+     * @param dumpYaml        if true, dump the parsed YAML to stdout
      * @param processIncludes whether or not to process includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties      properties file for variable substitution
+     * @param envSub          whether or not to perform environment variable substitution
      * @return resulting topology definition
      * @throws IOException if there is a problem reading file(s)
      */
@@ -116,10 +126,11 @@
 
     /**
      * Parse filter properties file.
+     *
      * @param propertiesFile properties file for variable substitution
-     * @param resource whether or not to load properties file from classpath
+     * @param resource       whether or not to load properties file from classpath
      * @return resulting filter properties
-     * @throws IOException  if there is a problem reading file
+     * @throws IOException if there is a problem reading file
      */
     public static Properties parseProperties(String propertiesFile, boolean resource) throws IOException {
         Properties properties = null;
@@ -140,36 +151,43 @@
     }
 
     private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties properties, boolean envSubstitution) throws IOException {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         LOG.info("loading YAML from input stream...");
-        int b = -1;
-        while ((b = in.read()) != -1) {
-            bos.write(b);
-        }
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+            String conf = reader.lines().map(line -> {
+                Matcher m = propertyPattern.matcher(line);
+                return m.find()
+                        ? getPropertyReplacement(properties, m, envSubstitution)
+                        .map(propValue -> line.replace("${" + m.group("var") + "}", propValue))
+                        .orElseGet(() -> {
+                            LOG.warn("Could not find replacement for property: {}", m.group("var"));
+                            return line;
+                        })
+                        : line;
+            }).collect(Collectors.joining(System.lineSeparator()));
 
-        // TODO substitution implementation is not exactly efficient or kind to memory...
-        String str = bos.toString();
-        // properties file substitution
-        if (properties != null) {
-            LOG.info("Performing property substitution.");
-            for (Object key : properties.keySet()) {
-                str = str.replace("${" + key + "}", properties.getProperty((String)key));
-            }
-        } else {
-            LOG.info("Not performing property substitution.");
+            return (TopologyDef) yaml.load(conf);
         }
+    }
 
-        // environment variable substitution
-        if (envSubstitution) {
-            LOG.info("Performing environment variable substitution...");
-            Map<String, String> envs = System.getenv();
-            for (String key : envs.keySet()) {
-                str = str.replace("${ENV-" + key + "}", envs.get(key));
-            }
+    private static Optional<String> getPropertyReplacement(Properties properties, Matcher match, boolean envSubstitution) {
+        if (match.group("listIndex") != null) {
+            String prop = properties.getProperty(match.group("list"));
+            return Optional.of(parseListAndExtractElem(prop, match.group("listIndex")));
+        } else if (envSubstitution && match.group("envVar") != null) {
+            String envVar = System.getenv().get(match.group("envVar"));
+            return Optional.ofNullable(envVar);
         } else {
-            LOG.info("Not performing environment variable substitution.");
+            return Optional.ofNullable(properties.getProperty(match.group("var")));
         }
-        return (TopologyDef) yaml.load(str);
+    }
+
+    private static String parseListAndExtractElem(String strList, String index) {
+        String[] listProp = strList.substring(1, strList.length() - 1).split(",");
+        String listElem = listProp[Integer.parseInt(index)];
+
+        // remove whitespaces and double quotes from beginning and end of a given string
+        String trimmed = listElem.trim();
+        return trimmed.substring(1, trimmed.length() - 1);
     }
 
     private static void dumpYaml(TopologyDef topology, Yaml yaml) {
@@ -191,14 +209,15 @@
 
     /**
      * Process includes contained within a yaml file.
+     *
      * @param yaml        the yaml parser for parsing the include file(s)
      * @param topologyDef the topology definition containing (possibly zero) includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties  properties file for variable substitution
+     * @param envSub      whether or not to perform environment variable substitution
      * @return The TopologyDef with includes resolved.
      */
     private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub)
-        throws IOException {
+            throws IOException {
         //TODO support multiple levels of includes
         if (topologyDef.getIncludes() != null) {
             for (IncludeDef include : topologyDef.getIncludes()) {
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 90613c9..275a720 100644
--- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -275,6 +275,11 @@
                Collections.singletonList("A string list"),
                is(context.getTopologyDef().getConfig().get("list.property.target")));
 
+        //Test substitution where the target type is a List element
+        assertThat("List element property is not replaced by the expected value",
+                "A string list",
+                is(context.getTopologyDef().getConfig().get("list.element.property.target")));
+
     }
     
     @Test
diff --git a/flux/flux-core/src/test/resources/configs/substitution-test.yaml b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
index 9707936..67ac92a 100644
--- a/flux/flux-core/src/test/resources/configs/substitution-test.yaml
+++ b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
@@ -45,6 +45,8 @@
   test.env.value: "${ENV-PATH}"
   # test variable substitution for list type
   list.property.target: ${a.list.property}
+  # test variable substitution for list element
+  list.element.property.target: ${a.list.property[0]}
 
 # spout definitions
 spouts:
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 578be2b..fdd130b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,7 +21,6 @@
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -95,7 +94,8 @@
     private final ConcurrentHashMap<String, CompletableFuture<Void>> topologyBasicDownloaded = new ConcurrentHashMap<>();
     private final Path localBaseDir;
     private final int blobDownloadRetries;
-    private final ScheduledExecutorService execService;
+    private final ScheduledExecutorService downloadExecService;
+    private final ScheduledExecutorService taskExecService;
     private final long cacheCleanupPeriod;
     private final StormMetricsRegistry metricsRegistry;
     // cleanup
@@ -120,13 +120,14 @@
         cacheCleanupPeriod = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
 
-        // if we needed we could make config for update thread pool size
-        int threadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
         blobDownloadRetries = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
 
-        execService = Executors.newScheduledThreadPool(threadPoolSize,
-                                                       new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build());
+        int downloadThreadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+        downloadExecService = Executors.newScheduledThreadPool(downloadThreadPoolSize,
+                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Download Executor - %d").build());
+        taskExecService = Executors.newScheduledThreadPool(3,
+                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Task Executor - %d").build());
         reconstructLocalizedResources();
 
         symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
@@ -213,7 +214,7 @@
             blobPending.compute(topologyId, (tid, old) -> {
                 CompletableFuture<Void> ret = old;
                 if (ret == null) {
-                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService);
+                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), taskExecService);
                 } else {
                     try {
                         addReferencesToBlobs(pna, cb);
@@ -291,7 +292,7 @@
                     }
                 }
                 LOG.debug("FINISHED download of {}", blob);
-            }, execService);
+            }, downloadExecService);
             i++;
         }
         return CompletableFuture.allOf(all);
@@ -337,14 +338,15 @@
      * Start any background threads needed.  This includes updating blobs and cleaning up unused blobs over the configured size limit.
      */
     public void start() {
-        execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
+        taskExecService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
         LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
-        execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
+        taskExecService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void close() throws InterruptedException {
-        execService.shutdown();
+        downloadExecService.shutdown();
+        taskExecService.shutdown();
     }
 
     private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
@@ -450,15 +452,7 @@
             topoConfBlob.removeReference(pna);
         }
 
-        List<LocalResource> localResources;
-        try {
-            localResources = getLocalResources(pna);
-        } catch (FileNotFoundException e) {
-            LOG.warn("Local resources for {} no longer available", pna, e);
-            return;
-        }
-
-        for (LocalResource lr : localResources) {
+        for (LocalResource lr : getLocalResources(pna)) {
             try {
                 removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
             } catch (Exception e) {