STORM-3697 add capacity metric (#3333)
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index bc9d6b0..ddd830d 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -227,9 +227,11 @@
new BoltExecuteInfo(tuple, taskId, delta).applyOn(topologyContext);
}
if (delta >= 0) {
- stats.boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
- Task task = idToTask.get(taskId - idToTaskBase);
- task.getTaskMetrics().boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+ Task firstTask = idToTask.get(taskIds.get(0) - idToTaskBase);
+ stats.boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta,
+ workerData.getUptime().upTime(), firstTask);
+ Task currentTask = idToTask.get(taskId - idToTaskBase);
+ currentTask.getTaskMetrics().boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
}
}
}
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
index 25fa270..8bf18d5 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
@@ -21,6 +21,8 @@
* Acts as a MultiCount Stat, but keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time. for the same keys
*/
public class MultiCountStat<T> {
+ public static final int TEN_MIN_IN_SECONDS = 60 * 10;
+ public static final String TEN_MIN_IN_SECONDS_STR = TEN_MIN_IN_SECONDS + "";
private final int numBuckets;
private ConcurrentHashMap<T, CountStat> counts = new ConcurrentHashMap<>();
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
index 650fdee..9a50bb9 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
@@ -15,18 +15,18 @@
import com.codahale.metrics.Gauge;
public class RollingAverageGauge implements Gauge<Double> {
- private long[] samples = new long[3];
+ private double[] samples = new double[3];
private int index = 0;
+ /**
+ * Returns the arithmetic mean of the three sample slots. Slots that have not been written yet
+ * still hold the array's initial 0.0 and are included in the mean.
+ */
@Override
public Double getValue() {
synchronized (this) {
- long total = samples[0] + samples[1] + samples[2];
+ double total = samples[0] + samples[1] + samples[2];
return total / 3.0;
}
}
- public void addValue(long value) {
+ public void addValue(double value) {
synchronized (this) {
samples[index] = value;
index = (++index % 3);
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
index d52190f..43903fe 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -18,6 +18,7 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
public class TaskMetrics {
private static final String METRIC_NAME_ACKED = "__ack-count";
@@ -28,6 +29,7 @@
private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
+ private static final String METRIC_NAME_CAPACITY = "__capacity";
private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, RollingAverageGauge> gauges = new ConcurrentHashMap<>();
@@ -50,6 +52,13 @@
this.samplingRate = ConfigUtils.samplingRate(topoConf);
}
+ /**
+ * Adds a capacity sample to the {@code __capacity} rolling-average gauge.
+ *
+ * @param capacity the capacity value to record
+ */
+ public void setCapacity(double capacity) {
+ // Capacity spans all streams, so the gauge is registered under the default stream id.
+ this.getRollingAverageGauge(METRIC_NAME_CAPACITY, Utils.DEFAULT_STREAM_ID)
+ .addValue(capacity);
+ }
+
public void spoutAckedTuple(String streamId, long latencyMs) {
String metricName = METRIC_NAME_ACKED + "-" + streamId;
Counter c = this.getCounter(metricName, streamId);
diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index 030b0d7..26e3776 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -12,7 +12,12 @@
package org.apache.storm.stats;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.daemon.Task;
import org.apache.storm.generated.BoltStats;
import org.apache.storm.generated.ExecutorSpecificStats;
import org.apache.storm.generated.ExecutorStats;
@@ -53,10 +58,53 @@
super.cleanupStats();
}
- public void boltExecuteTuple(String component, String stream, long latencyMs) {
+ public void boltExecuteTuple(String component, String stream, long latencyMs, long workerUptimeSecs,
+ Task firstExecutorTask) {
List key = Lists.newArrayList(component, stream);
this.getExecuted().incBy(key, this.rate);
this.getExecuteLatencies().record(key, latencyMs);
+
+ // Capacity describes the whole executor rather than a single task, so it is recorded on the
+ // task metrics of the executor's first task. It is recalculated on every call to this method.
+ double capacity = calculateCapacity(workerUptimeSecs);
+ firstExecutorTask.getTaskMetrics().setCapacity(capacity);
+ }
+
+ /**
+ * Computes the executor's capacity from the ten-minute window of the executed and
+ * execute-latency stats: the sum over all keys of (latency value * executed count), divided by
+ * the window length in milliseconds. The window is the smaller of the worker uptime and ten
+ * minutes.
+ *
+ * @param workerUptimeSecs worker uptime in seconds
+ * @return the capacity, or 0.0 when {@code workerUptimeSecs} is not positive
+ */
+ private double calculateCapacity(long workerUptimeSecs) {
+ if (workerUptimeSecs <= 0) {
+ return 0.0;
+ }
+
+ // NOTE(review): boltExecuteTuple records under [component, stream] list keys, so the String
+ // key type declared here looks inaccurate - confirm against valueStat's signature.
+ Map<String, Double> latencyByKey = valueStat(this.getExecuteLatencies()).get(MultiCountStat.TEN_MIN_IN_SECONDS_STR);
+ Map<String, Long> executedByKey = valueStat(this.getExecuted()).get(MultiCountStat.TEN_MIN_IN_SECONDS_STR);
+
+ // Union of the keys of both maps; either map may be absent for the window.
+ Set<Object> keys = new HashSet<>();
+ if (latencyByKey != null) {
+ keys.addAll(latencyByKey.keySet());
+ }
+ if (executedByKey != null) {
+ keys.addAll(executedByKey.keySet());
+ }
+
+ double weightedLatencySum = 0;
+ for (Object key : keys) {
+ weightedLatencySum += getOr0(latencyByKey, key).doubleValue() * getOr0(executedByKey, key).longValue();
+ }
+
+ long windowSecs = Math.min(workerUptimeSecs, MultiCountStat.TEN_MIN_IN_SECONDS);
+ return weightedLatencySum / (windowSecs * 1000);
+ }
+
+ /**
+ * Null-safe numeric lookup.
+ *
+ * @param m map to read from; may be null
+ * @param k key to look up
+ * @return the value stored under {@code k}, or 0 when the map is null or the lookup yields null
+ * @throws ClassCastException if the stored value is not a {@link Number}
+ */
+ private static Number getOr0(Map<?, ?> m, Object k) {
+ if (m == null) {
+ return 0;
+ }
+
+ // Wildcard map instead of a raw type: accepts any parameterization, so the cast stays.
+ Number n = (Number) m.get(k);
+ if (n == null) {
+ return 0;
+ }
+ return n;
}
public void boltAckedTuple(String component, String stream, long latencyMs) {