STORM-3555 add meter to track errors killing workers
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 259a13e..5a4d25f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.daemon.supervisor;
 
+import com.codahale.metrics.Meter;
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
@@ -31,6 +32,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
@@ -105,6 +107,7 @@
     private final ExecutorService heartbeatExecutor;
     private final AsyncLocalizer asyncLocalizer;
     private final StormMetricsRegistry metricsRegistry;
+    private Meter killErrorMeter;
     private final ContainerMemoryTracker containerMemoryTracker;
     private final SlotMetrics slotMetrics;
     private volatile boolean active;
@@ -339,6 +342,7 @@
             //This will only get updated once
             metricsRegistry.registerMeter("supervisor:num-launched").mark();
             metricsRegistry.registerMeter("supervisor:num-shell-exceptions", ShellUtils.numShellExceptions);
+            killErrorMeter = metricsRegistry.registerMeter("supervisor:num-kill-worker-errors");
             metricsRegistry.startMetricsReporters(conf);
             Utils.addShutdownHookWithForceKillIn1Sec(() -> {
                 metricsRegistry.stopMetricsReporters();
@@ -528,6 +532,9 @@
                 long start = Time.currentTimeMillis();
                 while (!k.areAllProcessesDead()) {
                     if ((Time.currentTimeMillis() - start) > 10_000) {
+                        if (killErrorMeter != null) {
+                            killErrorMeter.mark();
+                        }
                         throw new RuntimeException("Giving up on killing " + k
                                                    + " after " + (Time.currentTimeMillis() - start) + " ms");
                     }