Merge pull request #3170 from agresch/agresch_storm_3541
STORM-3541 report v2 metrics api in metrics tick
diff --git a/docs/metrics_v2.md b/docs/metrics_v2.md
index e89e4e3..e05ee96 100644
--- a/docs/metrics_v2.md
+++ b/docs/metrics_v2.md
@@ -145,3 +145,5 @@
}
```
+V2 metrics can also be reported to the Metrics Consumers registered with `topology.metrics.consumer.register` by enabling the `topology.enable.v2.metrics.tick` configuration.
+
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 4bcb2e3..7a1f689 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -263,6 +263,13 @@
*/
@IsPositiveNumber(includeZero = true)
public static final String TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT = "topology.metrics.consumer.cpu.pcore.percent";
+
+ /**
+ * When set to true, this config allows a topology to also report data points from the V2 metrics API to registered metrics consumers on each metrics tick.
+ */
+ @IsBoolean
+ public static final String TOPOLOGY_ENABLE_V2_METRICS_TICK = "topology.enable.v2.metrics.tick";
+
/**
* The class name of the {@link org.apache.storm.state.StateProvider} implementation. If not specified defaults to {@link
* org.apache.storm.state.InMemoryKeyValueStateProvider}. This can be overridden at the component level.
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 7e687ad..b404a2d 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -12,6 +12,12 @@
package org.apache.storm.executor;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.UnknownHostException;
@@ -25,6 +31,7 @@
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
@@ -56,6 +63,7 @@
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue;
@@ -116,6 +124,7 @@
protected ArrayList<Task> idToTask;
protected int idToTaskBase;
protected String hostname;
+ private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials, String type) {
this.workerData = workerData;
@@ -297,11 +306,8 @@
if (taskToMetricToRegistry != null) {
nameToRegistry = taskToMetricToRegistry.get(taskId);
}
+ List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
if (nameToRegistry != null) {
- IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
- hostname, workerTopologyContext.getThisWorkerPort(),
- componentId, taskId, Time.currentTimeSecs(), interval);
- List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
IMetric metric = entry.getValue();
Object value = metric.getValueAndReset();
@@ -310,17 +316,95 @@
dataPoints.add(dataPoint);
}
}
- if (!dataPoints.isEmpty()) {
- task.sendUnanchored(Constants.METRICS_STREAM_ID,
- new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
- executorTransfer.flush();
- }
+ }
+ addV2Metrics(dataPoints);
+
+ if (!dataPoints.isEmpty()) {
+ IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
+ hostname, workerTopologyContext.getThisWorkerPort(),
+ componentId, taskId, Time.currentTimeSecs(), interval);
+ task.sendUnanchored(Constants.METRICS_STREAM_ID,
+ new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
+ executorTransfer.flush();
}
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}
+ // Appends data points from the V2 metrics API (gauges, counters, histograms, meters, timers) to the v1 dataPoints list, if enabled via topology.enable.v2.metrics.tick.
+ private void addV2Metrics(List<IMetricsConsumer.DataPoint> dataPoints) {
+ boolean enableV2MetricsDataPoints = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_ENABLE_V2_METRICS_TICK), false);
+ if (!enableV2MetricsDataPoints) {
+ return;
+ }
+ StormMetricRegistry stormMetricRegistry = workerData.getMetricRegistry();
+ for (Map.Entry<String, Gauge> entry : stormMetricRegistry.registry().getGauges().entrySet()) {
+ String name = entry.getKey();
+ Object v = entry.getValue().getValue();
+ if (v instanceof Number) {
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(name, v);
+ dataPoints.add(dataPoint);
+ } else {
+ LOG.warn("Cannot report {}, its value is not a Number {}", name, v);
+ }
+ }
+ for (Map.Entry<String, Counter> entry : stormMetricRegistry.registry().getCounters().entrySet()) {
+ Object value = entry.getValue().getCount();
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
+ dataPoints.add(dataPoint);
+ }
+ for (Map.Entry<String, Histogram> entry: stormMetricRegistry.registry().getHistograms().entrySet()) {
+ String baseName = entry.getKey();
+ Histogram histogram = entry.getValue();
+ Snapshot snapshot = histogram.getSnapshot();
+ addSnapshotDatapoints(baseName, snapshot, dataPoints);
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", histogram.getCount());
+ dataPoints.add(dataPoint);
+ }
+ for (Map.Entry<String, Meter> entry: stormMetricRegistry.registry().getMeters().entrySet()) {
+ String baseName = entry.getKey();
+ Meter meter = entry.getValue();
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", meter.getCount());
+ dataPoints.add(dataPoint);
+ addConvertedMetric(baseName, ".m1_rate", meter.getOneMinuteRate(), dataPoints);
+ addConvertedMetric(baseName, ".m5_rate", meter.getFiveMinuteRate(), dataPoints);
+ addConvertedMetric(baseName, ".m15_rate", meter.getFifteenMinuteRate(), dataPoints);
+ addConvertedMetric(baseName, ".mean_rate", meter.getMeanRate(), dataPoints);
+ }
+ for (Map.Entry<String, Timer> entry : stormMetricRegistry.registry().getTimers().entrySet()) {
+ String baseName = entry.getKey();
+ Timer timer = entry.getValue();
+ Snapshot snapshot = timer.getSnapshot();
+ addSnapshotDatapoints(baseName, snapshot, dataPoints);
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + ".count", timer.getCount());
+ dataPoints.add(dataPoint);
+ }
+ }
+
+ private void addSnapshotDatapoints(String baseName, Snapshot snapshot, List<IMetricsConsumer.DataPoint> dataPoints) {
+ addConvertedMetric(baseName, ".max", snapshot.getMax(), dataPoints);
+ addConvertedMetric(baseName, ".mean", snapshot.getMean(), dataPoints);
+ addConvertedMetric(baseName, ".min", snapshot.getMin(), dataPoints);
+ addConvertedMetric(baseName, ".stddev", snapshot.getStdDev(), dataPoints);
+ addConvertedMetric(baseName, ".p50", snapshot.getMedian(), dataPoints);
+ addConvertedMetric(baseName, ".p75", snapshot.get75thPercentile(), dataPoints);
+ addConvertedMetric(baseName, ".p95", snapshot.get95thPercentile(), dataPoints);
+ addConvertedMetric(baseName, ".p98", snapshot.get98thPercentile(), dataPoints);
+ addConvertedMetric(baseName, ".p99", snapshot.get99thPercentile(), dataPoints);
+ addConvertedMetric(baseName, ".p999", snapshot.get999thPercentile(), dataPoints);
+ }
+
+ private void addConvertedMetric(String baseName, String suffix, double value, List<IMetricsConsumer.DataPoint> dataPoints) {
+ IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(baseName + suffix, convertDuration(value));
+ dataPoints.add(dataPoint);
+ }
+
+ // Converts timed Codahale metric values from nanoseconds to milliseconds. NOTE(review): this conversion is also applied to Histogram snapshot values, which are not durations — confirm that is intended.
+ private double convertDuration(double duration) {
+ return duration * msDurationFactor;
+ }
+
protected void setupMetrics() {
for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
StormTimer timerTask = workerData.getUserTimer();
@@ -559,5 +643,4 @@
public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
this.executorTransfer = executorTransfer;
}
-
}