STORM-3648 add meter to track worker heartbeat rate
diff --git a/docs/Metrics.md b/docs/Metrics.md
index 8df1a45..7f6ba47 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -327,3 +327,8 @@
* `uptimeSecs` reports the number of seconds the worker has been up for
* `newWorkerEvent` is 1 when a worker is first started and 0 all other times. This can be used to tell when a worker has crashed and is restarted.
* `startTimeSecs` is when the worker started in seconds since the epoch
+
+##### doHeartbeat-calls
+
+* `doHeartbeat-calls` is a meter that indicates the rate the worker is performing heartbeats.
+
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index b4e4c80..ee8e005 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -12,6 +12,7 @@
package org.apache.storm.daemon.worker;
+import com.codahale.metrics.Meter;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
@@ -83,6 +84,7 @@
private final String workerId;
private final LogConfigManager logConfigManager;
private final StormMetricRegistry metricRegistry;
+ private Meter heatbeatMeter;
private WorkerState workerState;
private AtomicReference<List<IRunningExecutor>> executorsAtom;
@@ -196,6 +198,8 @@
throws Exception {
workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
topologyConf, stateStorage, stormClusterState, autoCreds, metricRegistry);
+ this.heatbeatMeter = metricRegistry.meter("doHeartbeat-calls", workerState.getWorkerTopologyContext(),
+ Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
// Heartbeat here so that worker process dies if this fails
// it's important that worker heartbeat to supervisor ASAP so that supervisor knows
@@ -362,6 +366,7 @@
state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
// it shouldn't take supervisor 120 seconds between listing dir and reading it
heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+ this.heatbeatMeter.mark();
}
public void doExecutorHeartbeats() {
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 4d62cf4..d095cc0 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -81,6 +81,13 @@
return meter;
}
+ public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId) {
+ MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, taskId, context.getThisWorkerPort());
+ Meter meter = registry.meter(metricNames.getLongName());
+ saveMetricTaskIdMapping(taskId, metricNames, meter, taskIdMeters);
+ return meter;
+ }
+
public Meter meter(String name, TopologyContext context) {
MetricNames metricNames = topologyMetricName(name, context);
Meter meter = registry.meter(metricNames.getLongName());