Merge pull request #3155 from efgpinto/STORM-3066

STORM-3066: Implement support for using list elements in properties in FluxParser
diff --git a/RELEASING.md b/RELEASING.md
index 4e6d8b2..6affa3a 100644
--- a/RELEASING.md
+++ b/RELEASING.md
@@ -1,10 +1,16 @@
-# Committer documentation
+# Release
 
-This document summarizes information relevant to Storm committers.  It includes information about
-the Storm release process.
+This document includes information about the Storm release process.
 
 ---
 
+# Release Policy
+
+Apache Storm follows the basic idea of [Semantic Versioning](https://semver.org/). Given a version number MAJOR.MINOR.PATCH, increment the:
+ 1. MAJOR version when you make incompatible API changes,
+ 2. MINOR version when you add functionality in a backwards compatible manner, and
+ 3. PATCH version when you make backwards compatible bug fixes.
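+
+For example, under this scheme a release that breaks a public API goes from `2.2.0` to `3.0.0`, a release that only adds backwards compatible functionality goes to `2.3.0`, and a bug-fix-only release goes to `2.2.1`.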
+ 
 # Release process
 
 ## Preparation
@@ -15,19 +21,39 @@
 
 Ensure you have a signed GPG key, and that the GPG key is listed in the Storm KEYS file at https://dist.apache.org/repos/dist/release/storm/KEYS. The key should be hooked into the Apache web of trust. You should read the [Apache release signing page](http://www.apache.org/dev/release-signing.html), the [release distribution page](http://www.apache.org/dev/release-distribution.html#sigs-and-sums), as well as the [release publishing](http://www.apache.org/dev/release-publishing) and [release policy](http://www.apache.org/legal/release-policy.html) pages.
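+
+For example, one common way to add your key to the KEYS file (assuming `YOUR_KEY_ID` is the id of your signing key) is:
+```bash
+# Check out the release dist area that contains the KEYS file
+svn checkout https://dist.apache.org/repos/dist/release/storm storm-release
+# Append your public key (with its signatures) to the KEYS file
+(gpg --list-sigs YOUR_KEY_ID && gpg --armor --export YOUR_KEY_ID) >> storm-release/KEYS
+# Commit the updated KEYS file
+svn commit -m "Add release signing key" storm-release/KEYS
+```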
 
+If you are setting up a new MINOR version release, create a new branch based on the `master` branch, e.g. `2.2.x-branch`. Then, on the `master` branch, set the version to the next MINOR version (with SNAPSHOT), e.g. `mvn versions:set -DnewVersion=2.3.0-SNAPSHOT -P dist,rat,externals,examples`.
+This creates a new release line from which you can cut PATCH version releases, e.g. `2.2.0`.
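+
+A minimal sketch of these steps, assuming the new release line is `2.2.x` and the next development version is `2.3.0`:
+```bash
+# Create the new release line from master and publish it
+git checkout master
+git checkout -b 2.2.x-branch
+git push origin 2.2.x-branch
+# Back on master, bump to the next development (SNAPSHOT) version
+git checkout master
+mvn versions:set -DnewVersion=2.3.0-SNAPSHOT -P dist,rat,externals,examples
+git commit -am "Bump version to 2.3.0-SNAPSHOT"
+git push origin master
+```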
+
 ## Setting up a vote
 
-1. Run `mvn release:prepare` followed `mvn release:perform` on the branch to be released. This will create all the artifacts that will eventually be available in maven central. This step may seem simple, but a lot can go wrong (mainly flaky tests).
+0. Check out the branch to be released.
+
+1. Run `mvn release:prepare -P dist,rat,externals,examples` followed by `mvn release:perform -P dist,rat,externals,examples`. This will create all the artifacts that will eventually be available in Maven Central. This step may seem simple, but a lot can go wrong (mainly flaky tests).
+Note that this will create and push two commits with commit messages starting with "[maven-release-plugin]", and it will also create and publish a git tag, e.g. `v2.2.0`.
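+If you want to validate this step first, the maven-release-plugin supports a dry run, e.g.:
+```bash
+# Simulate the release without committing, tagging, or pushing anything
+mvn release:prepare -DdryRun=true -P dist,rat,externals,examples
+# Clean up the files generated by the dry run before the real release
+mvn release:clean
+```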
 
 2. Once you get a successful maven release, a “staging repository” will be created at http://repository.apache.org in the “open” state, meaning it is still writable. You will need to close it, making it read-only. You can find more information on this step [here](https://www.apache.org/dev/publishing-maven-artifacts.html).
 
-3. Run `mvn package` for `storm-dist/binary` and `storm-dist/source` to create the actual distributions.
+3. Check out the git tag that was published by Step 1 above, e.g. `git checkout tags/v2.2.0 -b v2.2.0`. Run `mvn package` for `storm-dist/binary` and `storm-dist/source` to create the actual distributions.
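+For example (a sketch, assuming version `2.2.0` and that `mvn package` is run inside each module directory):
+```bash
+git checkout tags/v2.2.0 -b v2.2.0
+# Build the source distribution
+cd storm-dist/source
+mvn package
+# Build the binary distribution
+cd ../binary
+mvn package
+```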
 
-4. Sign and generate checksums for the *.tar.gz and *.zip distribution files. 
+4. Generate checksums for the *.tar.gz and *.zip distribution files, e.g.
+```bash
+cd storm-dist/source/target
+gpg --print-md SHA512 apache-storm-2.2.0-src.zip > apache-storm-2.2.0-src.zip.sha512
+gpg --print-md SHA512 apache-storm-2.2.0-src.tar.gz > apache-storm-2.2.0-src.tar.gz.sha512
+
+cd ../../binary/final-package/target   # relative to storm-dist/source/target above
+gpg --print-md SHA512 apache-storm-2.2.0.zip > apache-storm-2.2.0.zip.sha512
+gpg --print-md SHA512 apache-storm-2.2.0.tar.gz > apache-storm-2.2.0.tar.gz.sha512
+```
 
 5. Create a directory in the dist svn repo for the release candidate: https://dist.apache.org/repos/dist/dev/storm/apache-storm-x.x.x-rcx
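+For example (assuming release candidate 1 of version 2.2.0):
+```bash
+svn checkout https://dist.apache.org/repos/dist/dev/storm storm-dev-dist
+cd storm-dev-dist
+mkdir apache-storm-2.2.0-rc1
+svn add apache-storm-2.2.0-rc1
+```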
 
-6. Run `dev-tools/release_notes.py` for the release version, piping the output to a RELEASE_NOTES.html file. Move that file to the svn release directory, sign it, and generate checksums.
+6. Run `dev-tools/release_notes.py` for the release version, piping the output to a RELEASE_NOTES.html file. Move that file to the svn release directory, sign it, and generate checksums, e.g.
+```bash
+python dev-tools/release_notes.py 2.2.0 > RELEASE_NOTES.html
+gpg --armor --output RELEASE_NOTES.html.asc --detach-sig RELEASE_NOTES.html
+gpg --print-md SHA512 RELEASE_NOTES.html > RELEASE_NOTES.html.sha512
+```
 
 7. Move the release files from Steps 4 and 6 to the svn directory from Step 5. Add and commit the files. This makes them available in the Apache staging repo.
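+For example, continuing with the `storm-dev-dist` checkout from the example in Step 5 (the source paths below are placeholders):
+```bash
+cd storm-dev-dist
+cp /path/to/distribution/files/* apache-storm-2.2.0-rc1/
+cp /path/to/RELEASE_NOTES.html* apache-storm-2.2.0-rc1/
+svn add apache-storm-2.2.0-rc1/*
+svn commit -m "Apache Storm 2.2.0 RC1"
+```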
 
@@ -56,3 +82,26 @@
 1. Go to http://repository.apache.org and drop the staging repository.
 
 2. Delete the staged distribution files from https://dist.apache.org/repos/dist/dev/storm/
+
+3. Delete the git tag.
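+For example, assuming the tag is `v2.2.0`:
+```bash
+# Delete the tag locally and on the remote
+git tag -d v2.2.0
+git push origin :refs/tags/v2.2.0
+```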
+
+# How to vote on a release candidate
+
+We encourage everyone to review and vote on a release candidate to make an Apache Storm release more reliable and trustworthy.
+
+Below is a checklist you can follow to review a release candidate.
+Please note this list is not exhaustive and only includes some of the common steps. Feel free to add your own tests.
+
+1. Verify files such as *.asc and *.sha512; some scripts are available under `dev-tools/rc` to help with this (see the example after this list);
+2. Build the Apache Storm source code and run the unit tests, then create an Apache Storm distribution;
+3. Set up a standalone cluster using each of apache-storm-xxx.zip, apache-storm-xxx.tar.gz, and the Apache Storm distribution created in step 2;
+4. Launch the WordCountTopology and ThroughputVsLatency topologies and check logs, UI metrics, etc.;
+5. Test basic UI functionality such as jstack, heap dump, deactivate, activate, rebalance, change log level, log search, and kill topology;
+6. Test basic CLI commands such as kill, list, deactivate, activate, rebalance, etc.
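+
+For example, a minimal sketch of the verification in step 1 (assuming version 2.2.0):
+```bash
+# Import the release signing keys
+curl https://dist.apache.org/repos/dist/release/storm/KEYS | gpg --import
+# Verify the detached signature
+gpg --verify apache-storm-2.2.0-src.tar.gz.asc apache-storm-2.2.0-src.tar.gz
+# Verify the SHA512 checksum
+gpg --print-md SHA512 apache-storm-2.2.0-src.tar.gz | diff - apache-storm-2.2.0-src.tar.gz.sha512
+```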
+
+It's also preferable to set up a standalone secure Apache Storm cluster and test basic functionality on it.
+
+Don't feel pressured to do everything listed above. After you finish your review, reply to the corresponding email thread with your vote, summarize the work you performed,
+and describe any issues you found. Also, feel free to update the checklist if you think anything important is missing.
+
+Your contribution is very much appreciated.  
\ No newline at end of file
diff --git a/dev-tools/rc/verify-release-file.sh b/dev-tools/rc/verify-release-file.sh
index 2e33965..3e6a869 100755
--- a/dev-tools/rc/verify-release-file.sh
+++ b/dev-tools/rc/verify-release-file.sh
@@ -34,21 +34,6 @@
   echo 'Signature seems not correct'
 fi
 
-# checking MD5
-GPG_MD5_FILE="/tmp/${TARGET_FILE}_GPG.md5"
-gpg --print-md MD5 ${TARGET_FILE} > ${GPG_MD5_FILE}
-MD5_TARGET_FILE="${TARGET_FILE}.md5"
-
-echo ">> checking MD5 file... (${MD5_TARGET_FILE})"
-diff ${GPG_MD5_FILE} ${MD5_TARGET_FILE}
-
-if [ $? -eq 0 ];
-then
-  echo 'MD5 file is correct'
-else
-  echo 'MD5 file is not correct'
-fi
-
 # checking SHA
 GPG_SHA_FILE="/tmp/${TARGET_FILE}_GPG.sha512"
 gpg --print-md SHA512 ${TARGET_FILE} > ${GPG_SHA_FILE}
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 578be2b..fdd130b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,7 +21,6 @@
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -95,7 +94,8 @@
     private final ConcurrentHashMap<String, CompletableFuture<Void>> topologyBasicDownloaded = new ConcurrentHashMap<>();
     private final Path localBaseDir;
     private final int blobDownloadRetries;
-    private final ScheduledExecutorService execService;
+    private final ScheduledExecutorService downloadExecService;
+    private final ScheduledExecutorService taskExecService;
     private final long cacheCleanupPeriod;
     private final StormMetricsRegistry metricsRegistry;
     // cleanup
@@ -120,13 +120,14 @@
         cacheCleanupPeriod = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
 
-        // if we needed we could make config for update thread pool size
-        int threadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
         blobDownloadRetries = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
 
-        execService = Executors.newScheduledThreadPool(threadPoolSize,
-                                                       new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build());
+        int downloadThreadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+        downloadExecService = Executors.newScheduledThreadPool(downloadThreadPoolSize,
+                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Download Executor - %d").build());
+        taskExecService = Executors.newScheduledThreadPool(3,
+                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Task Executor - %d").build());
         reconstructLocalizedResources();
 
         symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
@@ -213,7 +214,7 @@
             blobPending.compute(topologyId, (tid, old) -> {
                 CompletableFuture<Void> ret = old;
                 if (ret == null) {
-                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService);
+                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), taskExecService);
                 } else {
                     try {
                         addReferencesToBlobs(pna, cb);
@@ -291,7 +292,7 @@
                     }
                 }
                 LOG.debug("FINISHED download of {}", blob);
-            }, execService);
+            }, downloadExecService);
             i++;
         }
         return CompletableFuture.allOf(all);
@@ -337,14 +338,15 @@
      * Start any background threads needed.  This includes updating blobs and cleaning up unused blobs over the configured size limit.
      */
     public void start() {
-        execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
+        taskExecService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
         LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
-        execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
+        taskExecService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void close() throws InterruptedException {
-        execService.shutdown();
+        downloadExecService.shutdown();
+        taskExecService.shutdown();
     }
 
     private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
@@ -450,15 +452,7 @@
             topoConfBlob.removeReference(pna);
         }
 
-        List<LocalResource> localResources;
-        try {
-            localResources = getLocalResources(pna);
-        } catch (FileNotFoundException e) {
-            LOG.warn("Local resources for {} no longer available", pna, e);
-            return;
-        }
-
-        for (LocalResource lr : localResources) {
+        for (LocalResource lr : getLocalResources(pna)) {
             try {
                 removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
             } catch (Exception e) {