import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.TopologyMetric;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
* A topology metric context contains all in-memory metric data of a topology.
* This class resides in TopologyMaster.
* @author Cody (
* @since 2.0.5
public class TopologyMetricContext {
private final Logger LOG = LoggerFactory.getLogger(getClass());
private final ReentrantLock lock = new ReentrantLock();
private Set<ResourceWorkerSlot> workerSet;
private int taskNum = 1;
private ConcurrentMap<String, MetricInfo> memCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Long> memMeta = new ConcurrentHashMap<>();
private final AtomicBoolean isMerging = new AtomicBoolean(false);
private String topologyId;
private volatile int flushedMetaNum = 0;
* sync meta from metric cache on startup
private volatile boolean syncMeta = false;
private Map conf;
public TopologyMetricContext() {
public TopologyMetricContext(Set<ResourceWorkerSlot> workerSet) {
this.workerSet = workerSet;
public TopologyMetricContext(String topologyId, Set<ResourceWorkerSlot> workerSet, Map conf) {
this.topologyId = topologyId;
this.conf = conf;
public ConcurrentMap<String, Long> getMemMeta() {
return memMeta;
public String getTopologyId() {
return topologyId;
public void setTopologyId(String topologyId) {
this.topologyId = topologyId;
public boolean syncMeta() {
return syncMeta;
public void setSyncMeta(boolean syncMeta) {
this.syncMeta = syncMeta;
public int getTaskNum() {
return taskNum;
public void setTaskNum(int taskNum) {
this.taskNum = taskNum;
public int getFlushedMetaNum() {
return flushedMetaNum;
public void setFlushedMetaNum(int flushedMetaNum) {
this.flushedMetaNum = flushedMetaNum;
public ReentrantLock getLock() {
return lock;
public int getWorkerNum() {
return workerSet.size();
public void setWorkerSet(Set<ResourceWorkerSlot> workerSet) {
this.workerSet = workerSet;
public void resetUploadedMetrics() {
public final ConcurrentMap<String, MetricInfo> getMemCache() {
return memCache;
public void addToMemCache(String workerSlot, MetricInfo metricInfo) {
memCache.put(workerSlot, metricInfo);"update mem cache, worker:{}, total uploaded:{}", workerSlot, memCache.size());
public boolean readyToUpload() {
return memCache.size() >= workerSet.size();
public boolean isMerging() {
return isMerging.get();
public void setMerging(boolean isMerging) {
public int getUploadedWorkerNum() {
return memCache.size();
public TopologyMetric mergeMetrics() {
long start = System.currentTimeMillis();
if (getMemCache().size() == 0) {
//"topology:{}, metric size is 0, skip...", topologyId);
return null;
if (isMerging()) {"topology {} is already merging, skip...", topologyId);
return null;
try {
Map<String, MetricInfo> workerMetricMap = this.memCache;
// reset mem cache
this.memCache = new ConcurrentHashMap<>();
MetricInfo topologyMetrics = MetricUtils.mkMetricInfo();
MetricInfo componentMetrics = MetricUtils.mkMetricInfo();
MetricInfo taskMetrics = MetricUtils.mkMetricInfo();
MetricInfo streamMetrics = MetricUtils.mkMetricInfo();
MetricInfo workerMetrics = MetricUtils.mkMetricInfo();
MetricInfo nettyMetrics = MetricUtils.mkMetricInfo();
TopologyMetric tpMetric =
new TopologyMetric(topologyMetrics, componentMetrics, workerMetrics, taskMetrics, streamMetrics, nettyMetrics);
// metric name => worker count
Map<String, Integer> metricNameCounters = new HashMap<>();
// special for histograms & timers, we merge the points to get a new snapshot data.
Map<String, Map<Integer, Histogram>> histograms = new HashMap<>();
Map<String, Map<Integer, Timer>> timers = new HashMap<>();
// iterate metrics of all workers within the same topology
for (ConcurrentMap.Entry<String, MetricInfo> metricEntry : workerMetricMap.entrySet()) {
MetricInfo metricInfo = metricEntry.getValue();
// merge counters: add old and new values, note we only add incoming new metrics and overwrite
// existing data, same for all below.
Map<String, Map<Integer, MetricSnapshot>> metrics = metricInfo.get_metrics();
for (Map.Entry<String, Map<Integer, MetricSnapshot>> metric : metrics.entrySet()) {
String metricName = metric.getKey();
Map<Integer, MetricSnapshot> data = metric.getValue();
MetaType metaType = MetricUtils.metaType(metricName);
MetricType metricType = MetricUtils.metricType(metricName);
if (metricType == MetricType.COUNTER) {
mergeCounters(tpMetric, metaType, metricName, data);
} else if (metricType == MetricType.GAUGE) {
mergeGauges(tpMetric, metaType, metricName, data);
} else if (metricType == MetricType.METER) {
mergeMeters(getMetricInfoByType(tpMetric, metaType), metricName, data, metricNameCounters);
} else if (metricType == MetricType.HISTOGRAM) {
mergeHistograms(getMetricInfoByType(tpMetric, metaType),
metricName, data, metricNameCounters, histograms);
} else if (metricType == MetricType.TIMER) {
mergeTimers(getMetricInfoByType(tpMetric, metaType),
metricName, data, metricNameCounters, timers);
adjustHistogramTimerMetrics(tpMetric, metricNameCounters, histograms, timers);
// for counters, we only report delta data every time, need to sum with old data
//adjustCounterMetrics(tpMetric, oldTpMetric);"merge topology metrics:{}, cost:{}", topologyId, System.currentTimeMillis() - start);
// debug logs
return tpMetric;
} finally {
protected MetricInfo getMetricInfoByType(TopologyMetric topologyMetric, MetaType type) {
if (type == MetaType.TASK) {
return topologyMetric.get_taskMetric();
} else if (type == MetaType.WORKER) {
return topologyMetric.get_workerMetric();
} else if (type == MetaType.COMPONENT) {
return topologyMetric.get_componentMetric();
} else if (type == MetaType.STREAM) {
return topologyMetric.get_streamMetric();
} else if (type == MetaType.NETTY) {
return topologyMetric.get_nettyMetric();
} else if (type == MetaType.TOPOLOGY) {
return topologyMetric.get_topologyMetric();
return null;
public void mergeCounters(TopologyMetric tpMetric, MetaType metaType, String meta,
Map<Integer, MetricSnapshot> data) {
MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType);
Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
if (existing == null) {
metricInfo.put_to_metrics(meta, data);
} else {
for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
Integer win = dataEntry.getKey();
MetricSnapshot snapshot = dataEntry.getValue();
MetricSnapshot old = existing.get(win);
if (old == null) {
existing.put(win, snapshot);
} else {
old.set_longValue(old.get_longValue() + snapshot.get_longValue());
public void mergeGauges(TopologyMetric tpMetric, MetaType metaType, String meta,
Map<Integer, MetricSnapshot> data) {
MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType);
Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
if (existing == null) {
metricInfo.put_to_metrics(meta, data);
} else {
for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
Integer win = dataEntry.getKey();
MetricSnapshot snapshot = dataEntry.getValue();
MetricSnapshot old = existing.get(win);
if (old == null) {
existing.put(win, snapshot);
} else {
if (snapshot.get_ts() >= old.get_ts()) {
if (metaType != MetaType.TOPOLOGY) {
} else { // for topology metric, gauge might be add-able, e.g., cpu, memory, etc.
old.set_doubleValue(old.get_doubleValue() + snapshot.get_doubleValue());
* meters are not sampled.
public void mergeMeters(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
Map<String, Integer> metaCounters) {
Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
if (existing == null) {
metricInfo.put_to_metrics(meta, data);
} else {
for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
Integer win = dataEntry.getKey();
MetricSnapshot snapshot = dataEntry.getValue();
MetricSnapshot old = existing.get(win);
if (old == null) {
existing.put(win, snapshot);
} else {
if (snapshot.get_ts() >= old.get_ts()) {
old.set_mean(old.get_mean() + snapshot.get_mean());
old.set_m1(old.get_m1() + snapshot.get_m1());
old.set_m5(old.get_m5() + snapshot.get_m5());
old.set_m15(old.get_m15() + snapshot.get_m15());
updateMetricCounters(meta, metaCounters);
* histograms are sampled, but we just update points
public void mergeHistograms(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
Map<String, Integer> metaCounters, Map<String, Map<Integer, Histogram>> histograms) {
Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
if (existing == null) {
metricInfo.put_to_metrics(meta, data);
Map<Integer, Histogram> histogramMap = new HashMap<>();
for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
Histogram histogram = MetricUtils.metricSnapshot2Histogram(dataEntry.getValue());
histogramMap.put(dataEntry.getKey(), histogram);
histograms.put(meta, histogramMap);
} else {
for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
Integer win = dataEntry.getKey();
MetricSnapshot snapshot = dataEntry.getValue();
MetricSnapshot old = existing.get(win);
if (old == null) {
existing.put(win, snapshot);
histograms.get(meta).put(win, MetricUtils.metricSnapshot2Histogram(snapshot));
} else {
if (snapshot.get_ts() >= old.get_ts()) {
// update points
MetricUtils.updateHistogramPoints(histograms.get(meta).get(win), snapshot.get_points());
updateMetricCounters(meta, metaCounters);
* timers are sampled, we just update points
public void mergeTimers(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
Map<String, Integer> metaCounters, Map<String, Map<Integer, Timer>> timers) {
Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
if (existing == null) {
metricInfo.put_to_metrics(meta, data);
Map<Integer, Timer> timerMap = new HashMap<>();
for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
Timer timer = MetricUtils.metricSnapshot2Timer(dataEntry.getValue());
timerMap.put(dataEntry.getKey(), timer);
timers.put(meta, timerMap);
} else {
for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
Integer win = dataEntry.getKey();
MetricSnapshot snapshot = dataEntry.getValue();
MetricSnapshot old = existing.get(win);
if (old == null) {
existing.put(win, snapshot);
timers.get(meta).put(win, MetricUtils.metricSnapshot2Timer(snapshot));
} else {
if (snapshot.get_ts() >= old.get_ts()) {
old.set_m1(old.get_m1() + snapshot.get_m1());
old.set_m5(old.get_m5() + snapshot.get_m5());
old.set_m15(old.get_m15() + snapshot.get_m15());
// update points
MetricUtils.updateTimerPoints(timers.get(meta).get(win), snapshot.get_points());
updateMetricCounters(meta, metaCounters);
* computes occurrences of specified metric name
protected void updateMetricCounters(String metricName, Map<String, Integer> metricNameCounters) {
if (metricNameCounters.containsKey(metricName)) {
metricNameCounters.put(metricName, metricNameCounters.get(metricName) + 1);
} else {
metricNameCounters.put(metricName, 1);
protected void adjustHistogramTimerMetrics(TopologyMetric tpMetric, Map<String, Integer> metaCounters,
Map<String, Map<Integer, Histogram>> histograms,
Map<String, Map<Integer, Timer>> timers) {
Map<String, Map<Integer, MetricSnapshot>> compMetrics =
Map<String, Map<Integer, MetricSnapshot>> topologyMetrics =
adjustMetrics(compMetrics, metaCounters, histograms, timers);
adjustMetrics(topologyMetrics, metaCounters, histograms, timers);
private void adjustMetrics(Map<String, Map<Integer, MetricSnapshot>> metrics, Map<String, Integer> metaCounters,
Map<String, Map<Integer, Histogram>> histograms, Map<String, Map<Integer, Timer>> timers) {
for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) {
String meta = metricEntry.getKey();
MetricType metricType = MetricUtils.metricType(meta);
MetaType metaType = MetricUtils.metaType(meta);
Map<Integer, MetricSnapshot> winData = metricEntry.getValue();
if (metricType == MetricType.HISTOGRAM) {
for (Map.Entry<Integer, MetricSnapshot> dataEntry : winData.entrySet()) {
MetricSnapshot snapshot = dataEntry.getValue();
Integer cnt = metaCounters.get(meta);
Histogram histogram = histograms.get(meta).get(dataEntry.getKey());
if (cnt != null && cnt > 1) {
Snapshot snapshot1 = histogram.getSnapshot();
if (metaType == MetaType.TOPOLOGY) {
if (metaType != MetaType.TOPOLOGY) {
snapshot.set_points(new ArrayList<Long>(0));
} else if (metricType == MetricType.TIMER) {
for (Map.Entry<Integer, MetricSnapshot> dataEntry : winData.entrySet()) {
MetricSnapshot snapshot = dataEntry.getValue();
Integer cnt = metaCounters.get(meta);
if (cnt != null && cnt > 1) {
Timer timer = timers.get(meta).get(dataEntry.getKey());
Snapshot snapshot1 = timer.getSnapshot();
snapshot.set_points(new ArrayList<Long>(0));
private void resetPoints(Map<String, Map<Integer, MetricSnapshot>> metrics) {
for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) {
String meta = metricEntry.getKey();
MetricType metricType = MetricUtils.metricType(meta);
Map<Integer, MetricSnapshot> winData = metricEntry.getValue();
if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
for (MetricSnapshot snapshot : winData.values()) {
snapshot.set_points(new ArrayList<Long>(0));
protected void adjustCounterMetrics(TopologyMetric tpMetric, TopologyMetric oldMetric) {
if (oldMetric != null) {
* sum old counter snapshots and new counter snapshots, sums are stored in new snapshots.
private void mergeCounters(Map<String, Map<Integer, MetricSnapshot>> newCounters,
Map<String, Map<Integer, MetricSnapshot>> oldCounters) {
for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : newCounters.entrySet()) {
String metricName = entry.getKey();
Map<Integer, MetricSnapshot> snapshots = entry.getValue();
Map<Integer, MetricSnapshot> oldSnapshots = oldCounters.get(metricName);
if (oldSnapshots != null && oldSnapshots.size() > 0) {
for (Map.Entry<Integer, MetricSnapshot> snapshotEntry : snapshots.entrySet()) {
Integer win = snapshotEntry.getKey();
MetricSnapshot snapshot = snapshotEntry.getValue();
MetricSnapshot oldSnapshot = oldSnapshots.get(win);
if (oldSnapshot != null) {
snapshot.set_longValue(snapshot.get_longValue() + oldSnapshot.get_longValue());
private double getSampleRate() {
return ConfigExtension.getMetricSampleRate(conf);