STORM-3714 add rate tracking to TaskMetrics (#3350)
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
new file mode 100644
index 0000000..77b6720
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RateCounter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+
+/**
+ * A Counter metric that also implements a Gauge to report the average rate of events per second over 1 minute. This class
+ * was added as a compromise to using a Meter, which has a much larger performance impact.
+ */
+public class RateCounter implements Gauge<Double> {
+ private Counter counter;
+ private double currentRate = 0;
+ private int time = 0;
+ private final long[] values;
+ private final int timeSpanInSeconds;
+
+ RateCounter(StormMetricRegistry metricRegistry, String metricName, String topologyId,
+ String componentId, int taskId, int workerPort, String streamId) {
+ counter = metricRegistry.counter(metricName, topologyId, componentId,
+ taskId, workerPort, streamId);
+ metricRegistry.gauge(metricName + ".m1_rate", this, topologyId, componentId, streamId,
+ taskId, workerPort);
+ this.timeSpanInSeconds = Math.max(60 - (60 % metricRegistry.getRateCounterUpdateIntervalSeconds()),
+ metricRegistry.getRateCounterUpdateIntervalSeconds());
+ this.values = new long[this.timeSpanInSeconds / metricRegistry.getRateCounterUpdateIntervalSeconds() + 1];
+
+ }
+
+ /**
+ * Reports the the average rate of events per second over 1 minute for the metric.
+ * @return the rate
+ */
+ @Override
+ public Double getValue() {
+ return currentRate;
+ }
+
+ public void inc(long n) {
+ counter.inc(n);
+ }
+
+ /**
+ * Updates the rate in a background thread by a StormMetricRegistry at a fixed frequency.
+ */
+ void update() {
+ time = (time + 1) % values.length;
+ values[time] = counter.getCount();
+ currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);
+ }
+}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 5bd6b92..6863503 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -27,9 +27,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
import org.apache.storm.metrics2.reporters.StormReporter;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
@@ -42,6 +44,7 @@
private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
private static final String WORKER_METRIC_PREFIX = "storm.worker.";
private static final String TOPOLOGY_METRIC_PREFIX = "storm.topology.";
+ private static final int RATE_COUNTER_UPDATE_INTERVAL_SECONDS = 2;
private final MetricRegistry registry = new MetricRegistry();
private final List<StormReporter> reporters = new ArrayList<>();
@@ -54,6 +57,16 @@
private String hostName = null;
private int port = -1;
private String topologyId = null;
+ private StormTimer metricTimer;
+ private Set<RateCounter> rateCounters = ConcurrentHashMap.newKeySet();
+
+ public RateCounter rateCounter(String metricName, String topologyId,
+ String componentId, int taskId, int workerPort, String streamId) {
+ RateCounter rateCounter = new RateCounter(this, metricName, topologyId, componentId, taskId,
+ workerPort, streamId);
+ rateCounters.add(rateCounter);
+ return rateCounter;
+ }
public <T> SimpleGauge<T> gauge(
T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) {
@@ -260,6 +273,13 @@
this.topologyId = (String) topoConf.get(Config.STORM_ID);
this.port = port;
+ this.metricTimer = new StormTimer("MetricRegistryTimer", (thread, exception) -> {
+ LOG.error("Error when processing metric event", exception);
+ Utils.exitProcess(20, "Error when processing metric event");
+ });
+ this.metricTimer.scheduleRecurring(RATE_COUNTER_UPDATE_INTERVAL_SECONDS,
+ RATE_COUNTER_UPDATE_INTERVAL_SECONDS, new RateCounterUpdater());
+
LOG.info("Starting metrics reporters...");
List<Map<String, Object>> reporterList = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_METRICS_REPORTERS);
@@ -286,6 +306,15 @@
for (StormReporter sr : reporters) {
sr.stop();
}
+ try {
+ metricTimer.close();
+ } catch (InterruptedException e) {
+ LOG.warn("Exception while stopping", e);
+ }
+ }
+
+ public int getRateCounterUpdateIntervalSeconds() {
+ return RATE_COUNTER_UPDATE_INTERVAL_SECONDS;
}
public MetricRegistry getRegistry() {
@@ -368,7 +397,7 @@
private String dotToUnderScore(String str) {
return str.replace('.', '_');
}
-
+
private static class MetricNames {
private String longName;
private String shortName;
@@ -394,4 +423,13 @@
return shortName;
}
}
+
+ private class RateCounterUpdater implements Runnable {
+ @Override
+ public void run() {
+ for (RateCounter rateCounter : rateCounters) {
+ rateCounter.update();
+ }
+ }
+ }
}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
index 43903fe..0ac3a5e 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -31,7 +31,7 @@
private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
private static final String METRIC_NAME_CAPACITY = "__capacity";
- private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, RateCounter> rateCounters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, RollingAverageGauge> gauges = new ConcurrentHashMap<>();
private final String topologyId;
@@ -61,8 +61,8 @@
public void spoutAckedTuple(String streamId, long latencyMs) {
String metricName = METRIC_NAME_ACKED + "-" + streamId;
- Counter c = this.getCounter(metricName, streamId);
- c.inc(this.samplingRate);
+ RateCounter rc = this.getRateCounter(metricName, streamId);
+ rc.inc(this.samplingRate);
metricName = METRIC_NAME_COMPLETE_LATENCY + "-" + streamId;
RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, streamId);
@@ -72,8 +72,8 @@
public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
String key = sourceComponentId + ":" + sourceStreamId;
String metricName = METRIC_NAME_ACKED + "-" + key;
- Counter c = this.getCounter(metricName, sourceStreamId);
- c.inc(this.samplingRate);
+ RateCounter rc = this.getRateCounter(metricName, sourceStreamId);
+ rc.inc(this.samplingRate);
metricName = METRIC_NAME_PROCESS_LATENCY + "-" + key;
RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
@@ -83,55 +83,55 @@
public void spoutFailedTuple(String streamId) {
String key = streamId;
String metricName = METRIC_NAME_FAILED + "-" + key;
- Counter c = this.getCounter(metricName, streamId);
- c.inc(this.samplingRate);
+ RateCounter rc = this.getRateCounter(metricName, streamId);
+ rc.inc(this.samplingRate);
}
public void boltFailedTuple(String sourceComponentId, String sourceStreamId) {
String key = sourceComponentId + ":" + sourceStreamId;
String metricName = METRIC_NAME_FAILED + "-" + key;
- Counter c = this.getCounter(metricName, sourceStreamId);
- c.inc(this.samplingRate);
+ RateCounter rc = this.getRateCounter(metricName, sourceStreamId);
+ rc.inc(this.samplingRate);
}
public void emittedTuple(String streamId) {
String key = streamId;
String metricName = METRIC_NAME_EMITTED + "-" + key;
- Counter c = this.getCounter(metricName, streamId);
- c.inc(this.samplingRate);
+ RateCounter rc = this.getRateCounter(metricName, streamId);
+ rc.inc(this.samplingRate);
}
public void transferredTuples(String streamId, int amount) {
String key = streamId;
String metricName = METRIC_NAME_TRANSFERRED + "-" + key;
- Counter c = this.getCounter(metricName, streamId);
- c.inc(amount * this.samplingRate);
+ RateCounter rc = this.getRateCounter(metricName, streamId);
+ rc.inc(amount * this.samplingRate);
}
public void boltExecuteTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
String key = sourceComponentId + ":" + sourceStreamId;
String metricName = METRIC_NAME_EXECUTED + "-" + key;
- Counter c = this.getCounter(metricName, sourceStreamId);
- c.inc(this.samplingRate);
+ RateCounter rc = this.getRateCounter(metricName, sourceStreamId);
+ rc.inc(this.samplingRate);
metricName = METRIC_NAME_EXECUTE_LATENCY + "-" + key;
RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
gauge.addValue(latencyMs);
}
- private Counter getCounter(String metricName, String streamId) {
- Counter c = this.counters.get(metricName);
- if (c == null) {
+ private RateCounter getRateCounter(String metricName, String streamId) {
+ RateCounter rc = this.rateCounters.get(metricName);
+ if (rc == null) {
synchronized (this) {
- c = this.counters.get(metricName);
- if (c == null) {
- c = metricRegistry.counter(metricName, this.topologyId, this.componentId,
+ rc = this.rateCounters.get(metricName);
+ if (rc == null) {
+ rc = metricRegistry.rateCounter(metricName, this.topologyId, this.componentId,
this.taskId, this.workerPort, streamId);
- this.counters.put(metricName, c);
+ this.rateCounters.put(metricName, rc);
}
}
}
- return c;
+ return rc;
}
private RollingAverageGauge getRollingAverageGauge(String metricName, String streamId) {