Merge pull request #3163 from dandsager1/STORM-3534

STORM-3534 Add generic resources to UI
diff --git a/SECURITY.md b/SECURITY.md
index 668ea8a..5e321e5 100644
--- a/SECURITY.md
+++ b/SECURITY.md
@@ -28,6 +28,7 @@
 |--------------|--------------|------------------------|--------|
 | 2181 | `storm.zookeeper.port` | Nimbus, Supervisors, and Worker processes | ZooKeeper |
 | 6627 | `nimbus.thrift.port` | Storm clients, Supervisors, and UI | Nimbus |
+| 6628 | `supervisor.thrift.port` | Nimbus | Supervisors |
 | 8080 | `ui.port` | Client Web Browsers | UI |
 | 8000 | `logviewer.port` | Client Web Browsers | Logviewer |
 | 3772 | `drpc.port` | External DRPC Clients | DRPC |
diff --git a/docs/ClusterMetrics.md b/docs/ClusterMetrics.md
index 4e4d0f1..f7f7b4f 100644
--- a/docs/ClusterMetrics.md
+++ b/docs/ClusterMetrics.md
@@ -185,6 +185,7 @@
 | supervisor:num-launched | meter | number of times the supervisor is launched. |
 | supervisor:num-shell-exceptions | meter | number of exceptions calling shell commands. |
 | supervisor:num-slots-used-gauge | gauge | number of slots used on the supervisor. |
+| supervisor:num-worker-start-timed-out | meter | number of times a worker failed to start within the first heartbeat timeout. |
 | supervisor:num-worker-transitions-into-empty | meter | number of transitions into empty state. |
 | supervisor:num-worker-transitions-into-kill | meter | number of transitions into kill state. |
 | supervisor:num-worker-transitions-into-kill-and-relaunch | meter | number of transitions into kill-and-relaunch state |
diff --git a/docs/Generic-resources.md b/docs/Generic-resources.md
new file mode 100644
index 0000000..f3bfe3e
--- /dev/null
+++ b/docs/Generic-resources.md
@@ -0,0 +1,39 @@
+---
+title: Generic Resources
+layout: documentation
+documentation: true
+---
+
+### Generic Resources
+Generic Resources allow Storm to account for arbitrary, user-defined resource types (for example, GPUs) when scheduling. They can be viewed as an extension of the CPU and memory resources already handled by the [Resource Aware Scheduler](Resource_Aware_Scheduler_overview.html).
+
+### API Overview
+For a Storm topology, the user can now specify the amount of a generic resource that a single instance of a topology component (i.e., a Spout or Bolt) requires. The requirement is declared with the following API call.
+```
+   public T addResource(String resourceName, Number resourceValue)
+```
+Parameters:
+-   resourceName – The name of the generic resource
+-   resourceValue – The amount of the generic resource required by a single instance of the component
+
+Example of Usage:
+```
+   SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
+   s1.addResource("gpu.count", 1.0);
+```
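+
+The same call is available on bolts. Below is a minimal sketch, assuming a hypothetical `ExclamationBolt` class and a cluster that advertises a "gpu.count" resource:
+```
+   TopologyBuilder builder = new TopologyBuilder();
+
+   // each of the 10 spout instances requests one GPU
+   SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
+   s1.addResource("gpu.count", 1.0);
+
+   // each of the 3 bolt instances also requests one GPU
+   BoltDeclarer b1 = builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("word");
+   b1.addResource("gpu.count", 1.0);
+```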
+
+### Specifying Generic Cluster Resources
+
+A Storm administrator can specify a node's resource availability by modifying the _conf/storm.yaml_ file located in the Storm home directory on that node.
+```
+   supervisor.resources.map: {[type<String>] : [amount<Double>]}
+```
+Example of Usage:
+```
+   supervisor.resources.map: {"gpu.count" : 2.0}
+```
+
+
+### Generic Resources in UI
+
+Generic resources are also displayed in the Storm UI, as shown below.
+
+![Storm Cluster UI](images/storm_ui.png)
diff --git a/docs/SECURITY.md b/docs/SECURITY.md
index 74a67ef..6e4d7e1 100644
--- a/docs/SECURITY.md
+++ b/docs/SECURITY.md
@@ -36,6 +36,7 @@
 |--------------|--------------|------------------------|--------|
 | 2181 | `storm.zookeeper.port` | Nimbus, Supervisors, and Worker processes | Zookeeper |
 | 6627 | `nimbus.thrift.port` | Storm clients, Supervisors, and UI | Nimbus |
+| 6628 | `supervisor.thrift.port` | Nimbus | Supervisors |
 | 8080 | `ui.port` | Client Web Browsers | UI |
 | 8000 | `logviewer.port` | Client Web Browsers | Logviewer |
 | 3772 | `drpc.port` | External DRPC Clients | DRPC |
diff --git a/docs/images/storm_ui.png b/docs/images/storm_ui.png
new file mode 100644
index 0000000..45aae41
--- /dev/null
+++ b/docs/images/storm_ui.png
Binary files differ
diff --git a/docs/index.md b/docs/index.md
index df1df93..36cf63f 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -65,6 +65,7 @@
 * [CGroup Enforcement](cgroups_in_storm.html)
 * [Pacemaker reduces load on zookeeper for large clusters](Pacemaker.html)
 * [Resource Aware Scheduler](Resource_Aware_Scheduler_overview.html)
+* [Generic Resources](Generic-resources.html)
 * [Daemon Metrics/Monitoring](ClusterMetrics.html)
 * [Windows users guide](windows-users-guide.html)
 * [Classpath handling](Classpath-handling.html)
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index da84979..496e1d8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -24,6 +24,7 @@
 import java.util.function.Supplier;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.metric.api.IMetric;
 import org.slf4j.Logger;
@@ -76,8 +77,17 @@
         Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
         Set<TopicPartition> topicPartitions = offsetManagers.keySet();
 
-        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
-        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+        Map<TopicPartition, Long> beginningOffsets;
+        Map<TopicPartition, Long> endOffsets;
+
+        try {
+            beginningOffsets = consumer.beginningOffsets(topicPartitions);
+            endOffsets = consumer.endOffsets(topicPartitions);
+        } catch (RetriableException e) {
+            LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e);
+            return null;
+        }
+
         //map to hold partition level and topic level metrics
         Map<String, Long> result = new HashMap<>();
 
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 512d274..d7f563f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -21,17 +21,14 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyListOf;
 import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
 
 import java.util.HashSet;
 import java.util.List;
@@ -39,8 +36,10 @@
 import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Time;
@@ -428,4 +427,16 @@
         assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10);
         assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0);
     }
+
+    @Test
+    public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        // Ensure a timeout exception results in the return value being null
+        when(getKafkaConsumer().beginningOffsets(anyCollection())).thenThrow(TimeoutException.class);
+
+        Map<String, Long> offsetMetric  = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertNull(offsetMetric);
+    }
 }
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 8299c14..50570e1 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -18,12 +18,17 @@
 
 package org.apache.storm.flux.parser;
 
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.storm.flux.model.BoltDef;
 import org.apache.storm.flux.model.IncludeDef;
@@ -40,17 +45,20 @@
  */
 public class FluxParser {
     private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
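+    // Matches lines containing a ${...} placeholder: ${ENV-VARIABLE}, ${listProperty[index]} or ${property}.
+    // Captures the full expression ("var"), the environment variable name ("envVar"),
+    // and the list property name ("list") with its element index ("listIndex").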
+    private static final Pattern propertyPattern =
+            Pattern.compile(".*\\$\\{(?<var>ENV-(?<envVar>.+)|(?<list>.+)\\[(?<listIndex>\\d+)]|.+)}.*");
 
     private FluxParser() {
     }
 
     /**
      * Parse a flux topology definition.
-     * @param inputFile source YAML file
-     * @param dumpYaml if true, dump the parsed YAML to stdout
+     *
+     * @param inputFile       source YAML file
+     * @param dumpYaml        if true, dump the parsed YAML to stdout
      * @param processIncludes whether or not to process includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties      properties file for variable substitution
+     * @param envSub          whether or not to perform environment variable substitution
      * @return resulting topology definition
      * @throws IOException if there is a problem reading file(s)
      */
@@ -65,11 +73,12 @@
 
     /**
      * Parse a flux topology definition from a classpath resource..
-     * @param resource YAML resource
-     * @param dumpYaml if true, dump the parsed YAML to stdout
+     *
+     * @param resource        YAML resource
+     * @param dumpYaml        if true, dump the parsed YAML to stdout
      * @param processIncludes whether or not to process includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties      properties file for variable substitution
+     * @param envSub          whether or not to perform environment variable substitution
      * @return resulting topology definition
      * @throws IOException if there is a problem reading file(s)
      */
@@ -84,11 +93,12 @@
 
     /**
      * Parse a flux topology definition.
-     * @param inputStream InputStream representation of YAML file
-     * @param dumpYaml if true, dump the parsed YAML to stdout
+     *
+     * @param inputStream     InputStream representation of YAML file
+     * @param dumpYaml        if true, dump the parsed YAML to stdout
      * @param processIncludes whether or not to process includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties      properties file for variable substitution
+     * @param envSub          whether or not to perform environment variable substitution
      * @return resulting topology definition
      * @throws IOException if there is a problem reading file(s)
      */
@@ -116,10 +126,11 @@
 
     /**
      * Parse filter properties file.
+     *
      * @param propertiesFile properties file for variable substitution
-     * @param resource whether or not to load properties file from classpath
+     * @param resource       whether or not to load properties file from classpath
      * @return resulting filter properties
-     * @throws IOException  if there is a problem reading file
+     * @throws IOException if there is a problem reading file
      */
     public static Properties parseProperties(String propertiesFile, boolean resource) throws IOException {
         Properties properties = null;
@@ -140,36 +151,43 @@
     }
 
     private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties properties, boolean envSubstitution) throws IOException {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         LOG.info("loading YAML from input stream...");
-        int b = -1;
-        while ((b = in.read()) != -1) {
-            bos.write(b);
-        }
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+            String conf = reader.lines().map(line -> {
+                Matcher m = propertyPattern.matcher(line);
+                return m.find()
+                        ? getPropertyReplacement(properties, m, envSubstitution)
+                        .map(propValue -> line.replace("${" + m.group("var") + "}", propValue))
+                        .orElseGet(() -> {
+                            LOG.warn("Could not find replacement for property: " + m.group("var"));
+                            return line;
+                        })
+                        : line;
+            }).collect(Collectors.joining(System.lineSeparator()));
 
-        // TODO substitution implementation is not exactly efficient or kind to memory...
-        String str = bos.toString();
-        // properties file substitution
-        if (properties != null) {
-            LOG.info("Performing property substitution.");
-            for (Object key : properties.keySet()) {
-                str = str.replace("${" + key + "}", properties.getProperty((String)key));
-            }
-        } else {
-            LOG.info("Not performing property substitution.");
+            return (TopologyDef) yaml.load(conf);
         }
+    }
 
-        // environment variable substitution
-        if (envSubstitution) {
-            LOG.info("Performing environment variable substitution...");
-            Map<String, String> envs = System.getenv();
-            for (String key : envs.keySet()) {
-                str = str.replace("${ENV-" + key + "}", envs.get(key));
-            }
+    private static Optional<String> getPropertyReplacement(Properties properties, Matcher match, boolean envSubstitution) {
+        if (match.group("listIndex") != null) {
+            String prop = properties.getProperty(match.group("list"));
+            return Optional.of(parseListAndExtractElem(prop, match.group("listIndex")));
+        } else if (envSubstitution && match.group("envVar") != null) {
+            String envVar = System.getenv().get(match.group("envVar"));
+            return Optional.ofNullable(envVar);
         } else {
-            LOG.info("Not performing environment variable substitution.");
+            return Optional.ofNullable(properties.getProperty(match.group("var")));
         }
-        return (TopologyDef) yaml.load(str);
+    }
+
+    private static String parseListAndExtractElem(String strList, String index) {
+        String[] listProp = strList.substring(1, strList.length() - 1).split(",");
+        String listElem = listProp[Integer.parseInt(index)];
+
+        // remove whitespaces and double quotes from beginning and end of a given string
+        String trimmed = listElem.trim();
+        return trimmed.substring(1, trimmed.length() - 1);
     }
 
     private static void dumpYaml(TopologyDef topology, Yaml yaml) {
@@ -191,14 +209,15 @@
 
     /**
      * Process includes contained within a yaml file.
+     *
      * @param yaml        the yaml parser for parsing the include file(s)
      * @param topologyDef the topology definition containing (possibly zero) includes
-     * @param properties properties file for variable substitution
-     * @param envSub whether or not to perform environment variable substitution
+     * @param properties  properties file for variable substitution
+     * @param envSub      whether or not to perform environment variable substitution
      * @return The TopologyDef with includes resolved.
      */
     private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub)
-        throws IOException {
+            throws IOException {
         //TODO support multiple levels of includes
         if (topologyDef.getIncludes() != null) {
             for (IncludeDef include : topologyDef.getIncludes()) {
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 90613c9..275a720 100644
--- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -275,6 +275,11 @@
                Collections.singletonList("A string list"),
                is(context.getTopologyDef().getConfig().get("list.property.target")));
 
+        //Test substitution where the target type is a List element
+        assertThat("List element property is not replaced by the expected value",
+                "A string list",
+                is(context.getTopologyDef().getConfig().get("list.element.property.target")));
+
     }
     
     @Test
diff --git a/flux/flux-core/src/test/resources/configs/substitution-test.yaml b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
index 9707936..67ac92a 100644
--- a/flux/flux-core/src/test/resources/configs/substitution-test.yaml
+++ b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
@@ -45,6 +45,8 @@
   test.env.value: "${ENV-PATH}"
   # test variable substitution for list type
   list.property.target: ${a.list.property}
+  # test variable substitution for list element
+  list.element.property.target: ${a.list.property[0]}
 
 # spout definitions
 spouts:
diff --git a/pom.xml b/pom.xml
index 0601a84..4ff5218 100644
--- a/pom.xml
+++ b/pom.xml
@@ -236,6 +236,15 @@
             </roles>
             <timezone>-6</timezone>
         </developer>
+        <developer>
+            <id>agresch</id>
+            <name>Aaron Gresch</name>
+            <email>agresch@gmail.com</email>
+            <roles>
+                <role>Committer</role>
+            </roles>
+            <timezone>-6</timezone>
+        </developer>
 
     </developers>
 
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 57af8d1..7a1c518 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -55,5 +55,7 @@
     public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
     public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
     public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
+
+    public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
 }
     
diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index e6a12d9..4713e95 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -16,6 +16,7 @@
 import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -345,7 +346,7 @@
 
     @Override
     public Map<String, Object> getComponentConfiguration() {
-        return bolt.getComponentConfiguration();
+        return bolt.getComponentConfiguration() != null ? bolt.getComponentConfiguration() : Collections.emptyMap();
     }
 
     protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
diff --git a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
index 5162cc6..0e38838 100644
--- a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
@@ -39,6 +39,7 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -214,6 +215,14 @@
         Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
     }
 
+    @Test
+    public void testEmptyConfigOnWrappedBolt() {
+        IWindowedBolt wrappedBolt = Mockito.mock(IWindowedBolt.class);
+        Mockito.when(wrappedBolt.getComponentConfiguration()).thenReturn(null);
+        executor = new WindowedBoltExecutor(wrappedBolt);
+        assertTrue("Configuration is not empty", executor.getComponentConfiguration().isEmpty());
+    }
+
     private static class TestWindowedBolt extends BaseWindowedBolt {
         List<TupleWindow> tupleWindows = new ArrayList<>();
 
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 957d27b..6ab3af5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -268,6 +268,7 @@
     private final Meter shutdownCalls;
     private final Meter processWorkerMetricsCalls;
     private final Meter mkAssignmentsErrors;
+    private final Meter sendAssignmentExceptions;   // marked in AssignmentDistributionService when sending assignments to a supervisor fails
 
     //Timer
     private final Timer fileUploadDuration;
@@ -305,7 +306,7 @@
         IStormClusterState state = nimbus.getStormClusterState();
         Assignment oldAssignment = state.assignmentInfo(topoId, null);
         state.removeStorm(topoId);
-        notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer());
+        notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer(), nimbus.getMetricsRegistry());
         nimbus.heartbeatsCache.removeTopo(topoId);
         nimbus.getIdToExecutors().getAndUpdate(new Dissoc<>(topoId));
         return null;
@@ -517,6 +518,7 @@
         this.shutdownCalls = metricsRegistry.registerMeter("nimbus:num-shutdown-calls");
         this.processWorkerMetricsCalls = metricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
         this.mkAssignmentsErrors = metricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
+        this.sendAssignmentExceptions = metricsRegistry.registerMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS);
         this.fileUploadDuration = metricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
         this.schedulingDuration = metricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
         this.numAddedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
@@ -1578,7 +1580,8 @@
      */
     private static void notifySupervisorsAssignments(Map<String, Assignment> assignments,
                                                      AssignmentDistributionService service, Map<String, String> nodeHost,
-                                                     Map<String, SupervisorDetails> supervisorDetails) {
+                                                     Map<String, SupervisorDetails> supervisorDetails,
+                                                     StormMetricsRegistry metricsRegistry) {
         for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) {
             try {
                 String nodeId = nodeEntry.getKey();
@@ -1586,7 +1589,7 @@
                 supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, nodeEntry.getKey()));
                 SupervisorDetails details = supervisorDetails.get(nodeId);
                 Integer serverPort = details != null ? details.getServerPort() : null;
-                service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments);
+                service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments, metricsRegistry);
             } catch (Throwable tr1) {
                 //just skip when any error happens wait for next round assignments reassign
                 LOG.error("Exception when add assignments distribution task for node {}", nodeEntry.getKey());
@@ -1595,10 +1598,10 @@
     }
 
     private static void notifySupervisorsAsKilled(IStormClusterState clusterState, Assignment oldAss,
-                                                  AssignmentDistributionService service) {
+                                                  AssignmentDistributionService service, StormMetricsRegistry metricsRegistry) {
         Map<String, String> nodeHost = assignmentChangedNodes(oldAss, null);
         notifySupervisorsAssignments(clusterState.assignmentsInfo(), service, nodeHost,
-                                     basicSupervisorDetailsMap(clusterState));
+                                     basicSupervisorDetailsMap(clusterState), metricsRegistry);
     }
 
     @VisibleForTesting
@@ -1654,6 +1657,10 @@
         return assignmentsDistributer;
     }
 
+    private StormMetricsRegistry getMetricsRegistry() {
+        return metricsRegistry;
+    }
+
     @VisibleForTesting
     public HeartbeatCache getHeartbeatsCache() {
         return heartbeatsCache;
@@ -2520,7 +2527,7 @@
                 totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
             }
             notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
-                    basicSupervisorDetailsMap);
+                                        basicSupervisorDetailsMap, getMetricsRegistry());
 
             Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
             for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 7575a91..df419b9 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -684,6 +684,7 @@
         long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime);
         long hbFirstTimeoutMs = getFirstHbTimeoutMs(staticState, dynamicState);
         if (timeDiffms > hbFirstTimeoutMs) {
+            staticState.slotMetrics.numWorkerStartTimedOut.mark();
             LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container,
                     hbFirstTimeoutMs);
             return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SlotMetrics.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SlotMetrics.java
index f8e13fd..8b2f5f1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SlotMetrics.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SlotMetrics.java
@@ -26,6 +26,7 @@
 class SlotMetrics {
 
     final Meter numWorkersLaunched;
+    final Meter numWorkerStartTimedOut;
     final Map<Slot.KillReason, Meter> numWorkersKilledFor;
     final Timer workerLaunchDuration;
     final Map<Slot.MachineState, Meter> transitionIntoState;
@@ -34,6 +35,7 @@
 
     SlotMetrics(StormMetricsRegistry metricsRegistry) {
         numWorkersLaunched = metricsRegistry.registerMeter("supervisor:num-workers-launched");
+        numWorkerStartTimedOut = metricsRegistry.registerMeter("supervisor:num-worker-start-timed-out");
         numWorkersKilledFor = Collections.unmodifiableMap(EnumUtil.toEnumMap(Slot.KillReason.class,
             killReason -> metricsRegistry.registerMeter("supervisor:num-workers-killed-" + killReason.toString())));
         workerLaunchDuration = metricsRegistry.registerTimer("supervisor:worker-launch-duration");
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 578be2b..fdd130b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,7 +21,6 @@
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -95,7 +94,8 @@
     private final ConcurrentHashMap<String, CompletableFuture<Void>> topologyBasicDownloaded = new ConcurrentHashMap<>();
     private final Path localBaseDir;
     private final int blobDownloadRetries;
-    private final ScheduledExecutorService execService;
+    private final ScheduledExecutorService downloadExecService;
+    private final ScheduledExecutorService taskExecService;
     private final long cacheCleanupPeriod;
     private final StormMetricsRegistry metricsRegistry;
     // cleanup
@@ -120,13 +120,14 @@
         cacheCleanupPeriod = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
 
-        // if we needed we could make config for update thread pool size
-        int threadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
         blobDownloadRetries = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
 
-        execService = Executors.newScheduledThreadPool(threadPoolSize,
-                                                       new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build());
+        int downloadThreadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+        downloadExecService = Executors.newScheduledThreadPool(downloadThreadPoolSize,
+                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Download Executor - %d").build());
+        taskExecService = Executors.newScheduledThreadPool(3,
+                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Task Executor - %d").build());
         reconstructLocalizedResources();
 
         symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
@@ -213,7 +214,7 @@
             blobPending.compute(topologyId, (tid, old) -> {
                 CompletableFuture<Void> ret = old;
                 if (ret == null) {
-                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService);
+                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), taskExecService);
                 } else {
                     try {
                         addReferencesToBlobs(pna, cb);
@@ -291,7 +292,7 @@
                     }
                 }
                 LOG.debug("FINISHED download of {}", blob);
-            }, execService);
+            }, downloadExecService);
             i++;
         }
         return CompletableFuture.allOf(all);
@@ -337,14 +338,15 @@
      * Start any background threads needed.  This includes updating blobs and cleaning up unused blobs over the configured size limit.
      */
     public void start() {
-        execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
+        taskExecService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
         LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
-        execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
+        taskExecService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void close() throws InterruptedException {
-        execService.shutdown();
+        downloadExecService.shutdown();
+        taskExecService.shutdown();
     }
 
     private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
@@ -450,15 +452,7 @@
             topoConfBlob.removeReference(pna);
         }
 
-        List<LocalResource> localResources;
-        try {
-            localResources = getLocalResources(pna);
-        } catch (FileNotFoundException e) {
-            LOG.warn("Local resources for {} no longer available", pna, e);
-            return;
-        }
-
-        for (LocalResource lr : localResources) {
+        for (LocalResource lr : getLocalResources(pna)) {
             try {
                 removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
             } catch (Exception e) {
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index cc98804..5db347e 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -68,6 +68,10 @@
         registry.removeMatching((name, metric) -> nameToMetric.containsKey(name));
     }
 
+    public Meter getMeter(String meterName) {
+        return registry.getMeters().get(meterName);
+    }
+
     public void startMetricsReporters(Map<String, Object> daemonConf) {
         reporters = MetricsUtils.getPreparableReporters(daemonConf);
         for (PreparableReporter reporter : reporters) {
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index 4f84997..4eb1bb4 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -21,9 +21,12 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
@@ -146,7 +149,8 @@
      * @param serverPort node thrift server port.
      * @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
      */
-    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+                                      StormMetricsRegistry metricsRegistry) {
         try {
             //For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
             //Just skip for this scheduling round.
@@ -155,7 +159,8 @@
                 return;
             }
 
-            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
+            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort,
+                                                assignments, metricsRegistry), 5L, TimeUnit.SECONDS);
             if (!success) {
                 LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
             }
@@ -211,17 +216,20 @@
         private String host;
         private Integer serverPort;
         private SupervisorAssignments assignments;
+        private StormMetricsRegistry metricsRegistry;
 
-        private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+        private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+                                StormMetricsRegistry metricsRegistry) {
             this.node = node;
             this.host = host;
             this.serverPort = serverPort;
             this.assignments = assignments;
+            this.metricsRegistry = metricsRegistry;
         }
 
         public static NodeAssignments getInstance(String node, String host, Integer serverPort,
-                                                  SupervisorAssignments assignments) {
-            return new NodeAssignments(node, host, serverPort, assignments);
+                                                  SupervisorAssignments assignments, StormMetricsRegistry metricsRegistry) {
+            return new NodeAssignments(node, host, serverPort, assignments, metricsRegistry);
         }
 
         //supervisor assignment id/supervisor id
@@ -241,6 +249,9 @@
             return this.assignments;
         }
 
+        public StormMetricsRegistry getMetricsRegistry() {
+            return metricsRegistry;
+        }
     }
 
     /**
@@ -289,14 +300,13 @@
                     try {
                         client.getIface().sendSupervisorAssignments(assignments.getAssignments());
                     } catch (Exception e) {
-                        //just ignore the exception.
+                        assignments.getMetricsRegistry().getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS).mark();
                         LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
                     }
                 } catch (Throwable e) {
                     //just ignore any error/exception.
                     LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
                 }
-
             }
         }
     }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index cba27cd..75ae8c7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -26,7 +26,6 @@
 import java.util.stream.Collectors;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
-import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.SchedulerAssignment;
@@ -36,7 +35,6 @@
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
@@ -374,22 +372,29 @@
         }
 
         String getRemainingRequiredResourcesMessage() {
+            StringBuilder message = new StringBuilder();
+            message.append("After evicting lower priority topologies: ");
+
+            NormalizedResourceOffer clusterRemainingAvailableResources = new NormalizedResourceOffer();
+            clusterRemainingAvailableResources.add(clusterAvailableResources);
+            clusterRemainingAvailableResources.remove(topologyScheduledResources);
+
             double memoryNeeded = remainingRequiredTopologyMemory;
             double cpuNeeded = remainingRequiredTopologyResources.getTotalCpu();
-            StringBuilder message = new StringBuilder();
-            if (memoryNeeded > 0 || cpuNeeded > 0) {
-                if (memoryNeeded > 0) {
-                    message.append(memoryNeeded).append(" MB ");
-                }
-                if (cpuNeeded > 0) {
-                    message.append(cpuNeeded).append("% CPU ");
-                }
-                if (remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
-                    message.append(" ");
-                    message.append(remainingRequiredTopologyResources.getNormalizedResources().toString());
-                    message.append(" ");
-                }
-                message.append("needed even after evicting lower priority topologies. ");
+            if (memoryNeeded > 0) {
+                message.append("Additional Memory Required: ").append(memoryNeeded).append(" MB ");
+                message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalMemoryMb()).append(" MB). ");
+            }
+            if (cpuNeeded > 0) {
+                message.append("Additional CPU Required: ").append(cpuNeeded).append("% CPU ");
+                message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalCpu()).append(" % CPU).");
+            }
+            if (remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
+                message.append(" Additional Topology Required Resources: ");
+                message.append(remainingRequiredTopologyResources.getNormalizedResources().toString());
+                message.append(" Cluster Available Resources: ");
+                message.append(clusterRemainingAvailableResources.getNormalizedResources().toString());
+                message.append(".  ");
             }
             return message.toString();
         }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 4e5ec5c..2b5afe0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -126,7 +126,7 @@
         } else {
             String comp = td.getExecutorToComponent().get(exec);
             NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
-            LOG.error("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
+            LOG.warn("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
             return false;
         }
     }