STORM-3541: Report v2 metrics API in metrics tick
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 2f2d53b..7fa892f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -232,7 +232,7 @@
 
     private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
         Map<String, Object> conf = workerData.getConf();
-        return new TopologyContext(
+        TopologyContext topologyContext = new TopologyContext(
             topology,
             workerData.getTopologyConf(),
             workerData.getTaskToComponent(),
@@ -252,6 +252,8 @@
             executor.getIntervalToTaskToMetricToRegistry(),
             executor.getOpenOrPrepareWasCalled(),
             workerData.getMetricRegistry());
+        executor.setMetricRegistry(workerData.getMetricRegistry());
+        return topologyContext;
     }
 
     private Object mkTaskObject() {
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 7e687ad..7fe3b44 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -12,6 +12,12 @@
 
 package org.apache.storm.executor;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.UnknownHostException;
@@ -25,6 +31,7 @@
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BooleanSupplier;
@@ -56,6 +63,7 @@
 import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Lists;
 import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue;
@@ -116,6 +124,8 @@
     protected ArrayList<Task> idToTask;
     protected int idToTaskBase;
     protected String hostname;
+    private StormMetricRegistry stormMetricRegistry;
+    private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
 
     protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials, String type) {
         this.workerData = workerData;
@@ -297,11 +307,8 @@
             if (taskToMetricToRegistry != null) {
                 nameToRegistry = taskToMetricToRegistry.get(taskId);
             }
+            List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
             if (nameToRegistry != null) {
-                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
-                    hostname, workerTopologyContext.getThisWorkerPort(),
-                    componentId, taskId, Time.currentTimeSecs(), interval);
-                List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
                 for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                     IMetric metric = entry.getValue();
                     Object value = metric.getValueAndReset();
@@ -310,17 +317,93 @@
                         dataPoints.add(dataPoint);
                     }
                 }
-                if (!dataPoints.isEmpty()) {
-                    task.sendUnanchored(Constants.METRICS_STREAM_ID,
-                                        new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
-                    executorTransfer.flush();
-                }
+            }
+            addV2Metrics(dataPoints);
+
+            if (!dataPoints.isEmpty()) {
+                IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
+                        hostname, workerTopologyContext.getThisWorkerPort(),
+                        componentId, taskId, Time.currentTimeSecs(), interval);
+                task.sendUnanchored(Constants.METRICS_STREAM_ID,
+                        new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
+                executorTransfer.flush();
             }
         } catch (Exception e) {
             throw Utils.wrapInRuntime(e);
         }
     }
 
+    // Appends data points gathered from the v2 (codahale) metrics API to the v1 metrics-tick dataPoints list
+    private void addV2Metrics(List<IMetricsConsumer.DataPoint> dataPoints) {
+        if (stormMetricRegistry == null) {
+            return;
+        }
+        for (Map.Entry<String, Gauge> entry : stormMetricRegistry.registry().getGauges().entrySet()) {
+            String name = entry.getKey();
+            Object v = entry.getValue().getValue();
+            if (v instanceof Number) {
+                IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(name, v);
+                dataPoints.add(dataPoint);
+            } else {
+                LOG.warn("Cannot report {}, its value is not a Number: {}", name, v);
+            }
+        }
+        for (Map.Entry<String, Counter> entry : stormMetricRegistry.registry().getCounters().entrySet()) {
+            Object value = entry.getValue().getCount();
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
+            dataPoints.add(dataPoint);
+        }
+        for (Map.Entry<String, Histogram> entry: stormMetricRegistry.registry().getHistograms().entrySet()) {
+            String baseName = entry.getKey();
+            Histogram histogram = entry.getValue();
+            Snapshot snapshot =  histogram.getSnapshot();
+            addSnapshotDatapoints(baseName, snapshot, dataPoints);
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", histogram.getCount());
+            dataPoints.add(dataPoint);
+        }
+        for (Map.Entry<String, Meter> entry: stormMetricRegistry.registry().getMeters().entrySet()) {
+            String baseName = entry.getKey();
+            Meter meter = entry.getValue();
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", meter.getCount());
+            dataPoints.add(dataPoint);
+            addConvertedMetric(baseName, ".m1_rate", meter.getOneMinuteRate(), dataPoints);
+            addConvertedMetric(baseName, ".m5_rate", meter.getFiveMinuteRate(), dataPoints);
+            addConvertedMetric(baseName, ".m15_rate", meter.getFifteenMinuteRate(), dataPoints);
+            addConvertedMetric(baseName, ".mean_rate", meter.getMeanRate(), dataPoints);
+        }
+        for (Map.Entry<String, Timer> entry : stormMetricRegistry.registry().getTimers().entrySet()) {
+            String baseName = entry.getKey();
+            Timer timer = entry.getValue();
+            Snapshot snapshot =  timer.getSnapshot();
+            addSnapshotDatapoints(baseName, snapshot, dataPoints);
+            IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", timer.getCount());
+            dataPoints.add(dataPoint);
+        }
+    }
+
+    private void addSnapshotDatapoints(String baseName, Snapshot snapshot, List<IMetricsConsumer.DataPoint> dataPoints) {
+        addConvertedMetric(baseName, ".max", snapshot.getMax(), dataPoints);
+        addConvertedMetric(baseName, ".mean", snapshot.getMean(), dataPoints);
+        addConvertedMetric(baseName, ".min", snapshot.getMin(), dataPoints);
+        addConvertedMetric(baseName, ".stddev", snapshot.getStdDev(), dataPoints);
+        addConvertedMetric(baseName, ".p50", snapshot.getMedian(), dataPoints);
+        addConvertedMetric(baseName, ".p75", snapshot.get75thPercentile(), dataPoints);
+        addConvertedMetric(baseName, ".p95", snapshot.get95thPercentile(), dataPoints);
+        addConvertedMetric(baseName, ".p98", snapshot.get98thPercentile(), dataPoints);
+        addConvertedMetric(baseName, ".p99", snapshot.get99thPercentile(), dataPoints);
+        addConvertedMetric(baseName, ".p999", snapshot.get999thPercentile(), dataPoints);
+    }
+
+    private void addConvertedMetric(String baseName, String suffix, double value, List<IMetricsConsumer.DataPoint> dataPoints) {
+        IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + suffix, convertDuration(value));
+        dataPoints.add(dataPoint);
+    }
+
+    // Converts timed codahale metric values from nanoseconds to milliseconds. NOTE(review): this conversion is also applied to histogram snapshot values, which are not durations — confirm this is intended.
+    private double convertDuration(double duration) {
+        return duration * msDurationFactor;
+    }
+
     protected void setupMetrics() {
         for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
             StormTimer timerTask = workerData.getUserTimer();
@@ -560,4 +643,8 @@
         this.executorTransfer = executorTransfer;
     }
 
+    public void setMetricRegistry(StormMetricRegistry stormMetricRegistry) {
+        this.stormMetricRegistry = stormMetricRegistry;
+    }
+
 }