STORM-3714 add rate tracking to TaskMetrics (#3350)

diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
new file mode 100644
index 0000000..77b6720
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+
+/**
+ * A Counter metric that also implements a Gauge to report the average rate of events per second over 1 minute.  This class
+ * was added as a compromise to using a Meter, which has a much larger performance impact.
+ */
+public class RateCounter implements Gauge<Double> {
+    private Counter counter;
+    private double currentRate = 0;
+    private int time = 0;
+    private final long[] values;
+    private final int timeSpanInSeconds;
+
+    RateCounter(StormMetricRegistry metricRegistry, String metricName, String topologyId,
+                String componentId, int taskId, int workerPort, String streamId) {
+        counter = metricRegistry.counter(metricName, topologyId, componentId,
+                taskId, workerPort, streamId);
+        metricRegistry.gauge(metricName + ".m1_rate", this, topologyId, componentId, streamId,
+                taskId, workerPort);
+        this.timeSpanInSeconds = Math.max(60 - (60 % metricRegistry.getRateCounterUpdateIntervalSeconds()),
+                metricRegistry.getRateCounterUpdateIntervalSeconds());
+        this.values = new long[this.timeSpanInSeconds / metricRegistry.getRateCounterUpdateIntervalSeconds() + 1];
+
+    }
+
+    /**
+     * Reports the the average rate of events per second over 1 minute for the metric.
+     * @return the rate
+     */
+    @Override
+    public Double getValue() {
+        return currentRate;
+    }
+
+    public void inc(long n) {
+        counter.inc(n);
+    }
+
+    /**
+     * Updates the rate in a background thread by a StormMetricRegistry at a fixed frequency.
+     */
+    void update() {
+        time = (time + 1) % values.length;
+        values[time] = counter.getCount();
+        currentRate =  ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);
+    }
+}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 5bd6b92..6863503 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -27,9 +27,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
 import org.apache.storm.metrics2.reporters.StormReporter;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.task.WorkerTopologyContext;
@@ -42,6 +44,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
     private static final String WORKER_METRIC_PREFIX = "storm.worker.";
     private static final String TOPOLOGY_METRIC_PREFIX = "storm.topology.";
+    private static final int RATE_COUNTER_UPDATE_INTERVAL_SECONDS = 2;
     
     private final MetricRegistry registry = new MetricRegistry();
     private final List<StormReporter> reporters = new ArrayList<>();
@@ -54,6 +57,16 @@
     private String hostName = null;
     private int port = -1;
     private String topologyId = null;
+    private StormTimer metricTimer;
+    private Set<RateCounter> rateCounters = ConcurrentHashMap.newKeySet();
+
+    public RateCounter rateCounter(String metricName, String topologyId,
+                                   String componentId, int taskId, int workerPort, String streamId) {
+        RateCounter rateCounter = new RateCounter(this, metricName, topologyId, componentId, taskId,
+                workerPort, streamId);
+        rateCounters.add(rateCounter);
+        return rateCounter;
+    }
 
     public <T> SimpleGauge<T> gauge(
         T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) {
@@ -260,6 +273,13 @@
         this.topologyId = (String) topoConf.get(Config.STORM_ID);
         this.port = port;
 
+        this.metricTimer = new StormTimer("MetricRegistryTimer", (thread, exception) -> {
+            LOG.error("Error when processing metric event", exception);
+            Utils.exitProcess(20, "Error when processing metric event");
+        });
+        this.metricTimer.scheduleRecurring(RATE_COUNTER_UPDATE_INTERVAL_SECONDS,
+                RATE_COUNTER_UPDATE_INTERVAL_SECONDS, new RateCounterUpdater());
+
         LOG.info("Starting metrics reporters...");
         List<Map<String, Object>> reporterList = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_METRICS_REPORTERS);
         
@@ -286,6 +306,15 @@
         for (StormReporter sr : reporters) {
             sr.stop();
         }
+        try {
+            metricTimer.close();
+        } catch (InterruptedException e) {
+            LOG.warn("Exception while stopping", e);
+        }
+    }
+
+    public int getRateCounterUpdateIntervalSeconds() {
+        return RATE_COUNTER_UPDATE_INTERVAL_SECONDS;
     }
 
     public MetricRegistry getRegistry() {
@@ -368,7 +397,7 @@
     private String dotToUnderScore(String str) {
         return str.replace('.', '_');
     }
-
+    
     private static class MetricNames {
         private String longName;
         private String shortName;
@@ -394,4 +423,13 @@
             return shortName;
         }
     }
+
+    private class RateCounterUpdater implements Runnable {
+        @Override
+        public void run() {
+            for (RateCounter rateCounter : rateCounters) {
+                rateCounter.update();
+            }
+        }
+    }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
index 43903fe..0ac3a5e 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -31,7 +31,7 @@
     private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
     private static final String METRIC_NAME_CAPACITY = "__capacity";
 
-    private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, RateCounter> rateCounters = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, RollingAverageGauge> gauges = new ConcurrentHashMap<>();
 
     private final String topologyId;
@@ -61,8 +61,8 @@
 
     public void spoutAckedTuple(String streamId, long latencyMs) {
         String metricName = METRIC_NAME_ACKED + "-" + streamId;
-        Counter c = this.getCounter(metricName, streamId);
-        c.inc(this.samplingRate);
+        RateCounter rc = this.getRateCounter(metricName, streamId);
+        rc.inc(this.samplingRate);
 
         metricName = METRIC_NAME_COMPLETE_LATENCY + "-" + streamId;
         RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, streamId);
@@ -72,8 +72,8 @@
     public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
         String key = sourceComponentId + ":" + sourceStreamId;
         String metricName = METRIC_NAME_ACKED + "-" + key;
-        Counter c = this.getCounter(metricName, sourceStreamId);
-        c.inc(this.samplingRate);
+        RateCounter rc = this.getRateCounter(metricName, sourceStreamId);
+        rc.inc(this.samplingRate);
 
         metricName = METRIC_NAME_PROCESS_LATENCY + "-" + key;
         RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
@@ -83,55 +83,55 @@
     public void spoutFailedTuple(String streamId) {
         String key = streamId;
         String metricName = METRIC_NAME_FAILED + "-" + key;
-        Counter c = this.getCounter(metricName, streamId);
-        c.inc(this.samplingRate);
+        RateCounter rc = this.getRateCounter(metricName, streamId);
+        rc.inc(this.samplingRate);
     }
 
     public void boltFailedTuple(String sourceComponentId, String sourceStreamId) {
         String key = sourceComponentId + ":" + sourceStreamId;
         String metricName = METRIC_NAME_FAILED + "-" + key;
-        Counter c = this.getCounter(metricName, sourceStreamId);
-        c.inc(this.samplingRate);
+        RateCounter rc = this.getRateCounter(metricName, sourceStreamId);
+        rc.inc(this.samplingRate);
     }
 
     public void emittedTuple(String streamId) {
         String key = streamId;
         String metricName = METRIC_NAME_EMITTED + "-" + key;
-        Counter c = this.getCounter(metricName, streamId);
-        c.inc(this.samplingRate);
+        RateCounter rc = this.getRateCounter(metricName, streamId);
+        rc.inc(this.samplingRate);
     }
 
     public void transferredTuples(String streamId, int amount) {
         String key = streamId;
         String metricName = METRIC_NAME_TRANSFERRED + "-" + key;
-        Counter c = this.getCounter(metricName, streamId);
-        c.inc(amount * this.samplingRate);
+        RateCounter rc = this.getRateCounter(metricName, streamId);
+        rc.inc(amount * this.samplingRate);
     }
 
     public void boltExecuteTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
         String key = sourceComponentId + ":" + sourceStreamId;
         String metricName = METRIC_NAME_EXECUTED + "-" + key;
-        Counter c = this.getCounter(metricName, sourceStreamId);
-        c.inc(this.samplingRate);
+        RateCounter rc = this.getRateCounter(metricName, sourceStreamId);
+        rc.inc(this.samplingRate);
 
         metricName = METRIC_NAME_EXECUTE_LATENCY + "-" + key;
         RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
         gauge.addValue(latencyMs);
     }
 
-    private Counter getCounter(String metricName, String streamId) {
-        Counter c = this.counters.get(metricName);
-        if (c == null) {
+    private RateCounter getRateCounter(String metricName, String streamId) {
+        RateCounter rc = this.rateCounters.get(metricName);
+        if (rc == null) {
             synchronized (this) {
-                c = this.counters.get(metricName);
-                if (c == null) {
-                    c = metricRegistry.counter(metricName, this.topologyId, this.componentId,
+                rc = this.rateCounters.get(metricName);
+                if (rc == null) {
+                    rc = metricRegistry.rateCounter(metricName, this.topologyId, this.componentId,
                             this.taskId, this.workerPort, streamId);
-                    this.counters.put(metricName, c);
+                    this.rateCounters.put(metricName, rc);
                 }
             }
         }
-        return c;
+        return rc;
     }
 
     private RollingAverageGauge getRollingAverageGauge(String metricName, String streamId) {