Merge pull request #3185 from agresch/agresch_storm_3557
STORM-3557 allow health checks to pass on timeout and add a meter to track timeouts
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 7a1c518..af033ee 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -57,5 +57,6 @@
public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
+ public static final String SUPERVISOR_HEALTH_CHECK_TIMEOUTS = "supervisor:health-check-timeouts";
}
diff --git a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
index ce7f53e..5a35fd3 100644
--- a/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/command/HealthCheck.java
@@ -20,7 +20,7 @@
public static void main(String[] args) {
Map<String, Object> conf = Utils.readStormConfig();
- System.exit(HealthChecker.healthCheck(conf));
+ System.exit(HealthChecker.healthCheck(conf, null));
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 28076f4..482846d 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -171,6 +171,12 @@
public static final String STORM_HEALTH_CHECK_TIMEOUT_MS = "storm.health.check.timeout.ms";
/**
+ * Boolean setting that controls whether a health check timeout is treated as a failure. HealthChecker falls back to true when unset.
+ */
+ @IsBoolean
+ public static final String STORM_HEALTH_CHECK_FAIL_ON_TIMEOUTS = "storm.health.check.fail.on.timeouts";
+
+ /**
* This is the user that the Nimbus daemon process is running as. May be used when security is enabled to authorize actions in the
* cluster.
*/
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 5a4d25f..ce597de 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -35,6 +35,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
+import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
@@ -215,7 +216,7 @@
return sharedContext;
}
- StormMetricsRegistry getMetricsRegistry() {
+ public StormMetricsRegistry getMetricsRegistry() {
return metricsRegistry;
}
@@ -342,6 +343,7 @@
//This will only get updated once
metricsRegistry.registerMeter("supervisor:num-launched").mark();
metricsRegistry.registerMeter("supervisor:num-shell-exceptions", ShellUtils.numShellExceptions);
+ metricsRegistry.registerMeter(Constants.SUPERVISOR_HEALTH_CHECK_TIMEOUTS);
killErrorMeter = metricsRegistry.registerMeter("supervisor:num-kill-worker-errors");
metricsRegistry.startMetricsReporters(conf);
Utils.addShutdownHookWithForceKillIn1Sec(() -> {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index f68cb8c..4a20084 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -36,7 +36,7 @@
public void run() {
Map<String, Object> conf = supervisor.getConf();
LOG.info("Running supervisor healthchecks...");
- int healthCode = HealthChecker.healthCheck(conf);
+ int healthCode = HealthChecker.healthCheck(conf, supervisor.getMetricsRegistry());
if (healthCode != 0) {
LOG.info("The supervisor healthchecks FAILED...");
supervisor.shutdownAllWorkers(null, null);
diff --git a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
index b5f3655..8896fd4 100644
--- a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
+++ b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
@@ -18,6 +18,8 @@
package org.apache.storm.healthcheck;
+import com.codahale.metrics.Meter;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
@@ -26,7 +28,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+
+import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.slf4j.Logger;
@@ -40,7 +45,7 @@
private static final String TIMEOUT = "timeout";
private static final String FAILED_WITH_EXIT_CODE = "failed_with_exit_code";
- public static int healthCheck(Map<String, Object> conf) {
+ public static int healthCheck(Map<String, Object> conf, StormMetricsRegistry metricRegistry) {
String healthDir = ServerConfigUtils.absoluteHealthCheckDir(conf);
List<String> results = new ArrayList<>();
if (healthDir != null) {
@@ -66,10 +71,23 @@
// to execute properly, not that the system is unhealthy, in which case
// we don't want to start killing things.
- if (results.contains(FAILED) || results.contains(FAILED_WITH_EXIT_CODE)
- || results.contains(TIMEOUT)) {
+ if (results.contains(FAILED) || results.contains(FAILED_WITH_EXIT_CODE)) {
LOG.warn("The supervisor healthchecks failed!!!");
return 1;
+ } else if (results.contains(TIMEOUT)) {
+ LOG.warn("The supervisor healthchecks timedout!!!");
+ if (metricRegistry != null) {
+ Meter timeoutMeter = metricRegistry.getMeter(Constants.SUPERVISOR_HEALTH_CHECK_TIMEOUTS);
+ if (timeoutMeter != null) {
+ timeoutMeter.mark();
+ }
+ }
+ Boolean failOnTimeouts = ObjectReader.getBoolean(conf.get(DaemonConfig.STORM_HEALTH_CHECK_FAIL_ON_TIMEOUTS), true);
+ if (failOnTimeouts) {
+ return 1;
+ } else {
+ return 0;
+ }
} else {
LOG.info("The supervisor healthchecks succeeded.");
return 0;