Merge pull request #3639 from agresch/agresch_worker_cpu_metric

STORM-4054 - Add Worker CPU Metric
diff --git a/docs/Metrics.md b/docs/Metrics.md
index 6137f09..4a2db27 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -303,13 +303,11 @@
 
 * There are metrics prefixed with `threads` providing the number of threads, daemon threads, blocked and deadlocked threads.
 
-##### Uptime
-
-* `uptimeSecs` reports the number of seconds the worker has been up for
-* `newWorkerEvent` is 1 when a worker is first started and 0 all other times.  This can be used to tell when a worker has crashed and is restarted.
-* `startTimeSecs` is when the worker started in seconds since the epoch
-
-##### doHeartbeat-calls
+##### Other worker metrics
 
 * `doHeartbeat-calls` is a meter that indicates the rate the worker is performing heartbeats.
+* `newWorkerEvent` is 1 when a worker is first started and 0 all other times.  This can be used to tell when a worker has crashed and is restarted.
+* `startTimeSecs` is when the worker started in seconds since the epoch
+* `uptimeSecs` reports the number of seconds the worker has been up for
+* `workerCpuUsage` reports the CPU usage of the worker as a fraction of CPU cores (not a percentage); for example, 1.0 means the worker is using the equivalent of one full core.
 
diff --git a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
index fd810c2..e6bc50e 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
@@ -18,8 +18,11 @@
 import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metrics2.PerReporterGauge;
@@ -35,6 +38,8 @@
 // TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
 // This bolt was conceived to export worker stats via metrics api.
 public class SystemBolt implements IBolt {
+    private static final long MIN_CPU_CALCULATION_THRESHOLD_NSEC =
+            TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
     private static boolean prepareWasCalled = false;
 
     @SuppressWarnings({ "unchecked" })
@@ -67,12 +72,56 @@
 
 
         context.registerGauge("newWorkerEvent", new NewWorkerGauge());
+        context.registerGauge("workerCpuUsage", new WorkerCpuMetric());
 
         int bucketSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
         registerMetrics(context, (Map<String, String>) topoConf.get(Config.WORKER_METRICS), bucketSize, topoConf);
         registerMetrics(context, (Map<String, String>) topoConf.get(Config.TOPOLOGY_WORKER_METRICS), bucketSize, topoConf);
     }
 
+    /** Gauge of worker CPU use in cores (1.0 = one full core), averaged since the previous calculation. */
+    private class WorkerCpuMetric implements Gauge<Double> {
+        private long lastCalculationTimeNsec;
+        private long previousCpuTotal;
+        private double cpuUsage;
+
+        WorkerCpuMetric() {
+            lastCalculationTimeNsec = System.nanoTime();
+            previousCpuTotal = getTotalCpuUsage();
+        }
+
+        /** Sums the CPU time, in nanoseconds, reported for every thread id currently known to the JVM. */
+        private long getTotalCpuUsage() {
+            ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
+            long totalCpuNsecs = 0L;
+            for (long threadId : threadMxBean.getAllThreadIds()) {
+                // getThreadCpuTime() yields -1 for a dead thread or when CPU timing is disabled; count that as 0
+                totalCpuNsecs += Math.max(0L, threadMxBean.getThreadCpuTime(threadId));
+            }
+            return totalCpuNsecs;
+        }
+
+        /** Recomputes the usage at most once per threshold, so back-to-back reporter calls reuse the last value. */
+        private void updateCalculation() {
+            long now = System.nanoTime();
+            long elapsed = now - lastCalculationTimeNsec;
+            if (elapsed >= MIN_CPU_CALCULATION_THRESHOLD_NSEC) {
+                long cpuTotal = getTotalCpuUsage();
+                cpuUsage = (double) (cpuTotal - previousCpuTotal) / (double) elapsed;
+                // reuse "now" as the new baseline so time spent sampling is not lost from the next interval
+                lastCalculationTimeNsec = now;
+                previousCpuTotal = cpuTotal;
+            }
+        }
+
+        // synchronized: separate reporters may poll this gauge from different threads
+        @Override
+        public synchronized Double getValue() {
+            updateCalculation();
+            return cpuUsage;
+        }
+    }
+
     // newWorkerEvent: 1 when a worker is first started and 0 all other times.
     // This can be used to tell when a worker has crashed and is restarted.
     private class NewWorkerMetric {