Merge pull request #3170 from agresch/agresch_storm_3541

STORM-3541 report v2 metrics api in metrics tick
diff --git a/docs/metrics_v2.md b/docs/metrics_v2.md
index e89e4e3..e05ee96 100644
--- a/docs/metrics_v2.md
+++ b/docs/metrics_v2.md
@@ -145,3 +145,5 @@
 }
 ```
 
+V2 metrics can also be reported to the Metrics Consumers registered with topology.metrics.consumer.register by enabling the topology.enable.v2.metrics.tick configuration.
+
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 4bcb2e3..7a1f689 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -263,6 +263,13 @@
      */
     @IsPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT = "topology.metrics.consumer.cpu.pcore.percent";
+
+    /**
+     * This config allows a topology to report metrics data points from the V2 metrics API through the metrics tick.
+     */
+    @IsBoolean
+    public static final String TOPOLOGY_ENABLE_V2_METRICS_TICK = "topology.enable.v2.metrics.tick";
+
     /**
      * The class name of the {@link org.apache.storm.state.StateProvider} implementation. If not specified defaults to {@link
      * org.apache.storm.state.InMemoryKeyValueStateProvider}. This can be overridden at the component level.
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 7e687ad..b404a2d 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -12,6 +12,12 @@
 
 package org.apache.storm.executor;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.UnknownHostException;
@@ -25,6 +31,7 @@
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BooleanSupplier;
@@ -56,6 +63,7 @@
 import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Lists;
 import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue;
@@ -116,6 +124,7 @@
     protected ArrayList<Task> idToTask;
     protected int idToTaskBase;
     protected String hostname;
+    private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
 
     protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials, String type) {
         this.workerData = workerData;
@@ -297,11 +306,8 @@
             if (taskToMetricToRegistry != null) {
                 nameToRegistry = taskToMetricToRegistry.get(taskId);
             }
+            List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
             if (nameToRegistry != null) {
-                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
-                    hostname, workerTopologyContext.getThisWorkerPort(),
-                    componentId, taskId, Time.currentTimeSecs(), interval);
-                List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
                 for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                     IMetric metric = entry.getValue();
                     Object value = metric.getValueAndReset();
@@ -310,17 +316,95 @@
                         dataPoints.add(dataPoint);
                     }
                 }
-                if (!dataPoints.isEmpty()) {
-                    task.sendUnanchored(Constants.METRICS_STREAM_ID,
-                                        new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
-                    executorTransfer.flush();
-                }
+            }
+            addV2Metrics(dataPoints);
+
+            if (!dataPoints.isEmpty()) {
+                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
+                        hostname, workerTopologyContext.getThisWorkerPort(),
+                        componentId, taskId, Time.currentTimeSecs(), interval);
+                task.sendUnanchored(Constants.METRICS_STREAM_ID,
+                        new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
+                executorTransfer.flush();
             }
         } catch (Exception e) {
             throw Utils.wrapInRuntime(e);
         }
     }
 
+    // updates v1 metric dataPoints with v2 metric API data
+    private void addV2Metrics(List<IMetricsConsumer.DataPoint> dataPoints) {
+        boolean enableV2MetricsDataPoints = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_ENABLE_V2_METRICS_TICK), false);
+        if (!enableV2MetricsDataPoints) {
+            return;
+        }
+        StormMetricRegistry stormMetricRegistry = workerData.getMetricRegistry();
+        for (Map.Entry<String, Gauge> entry : stormMetricRegistry.registry().getGauges().entrySet()) {
+            String name = entry.getKey();
+            Object v = entry.getValue().getValue();
+            if (v instanceof Number) {
+                IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(name, v);
+                dataPoints.add(dataPoint);
+            } else {
+                LOG.warn("Cannot report {}, its value is not a Number {}", name, v);
+            }
+        }
+        for (Map.Entry<String, Counter> entry : stormMetricRegistry.registry().getCounters().entrySet()) {
+            Object value = entry.getValue().getCount();
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
+            dataPoints.add(dataPoint);
+        }
+        for (Map.Entry<String, Histogram> entry: stormMetricRegistry.registry().getHistograms().entrySet()) {
+            String baseName = entry.getKey();
+            Histogram histogram = entry.getValue();
+            Snapshot snapshot =  histogram.getSnapshot();
+            addSnapshotDatapoints(baseName, snapshot, dataPoints);
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", histogram.getCount());
+            dataPoints.add(dataPoint);
+        }
+        for (Map.Entry<String, Meter> entry: stormMetricRegistry.registry().getMeters().entrySet()) {
+            String baseName = entry.getKey();
+            Meter meter = entry.getValue();
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", meter.getCount());
+            dataPoints.add(dataPoint);
+            addConvertedMetric(baseName, ".m1_rate", meter.getOneMinuteRate(), dataPoints);
+            addConvertedMetric(baseName, ".m5_rate", meter.getFiveMinuteRate(), dataPoints);
+            addConvertedMetric(baseName, ".m15_rate", meter.getFifteenMinuteRate(), dataPoints);
+            addConvertedMetric(baseName, ".mean_rate", meter.getMeanRate(), dataPoints);
+        }
+        for (Map.Entry<String, Timer> entry : stormMetricRegistry.registry().getTimers().entrySet()) {
+            String baseName = entry.getKey();
+            Timer timer = entry.getValue();
+            Snapshot snapshot =  timer.getSnapshot();
+            addSnapshotDatapoints(baseName, snapshot, dataPoints);
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", timer.getCount());
+            dataPoints.add(dataPoint);
+        }
+    }
+
+    private void addSnapshotDatapoints(String baseName, Snapshot snapshot, List<IMetricsConsumer.DataPoint> dataPoints) {
+        addConvertedMetric(baseName, ".max", snapshot.getMax(), dataPoints);
+        addConvertedMetric(baseName, ".mean", snapshot.getMean(), dataPoints);
+        addConvertedMetric(baseName, ".min", snapshot.getMin(), dataPoints);
+        addConvertedMetric(baseName, ".stddev", snapshot.getStdDev(), dataPoints);
+        addConvertedMetric(baseName, ".p50", snapshot.getMedian(), dataPoints);
+        addConvertedMetric(baseName, ".p75", snapshot.get75thPercentile(), dataPoints);
+        addConvertedMetric(baseName, ".p95", snapshot.get95thPercentile(), dataPoints);
+        addConvertedMetric(baseName, ".p98", snapshot.get98thPercentile(), dataPoints);
+        addConvertedMetric(baseName, ".p99", snapshot.get99thPercentile(), dataPoints);
+        addConvertedMetric(baseName, ".p999", snapshot.get999thPercentile(), dataPoints);
+    }
+
+    private void addConvertedMetric(String baseName, String suffix, double value, List<IMetricsConsumer.DataPoint> dataPoints) {
+        IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + suffix, convertDuration(value));
+        dataPoints.add(dataPoint);
+    }
+
+    // converts timed codahale metric values from nanosecond to millisecond time scale
+    private double convertDuration(double duration) {
+        return duration * msDurationFactor;
+    }
+
     protected void setupMetrics() {
         for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
             StormTimer timerTask = workerData.getUserTimer();
@@ -559,5 +643,4 @@
     public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
         this.executorTransfer = executorTransfer;
     }
-
 }