Merge pull request #3639 from agresch/agresch_worker_cpu_metric
STORM-4054 -Add Worker CPU Metric
diff --git a/docs/Metrics.md b/docs/Metrics.md
index 6137f09..4a2db27 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -303,13 +303,11 @@
* There are metrics prefixed with `threads` providing the number of threads, daemon threads, blocked and deadlocked threads.
-##### Uptime
-
-* `uptimeSecs` reports the number of seconds the worker has been up for
-* `newWorkerEvent` is 1 when a worker is first started and 0 all other times. This can be used to tell when a worker has crashed and is restarted.
-* `startTimeSecs` is when the worker started in seconds since the epoch
-
-##### doHeartbeat-calls
+##### Other worker metrics
* `doHeartbeat-calls` is a meter that indicates the rate the worker is performing heartbeats.
+* `newWorkerEvent` is 1 when a worker is first started and 0 all other times. This can be used to tell when a worker has crashed and is restarted.
+* `startTimeSecs` is when the worker started in seconds since the epoch
+* `uptimeSecs` reports the number of seconds the worker has been up for
+* `workerCpuUsage` reports the CPU usage of the worker measured in CPU cores (a ratio, not a percentage): 1.0 indicates one fully utilized CPU core.
diff --git a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
index fd810c2..e6bc50e 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
@@ -18,8 +18,11 @@
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metrics2.PerReporterGauge;
@@ -35,6 +38,8 @@
// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
// This bolt was conceived to export worker stats via metrics api.
public class SystemBolt implements IBolt {
+    // Minimum time that must pass between two recalculations of the worker CPU usage
+    // (one second, expressed in nanoseconds).
+    private static final long MIN_CPU_CALCULATION_THRESHOLD_NSEC = TimeUnit.SECONDS.toNanos(1L);
private static boolean prepareWasCalled = false;
@SuppressWarnings({ "unchecked" })
@@ -67,12 +72,56 @@
context.registerGauge("newWorkerEvent", new NewWorkerGauge());
+ context.registerGauge("workerCpuUsage", new WorkerCpuMetric());
int bucketSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
registerMetrics(context, (Map<String, String>) topoConf.get(Config.WORKER_METRICS), bucketSize, topoConf);
registerMetrics(context, (Map<String, String>) topoConf.get(Config.TOPOLOGY_WORKER_METRICS), bucketSize, topoConf);
}
+    /**
+     * Gauge reporting the CPU consumed by the threads of this JVM, expressed in cores:
+     * CPU nanoseconds used during the sampling window divided by the wall-clock
+     * nanoseconds of that window, so 1.0 corresponds to one fully used core.
+     *
+     * <p>The value is recomputed only when at least
+     * {@code MIN_CPU_CALCULATION_THRESHOLD_NSEC} has passed since the previous
+     * computation; calls arriving sooner return the previously computed value.
+     * Until the first window has elapsed the reported value is 0.0.
+     */
+    private class WorkerCpuMetric implements Gauge<Double> {
+        // Start of the current sampling window, from System.nanoTime().
+        private long lastCalculationTimeNsec;
+        // Summed thread CPU time (nanoseconds) observed at the start of the window.
+        private long previousCpuTotal;
+        // Most recently computed usage, in cores.
+        private double cpuUsage;
+
+        WorkerCpuMetric() {
+            lastCalculationTimeNsec = System.nanoTime();
+            previousCpuTotal = getTotalCpuUsage();
+            cpuUsage = 0.0d;
+        }
+
+        /**
+         * Sums the CPU time of every thread currently reported by the JVM.
+         *
+         * @return total CPU time in nanoseconds, or 0 when the JVM does not support
+         *     thread CPU time measurement
+         */
+        private long getTotalCpuUsage() {
+            ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
+            // getThreadCpuTime() throws UnsupportedOperationException on JVMs without
+            // CPU time measurement; report "no usage" instead of failing the gauge.
+            if (!threadMxBean.isThreadCpuTimeSupported()) {
+                return 0L;
+            }
+            long totalCpuNsecs = 0L;
+            // Iterate over primitive ids to avoid boxing every thread id.
+            for (long threadId : threadMxBean.getAllThreadIds()) {
+                long threadCpu = threadMxBean.getThreadCpuTime(threadId);
+                // -1 is returned for threads that are gone or when measurement is disabled.
+                if (threadCpu > 0L) {
+                    totalCpuNsecs += threadCpu;
+                }
+            }
+            return totalCpuNsecs;
+        }
+
+        /**
+         * Recomputes {@code cpuUsage} if the current sampling window is long enough.
+         */
+        private void updateCalculation() {
+            // We could have multiple reporters calling getValue() one right after
+            // another; a tiny time difference would give inaccurate results, so the
+            // value is only refreshed once the minimum window has elapsed.
+            // A single timestamp is used both to measure this window and to start
+            // the next one, so no wall-clock time is dropped between windows.
+            long now = System.nanoTime();
+            long elapsed = now - lastCalculationTimeNsec;
+            if (elapsed >= MIN_CPU_CALCULATION_THRESHOLD_NSEC) {
+                long currentCpuTotal = getTotalCpuUsage();
+                // Threads that exited since the last sample no longer contribute to the
+                // total, so the difference can be negative; never report negative usage.
+                long usageDuringPeriod = Math.max(0L, currentCpuTotal - previousCpuTotal);
+                cpuUsage = (double) usageDuringPeriod / (double) elapsed;
+                lastCalculationTimeNsec = now;
+                previousCpuTotal = currentCpuTotal;
+            }
+        }
+
+        /**
+         * Returns the worker CPU usage in cores, refreshing it first when due.
+         *
+         * <p>Synchronized because the sampling state is mutable and reporters
+         * presumably poll gauges from their own threads.
+         *
+         * @return the most recently computed CPU usage (never negative)
+         */
+        @Override
+        public synchronized Double getValue() {
+            updateCalculation();
+            return cpuUsage;
+        }
+    }
+
+
// newWorkerEvent: 1 when a worker is first started and 0 all other times.
// This can be used to tell when a worker has crashed and is restarted.
private class NewWorkerMetric {