Merge pull request #3185 from agresch/agresch_storm_3557

STORM-3557 allow health checks to pass on timeout and add a meter to track timeouts
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 7a1c518..af033ee 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -57,5 +57,6 @@
     public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
 
     public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
+    public static final String SUPERVISOR_HEALTH_CHECK_TIMEOUTS = "supervisor:health-check-timeouts";
 }
     
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
index ce7f53e..5a35fd3 100644
--- a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -20,7 +20,7 @@
 
     public static void main(String[] args) {
         Map<String, Object> conf = Utils.readStormConfig();
-        System.exit(HealthChecker.healthCheck(conf));
+        System.exit(HealthChecker.healthCheck(conf, null));
     }
 
 }
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 28076f4..482846d 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -171,6 +171,12 @@
     public static final String STORM_HEALTH_CHECK_TIMEOUT_MS = "storm.health.check.timeout.ms";
 
     /**
+     * Whether a timed-out health check causes the overall health check to fail. Defaults to true when unset.
+     */
+    @IsBoolean
+    public static final String STORM_HEALTH_CHECK_FAIL_ON_TIMEOUTS = "storm.health.check.fail.on.timeouts";
+
+    /**
      * This is the user that the Nimbus daemon process is running as. May be used when security is enabled to authorize actions in the
      * cluster.
      */
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 5a4d25f..ce597de 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -35,6 +35,7 @@
 
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
+import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.StormTimer;
 import org.apache.storm.cluster.ClusterStateContext;
@@ -215,7 +216,7 @@
         return sharedContext;
     }
 
-    StormMetricsRegistry getMetricsRegistry() {
+    public StormMetricsRegistry getMetricsRegistry() {
         return metricsRegistry;
     }
     
@@ -342,6 +343,7 @@
             //This will only get updated once
             metricsRegistry.registerMeter("supervisor:num-launched").mark();
             metricsRegistry.registerMeter("supervisor:num-shell-exceptions", ShellUtils.numShellExceptions);
+            metricsRegistry.registerMeter(Constants.SUPERVISOR_HEALTH_CHECK_TIMEOUTS);
             killErrorMeter = metricsRegistry.registerMeter("supervisor:num-kill-worker-errors");
             metricsRegistry.startMetricsReporters(conf);
             Utils.addShutdownHookWithForceKillIn1Sec(() -> {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index f68cb8c..4a20084 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -36,7 +36,7 @@
     public void run() {
         Map<String, Object> conf = supervisor.getConf();
         LOG.info("Running supervisor healthchecks...");
-        int healthCode = HealthChecker.healthCheck(conf);
+        int healthCode = HealthChecker.healthCheck(conf, supervisor.getMetricsRegistry());
         if (healthCode != 0) {
             LOG.info("The supervisor healthchecks FAILED...");
             supervisor.shutdownAllWorkers(null, null);
diff --git a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
index b5f3655..8896fd4 100644
--- a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
+++ b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.healthcheck;
 
+import com.codahale.metrics.Meter;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.InputStream;
@@ -26,7 +28,10 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.slf4j.Logger;
@@ -40,7 +45,7 @@
     private static final String TIMEOUT = "timeout";
     private static final String FAILED_WITH_EXIT_CODE = "failed_with_exit_code";
 
-    public static int healthCheck(Map<String, Object> conf) {
+    public static int healthCheck(Map<String, Object> conf, StormMetricsRegistry metricRegistry) {
         String healthDir = ServerConfigUtils.absoluteHealthCheckDir(conf);
         List<String> results = new ArrayList<>();
         if (healthDir != null) {
@@ -66,10 +71,23 @@
         // to execute properly, not that the system is unhealthy, in which case
         // we don't want to start killing things.
 
-        if (results.contains(FAILED) || results.contains(FAILED_WITH_EXIT_CODE)
-            || results.contains(TIMEOUT)) {
+        if (results.contains(FAILED) || results.contains(FAILED_WITH_EXIT_CODE)) {
             LOG.warn("The supervisor healthchecks failed!!!");
             return 1;
+        } else if (results.contains(TIMEOUT)) {
+            LOG.warn("The supervisor healthchecks timedout!!!");
+            if (metricRegistry != null) {
+                Meter timeoutMeter = metricRegistry.getMeter(Constants.SUPERVISOR_HEALTH_CHECK_TIMEOUTS);
+                if (timeoutMeter != null) {
+                    timeoutMeter.mark();
+                }
+            }
+            Boolean failOnTimeouts = ObjectReader.getBoolean(conf.get(DaemonConfig.STORM_HEALTH_CHECK_FAIL_ON_TIMEOUTS), true);
+            if (failOnTimeouts) {
+                return 1;
+            } else {
+                return 0;
+            }
         } else {
             LOG.info("The supervisor healthchecks succeeded.");
             return 0;