STORM-3648 add meter to track worker heartbeat rate
diff --git a/docs/Metrics.md b/docs/Metrics.md
index 8df1a45..7f6ba47 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -327,3 +327,8 @@
 * `uptimeSecs` reports the number of seconds the worker has been up for
 * `newWorkerEvent` is 1 when a worker is first started and 0 all other times.  This can be used to tell when a worker has crashed and is restarted.
 * `startTimeSecs` is when the worker started in seconds since the epoch
+
+##### doHeartbeat-calls
+
+* `doHeartbeat-calls` is a meter that indicates the rate the worker is performing heartbeats.
+
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index b4e4c80..ee8e005 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.daemon.worker;
 
+import com.codahale.metrics.Meter;
 import java.io.File;
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -83,6 +84,7 @@
     private final String workerId;
     private final LogConfigManager logConfigManager;
     private final StormMetricRegistry metricRegistry;
+    private Meter heatbeatMeter;
 
     private WorkerState workerState;
     private AtomicReference<List<IRunningExecutor>> executorsAtom;
@@ -196,6 +198,8 @@
         throws Exception {
         workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
                                       topologyConf, stateStorage, stormClusterState, autoCreds, metricRegistry);
+        this.heatbeatMeter = metricRegistry.meter("doHeartbeat-calls", workerState.getWorkerTopologyContext(),
+                Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
 
         // Heartbeat here so that worker process dies if this fails
         // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
@@ -362,6 +366,7 @@
         state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
         // it shouldn't take supervisor 120 seconds between listing dir and reading it
         heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+        this.heatbeatMeter.mark();
     }
 
     public void doExecutorHeartbeats() {
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 4d62cf4..d095cc0 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -81,6 +81,13 @@
         return meter;
     }
 
+    public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId) {
+        MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, taskId, context.getThisWorkerPort());
+        Meter meter = registry.meter(metricNames.getLongName());
+        saveMetricTaskIdMapping(taskId, metricNames, meter, taskIdMeters);
+        return meter;
+    }
+
     public Meter meter(String name, TopologyContext context) {
         MetricNames metricNames = topologyMetricName(name, context);
         Meter meter = registry.meter(metricNames.getLongName());