Merge pull request #3195 from RuiLi8080/STORM-3567

[STORM-3567] fix UI showing wrong resource info when topo is not scheduled
diff --git a/.travis.yml b/.travis.yml
index 1dcdc0e..6e46704 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -43,8 +43,8 @@
   - sudo add-apt-repository ppa:deadsnakes/ppa -y
   - sudo apt-get update
   - sudo apt-get install python3.6
-  - export MVN_HOME=$HOME/apache-maven-3.6.1
-  - if [ ! -d $MVN_HOME/bin ]; then wget https://archive.apache.org/dist/maven/maven-3/3.6.1/binaries/apache-maven-3.6.1-bin.tar.gz -P $HOME; tar xzvf $HOME/apache-maven-3.6.1-bin.tar.gz -C $HOME; fi
+  - export MVN_HOME=$HOME/apache-maven-3.6.3
+  - if [ ! -d $MVN_HOME/bin ]; then wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz -P $HOME; tar xzvf $HOME/apache-maven-3.6.3-bin.tar.gz -C $HOME; fi
   - export PATH=$MVN_HOME/bin:$PATH
 install: /bin/bash ./dev-tools/travis/travis-install.sh `pwd`
 script:
@@ -54,4 +54,4 @@
     - "$HOME/.m2/repository"
     - "$HOME/.rvm"
     - "$NVM_DIR"
-    - "$HOME/apache-maven-3.6.1"
+    - "$HOME/apache-maven-3.6.3"
diff --git a/docs/storm-eventhubs.md b/docs/storm-eventhubs.md
index df46755..623a1fc 100644
--- a/docs/storm-eventhubs.md
+++ b/docs/storm-eventhubs.md
@@ -1,5 +1,5 @@
 ---
-title: Azue Event Hubs Integration
+title: Azure Event Hubs Integration
 layout: documentation
 documentation: true
 ---
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 7a1c518..af033ee 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -57,5 +57,6 @@
     public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
 
     public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
+    public static final String SUPERVISOR_HEALTH_CHECK_TIMEOUTS = "supervisor:health-check-timeouts";
 }
     
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
index ce7f53e..5a35fd3 100644
--- a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -20,7 +20,7 @@
 
     public static void main(String[] args) {
         Map<String, Object> conf = Utils.readStormConfig();
-        System.exit(HealthChecker.healthCheck(conf));
+        System.exit(HealthChecker.healthCheck(conf, null));
     }
 
 }
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 28076f4..482846d 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -171,6 +171,12 @@
     public static final String STORM_HEALTH_CHECK_TIMEOUT_MS = "storm.health.check.timeout.ms";
 
     /**
+     * Whether a health check that times out is reported as a failure; when unset, HealthChecker defaults to true (fail).
+     */
+    @IsBoolean
+    public static final String STORM_HEALTH_CHECK_FAIL_ON_TIMEOUTS = "storm.health.check.fail.on.timeouts";
+
+    /**
      * This is the user that the Nimbus daemon process is running as. May be used when security is enabled to authorize actions in the
      * cluster.
      */
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 259a13e..ce597de 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.daemon.supervisor;
 
+import com.codahale.metrics.Meter;
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
@@ -31,8 +32,10 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
+import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.StormTimer;
 import org.apache.storm.cluster.ClusterStateContext;
@@ -105,6 +108,7 @@
     private final ExecutorService heartbeatExecutor;
     private final AsyncLocalizer asyncLocalizer;
     private final StormMetricsRegistry metricsRegistry;
+    private Meter killErrorMeter;
     private final ContainerMemoryTracker containerMemoryTracker;
     private final SlotMetrics slotMetrics;
     private volatile boolean active;
@@ -212,7 +216,7 @@
         return sharedContext;
     }
 
-    StormMetricsRegistry getMetricsRegistry() {
+    public StormMetricsRegistry getMetricsRegistry() {
         return metricsRegistry;
     }
     
@@ -339,6 +343,8 @@
             //This will only get updated once
             metricsRegistry.registerMeter("supervisor:num-launched").mark();
             metricsRegistry.registerMeter("supervisor:num-shell-exceptions", ShellUtils.numShellExceptions);
+            metricsRegistry.registerMeter(Constants.SUPERVISOR_HEALTH_CHECK_TIMEOUTS);
+            killErrorMeter = metricsRegistry.registerMeter("supervisor:num-kill-worker-errors");
             metricsRegistry.startMetricsReporters(conf);
             Utils.addShutdownHookWithForceKillIn1Sec(() -> {
                 metricsRegistry.stopMetricsReporters();
@@ -528,6 +534,9 @@
                 long start = Time.currentTimeMillis();
                 while (!k.areAllProcessesDead()) {
                     if ((Time.currentTimeMillis() - start) > 10_000) {
+                        if (killErrorMeter != null) {
+                            killErrorMeter.mark();
+                        }
                         throw new RuntimeException("Giving up on killing " + k
                                                    + " after " + (Time.currentTimeMillis() - start) + " ms");
                     }
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index f68cb8c..4a20084 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -36,7 +36,7 @@
     public void run() {
         Map<String, Object> conf = supervisor.getConf();
         LOG.info("Running supervisor healthchecks...");
-        int healthCode = HealthChecker.healthCheck(conf);
+        int healthCode = HealthChecker.healthCheck(conf, supervisor.getMetricsRegistry());
         if (healthCode != 0) {
             LOG.info("The supervisor healthchecks FAILED...");
             supervisor.shutdownAllWorkers(null, null);
diff --git a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
index b5f3655..8896fd4 100644
--- a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
+++ b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.healthcheck;
 
+import com.codahale.metrics.Meter;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.InputStream;
@@ -26,7 +28,10 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.slf4j.Logger;
@@ -40,7 +45,7 @@
     private static final String TIMEOUT = "timeout";
     private static final String FAILED_WITH_EXIT_CODE = "failed_with_exit_code";
 
-    public static int healthCheck(Map<String, Object> conf) {
+    public static int healthCheck(Map<String, Object> conf, StormMetricsRegistry metricRegistry) {
         String healthDir = ServerConfigUtils.absoluteHealthCheckDir(conf);
         List<String> results = new ArrayList<>();
         if (healthDir != null) {
@@ -66,10 +71,23 @@
         // to execute properly, not that the system is unhealthy, in which case
         // we don't want to start killing things.
 
-        if (results.contains(FAILED) || results.contains(FAILED_WITH_EXIT_CODE)
-            || results.contains(TIMEOUT)) {
+        if (results.contains(FAILED) || results.contains(FAILED_WITH_EXIT_CODE)) {
             LOG.warn("The supervisor healthchecks failed!!!");
             return 1;
+        } else if (results.contains(TIMEOUT)) {
+            LOG.warn("The supervisor healthchecks timedout!!!");
+            if (metricRegistry != null) {
+                Meter timeoutMeter = metricRegistry.getMeter(Constants.SUPERVISOR_HEALTH_CHECK_TIMEOUTS);
+                if (timeoutMeter != null) {
+                    timeoutMeter.mark();
+                }
+            }
+            Boolean failOnTimeouts = ObjectReader.getBoolean(conf.get(DaemonConfig.STORM_HEALTH_CHECK_FAIL_ON_TIMEOUTS), true);
+            if (failOnTimeouts) {
+                return 1;
+            } else {
+                return 0;
+            }
         } else {
             LOG.info("The supervisor healthchecks succeeded.");
             return 0;