// blob: 8dae281b24797b67873fd2b08ead7994891dc1c6 [file] [log] [blame]
package com.alibaba.jstorm.metric;
import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.TopologyMetric;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A topology metric context contains all in-memory metric data of a topology.
 * This class resides in TopologyMaster.
 * <p>
 * Workers upload their {@link MetricInfo} through {@link #addToMemCache(String, MetricInfo)};
 * {@link #mergeMetrics()} then folds everything uploaded so far into one {@link TopologyMetric}
 * and starts a fresh upload cache.
 *
 * @author Cody (weiyue.wy@alibaba-inc.com)
 * @since 2.0.5
 */
public class TopologyMetricContext {
    private final Logger LOG = LoggerFactory.getLogger(getClass());

    /** Lock handed out through {@link #getLock()}; this class itself never acquires it. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Workers of the topology; stays {@code null} until supplied by a constructor or setter. */
    private Set<ResourceWorkerSlot> workerSet;

    private int taskNum = 1;

    /**
     * worker slot => metrics uploaded by that worker since the last merge.
     * Volatile because {@link #mergeMetrics()} replaces the whole map while other threads
     * may still be reading it or adding to it.
     */
    private volatile ConcurrentMap<String, MetricInfo> memCache = new ConcurrentHashMap<>();

    /** Metric meta kept in memory; it is only exposed here, its content is maintained by callers. */
    private final ConcurrentMap<String, Long> memMeta = new ConcurrentHashMap<>();

    /** Guards {@link #mergeMetrics()} against concurrent execution. */
    private final AtomicBoolean isMerging = new AtomicBoolean(false);

    private String topologyId;

    private volatile int flushedMetaNum = 0;

    /**
     * sync meta from metric cache on startup
     */
    private volatile boolean syncMeta = false;

    private Map conf;

    public TopologyMetricContext() {
    }

    public TopologyMetricContext(Set<ResourceWorkerSlot> workerSet) {
        this.workerSet = workerSet;
    }

    public TopologyMetricContext(String topologyId, Set<ResourceWorkerSlot> workerSet, Map conf) {
        this(workerSet);
        this.topologyId = topologyId;
        this.conf = conf;
    }

    /**
     * @return the live (not copied) in-memory meta map
     */
    public ConcurrentMap<String, Long> getMemMeta() {
        return memMeta;
    }

    public String getTopologyId() {
        return topologyId;
    }

    public void setTopologyId(String topologyId) {
        this.topologyId = topologyId;
    }

    /**
     * @return the current value of the sync-meta flag
     */
    public boolean syncMeta() {
        return syncMeta;
    }

    public void setSyncMeta(boolean syncMeta) {
        this.syncMeta = syncMeta;
    }

    public int getTaskNum() {
        return taskNum;
    }

    public void setTaskNum(int taskNum) {
        this.taskNum = taskNum;
    }

    public int getFlushedMetaNum() {
        return flushedMetaNum;
    }

    public void setFlushedMetaNum(int flushedMetaNum) {
        this.flushedMetaNum = flushedMetaNum;
    }

    public ReentrantLock getLock() {
        return lock;
    }

    /**
     * @return number of workers in the worker set
     * @throws NullPointerException if no worker set has been supplied yet
     */
    public int getWorkerNum() {
        return workerSet.size();
    }

    public void setWorkerSet(Set<ResourceWorkerSlot> workerSet) {
        this.workerSet = workerSet;
    }

    /**
     * Drops every metric uploaded since the last merge.
     */
    public void resetUploadedMetrics() {
        this.memCache.clear();
    }

    /**
     * @return the live upload cache (worker slot => metrics); a later {@link #mergeMetrics()}
     * replaces it, so callers should not hold on to the returned reference
     */
    public final ConcurrentMap<String, MetricInfo> getMemCache() {
        return memCache;
    }

    /**
     * Stores (or overwrites) the metrics uploaded by one worker.
     *
     * @param workerSlot identifier of the uploading worker
     * @param metricInfo metrics of that worker
     */
    public void addToMemCache(String workerSlot, MetricInfo metricInfo) {
        // read the field once so that the put and the logged size refer to the same map
        // even if mergeMetrics() swaps the cache in between
        ConcurrentMap<String, MetricInfo> cache = memCache;
        cache.put(workerSlot, metricInfo);
        LOG.info("update mem cache, worker:{}, total uploaded:{}", workerSlot, cache.size());
    }

    /**
     * @return {@code true} once at least as many workers have uploaded as the worker set contains
     * @throws NullPointerException if no worker set has been supplied yet
     */
    public boolean readyToUpload() {
        return memCache.size() >= workerSet.size();
    }

    public boolean isMerging() {
        return isMerging.get();
    }

    public void setMerging(boolean isMerging) {
        this.isMerging.set(isMerging);
    }

    public int getUploadedWorkerNum() {
        return memCache.size();
    }

    /**
     * Merges the metrics uploaded by all workers into a single {@link TopologyMetric} and
     * replaces the upload cache with an empty one.
     *
     * @return the merged metrics, or {@code null} when nothing has been uploaded or when
     * another merge is already in progress
     */
    public TopologyMetric mergeMetrics() {
        long start = System.currentTimeMillis();
        if (getMemCache().isEmpty()) {
            return null;
        }
        // atomically claim the merging flag; a separate check followed by a set would let
        // two threads pass the check at the same time
        if (!isMerging.compareAndSet(false, true)) {
            LOG.info("topology {} is already merging, skip...", topologyId);
            return null;
        }

        try {
            Map<String, MetricInfo> workerMetricMap = this.memCache;
            // reset mem cache
            this.memCache = new ConcurrentHashMap<>();

            MetricInfo topologyMetrics = MetricUtils.mkMetricInfo();
            MetricInfo componentMetrics = MetricUtils.mkMetricInfo();
            MetricInfo taskMetrics = MetricUtils.mkMetricInfo();
            MetricInfo streamMetrics = MetricUtils.mkMetricInfo();
            MetricInfo workerMetrics = MetricUtils.mkMetricInfo();
            MetricInfo nettyMetrics = MetricUtils.mkMetricInfo();
            TopologyMetric tpMetric = new TopologyMetric(
                    topologyMetrics, componentMetrics, workerMetrics, taskMetrics, streamMetrics, nettyMetrics);

            // metric name => worker count
            Map<String, Integer> metricNameCounters = new HashMap<>();

            // special for histograms & timers, we merge the points to get a new snapshot data.
            Map<String, Map<Integer, Histogram>> histograms = new HashMap<>();
            Map<String, Map<Integer, Timer>> timers = new HashMap<>();

            // iterate metrics of all workers within the same topology
            for (MetricInfo metricInfo : workerMetricMap.values()) {
                Map<String, Map<Integer, MetricSnapshot>> metrics = metricInfo.get_metrics();
                if (metrics == null) {
                    // a worker uploaded an empty MetricInfo, nothing to merge
                    continue;
                }
                // note we only add incoming new metrics and merge into existing data,
                // same for every metric type below.
                for (Map.Entry<String, Map<Integer, MetricSnapshot>> metric : metrics.entrySet()) {
                    String metricName = metric.getKey();
                    Map<Integer, MetricSnapshot> data = metric.getValue();
                    MetaType metaType = MetricUtils.metaType(metricName);
                    MetricType metricType = MetricUtils.metricType(metricName);

                    if (metricType == MetricType.COUNTER) {
                        mergeCounters(tpMetric, metaType, metricName, data);
                    } else if (metricType == MetricType.GAUGE) {
                        mergeGauges(tpMetric, metaType, metricName, data);
                    } else if (metricType == MetricType.METER) {
                        mergeMeters(getMetricInfoByType(tpMetric, metaType), metricName, data, metricNameCounters);
                    } else if (metricType == MetricType.HISTOGRAM) {
                        mergeHistograms(getMetricInfoByType(tpMetric, metaType),
                                metricName, data, metricNameCounters, histograms);
                    } else if (metricType == MetricType.TIMER) {
                        mergeTimers(getMetricInfoByType(tpMetric, metaType),
                                metricName, data, metricNameCounters, timers);
                    }
                }
            }
            adjustHistogramTimerMetrics(tpMetric, metricNameCounters, histograms, timers);
            // counters are returned as merged above; adjustCounterMetrics(...) is not applied here

            LOG.info("merge topology metrics:{}, cost:{}", topologyId, System.currentTimeMillis() - start);
            return tpMetric;
        } finally {
            setMerging(false);
        }
    }

    /**
     * Selects the bucket of {@code topologyMetric} that holds metrics of the given meta type.
     *
     * @return the matching bucket, or {@code null} when {@code type} is none of the handled types
     */
    protected MetricInfo getMetricInfoByType(TopologyMetric topologyMetric, MetaType type) {
        if (type == MetaType.TASK) {
            return topologyMetric.get_taskMetric();
        } else if (type == MetaType.WORKER) {
            return topologyMetric.get_workerMetric();
        } else if (type == MetaType.COMPONENT) {
            return topologyMetric.get_componentMetric();
        } else if (type == MetaType.STREAM) {
            return topologyMetric.get_streamMetric();
        } else if (type == MetaType.NETTY) {
            return topologyMetric.get_nettyMetric();
        } else if (type == MetaType.TOPOLOGY) {
            return topologyMetric.get_topologyMetric();
        }
        return null;
    }

    /**
     * Merges counter snapshots of one metric into {@code tpMetric}: values of the same window
     * are summed, the timestamp of the incoming snapshot is taken over.
     * Metrics whose meta type has no bucket are skipped.
     *
     * @param tpMetric merge target
     * @param metaType meta type of the metric, selects the bucket inside {@code tpMetric}
     * @param meta     metric name
     * @param data     window => snapshot reported by one worker
     */
    public void mergeCounters(TopologyMetric tpMetric, MetaType metaType, String meta,
                              Map<Integer, MetricSnapshot> data) {
        MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType);
        if (isMissingBucket(metricInfo, meta)) {
            return;
        }
        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
        if (existing == null) {
            metricInfo.put_to_metrics(meta, data);
            return;
        }
        for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
            Integer win = dataEntry.getKey();
            MetricSnapshot snapshot = dataEntry.getValue();
            MetricSnapshot old = existing.get(win);
            if (old == null) {
                existing.put(win, snapshot);
            } else {
                old.set_ts(snapshot.get_ts());
                old.set_longValue(old.get_longValue() + snapshot.get_longValue());
            }
        }
    }

    /**
     * Merges gauge snapshots of one metric into {@code tpMetric}. An incoming snapshot that is
     * not older than the existing one replaces its value, except for topology-level gauges
     * whose values are summed. Metrics whose meta type has no bucket are skipped.
     *
     * @param tpMetric merge target
     * @param metaType meta type of the metric, selects the bucket inside {@code tpMetric}
     * @param meta     metric name
     * @param data     window => snapshot reported by one worker
     */
    public void mergeGauges(TopologyMetric tpMetric, MetaType metaType, String meta,
                            Map<Integer, MetricSnapshot> data) {
        MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType);
        if (isMissingBucket(metricInfo, meta)) {
            return;
        }
        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
        if (existing == null) {
            metricInfo.put_to_metrics(meta, data);
            return;
        }
        for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
            Integer win = dataEntry.getKey();
            MetricSnapshot snapshot = dataEntry.getValue();
            MetricSnapshot old = existing.get(win);
            if (old == null) {
                existing.put(win, snapshot);
            } else if (snapshot.get_ts() >= old.get_ts()) {
                old.set_ts(snapshot.get_ts());
                if (metaType != MetaType.TOPOLOGY) {
                    old.set_doubleValue(snapshot.get_doubleValue());
                } else { // for topology metric, gauge might be add-able, e.g., cpu, memory, etc.
                    old.set_doubleValue(old.get_doubleValue() + snapshot.get_doubleValue());
                }
            }
        }
    }

    /**
     * meters are not sampled: mean/m1/m5/m15 of the same window are summed.
     * A {@code null} {@code metricInfo} (metric without a bucket) is skipped.
     *
     * @param metricInfo   bucket to merge into
     * @param meta         metric name
     * @param data         window => snapshot reported by one worker
     * @param metaCounters metric name => number of merged reports, incremented for {@code meta}
     */
    public void mergeMeters(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
                            Map<String, Integer> metaCounters) {
        if (isMissingBucket(metricInfo, meta)) {
            return;
        }
        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
        if (existing == null) {
            metricInfo.put_to_metrics(meta, data);
        } else {
            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
                Integer win = dataEntry.getKey();
                MetricSnapshot snapshot = dataEntry.getValue();
                MetricSnapshot old = existing.get(win);
                if (old == null) {
                    existing.put(win, snapshot);
                } else if (snapshot.get_ts() >= old.get_ts()) {
                    old.set_ts(snapshot.get_ts());
                    old.set_mean(old.get_mean() + snapshot.get_mean());
                    old.set_m1(old.get_m1() + snapshot.get_m1());
                    old.set_m5(old.get_m5() + snapshot.get_m5());
                    old.set_m15(old.get_m15() + snapshot.get_m15());
                }
            }
        }
        updateMetricCounters(meta, metaCounters);
    }

    /**
     * histograms are sampled, but we just update points: every window keeps one accumulating
     * {@link Histogram} in {@code histograms} that receives the points of each report.
     * A {@code null} {@code metricInfo} (metric without a bucket) is skipped.
     *
     * @param metricInfo   bucket to merge into
     * @param meta         metric name
     * @param data         window => snapshot reported by one worker
     * @param metaCounters metric name => number of merged reports, incremented for {@code meta}
     * @param histograms   metric name => window => accumulating histogram
     */
    public void mergeHistograms(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
                                Map<String, Integer> metaCounters, Map<String, Map<Integer, Histogram>> histograms) {
        if (isMissingBucket(metricInfo, meta)) {
            return;
        }
        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
        if (existing == null) {
            metricInfo.put_to_metrics(meta, data);
            Map<Integer, Histogram> histogramMap = new HashMap<>();
            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
                histogramMap.put(dataEntry.getKey(), MetricUtils.metricSnapshot2Histogram(dataEntry.getValue()));
            }
            histograms.put(meta, histogramMap);
        } else {
            Map<Integer, Histogram> histogramMap = windowMap(histograms, meta);
            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
                Integer win = dataEntry.getKey();
                MetricSnapshot snapshot = dataEntry.getValue();
                MetricSnapshot old = existing.get(win);
                if (old == null) {
                    existing.put(win, snapshot);
                    histogramMap.put(win, MetricUtils.metricSnapshot2Histogram(snapshot));
                } else if (snapshot.get_ts() >= old.get_ts()) {
                    Histogram histogram = histogramMap.get(win);
                    if (histogram == null) {
                        // no accumulator registered for this window yet: seed it from the snapshot merged so far
                        histogram = MetricUtils.metricSnapshot2Histogram(old);
                        histogramMap.put(win, histogram);
                    }
                    old.set_ts(snapshot.get_ts());
                    // update points
                    MetricUtils.updateHistogramPoints(histogram, snapshot.get_points());
                }
            }
        }
        updateMetricCounters(meta, metaCounters);
    }

    /**
     * timers are sampled, we just update points: m1/m5/m15 of the same window are summed and
     * every window keeps one accumulating {@link Timer} in {@code timers} that receives the
     * points of each report. A {@code null} {@code metricInfo} (metric without a bucket) is skipped.
     *
     * @param metricInfo   bucket to merge into
     * @param meta         metric name
     * @param data         window => snapshot reported by one worker
     * @param metaCounters metric name => number of merged reports, incremented for {@code meta}
     * @param timers       metric name => window => accumulating timer
     */
    public void mergeTimers(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
                            Map<String, Integer> metaCounters, Map<String, Map<Integer, Timer>> timers) {
        if (isMissingBucket(metricInfo, meta)) {
            return;
        }
        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
        if (existing == null) {
            metricInfo.put_to_metrics(meta, data);
            Map<Integer, Timer> timerMap = new HashMap<>();
            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
                timerMap.put(dataEntry.getKey(), MetricUtils.metricSnapshot2Timer(dataEntry.getValue()));
            }
            timers.put(meta, timerMap);
        } else {
            Map<Integer, Timer> timerMap = windowMap(timers, meta);
            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
                Integer win = dataEntry.getKey();
                MetricSnapshot snapshot = dataEntry.getValue();
                MetricSnapshot old = existing.get(win);
                if (old == null) {
                    existing.put(win, snapshot);
                    timerMap.put(win, MetricUtils.metricSnapshot2Timer(snapshot));
                } else if (snapshot.get_ts() >= old.get_ts()) {
                    Timer timer = timerMap.get(win);
                    if (timer == null) {
                        // no accumulator registered for this window yet: seed it from the snapshot merged so far
                        timer = MetricUtils.metricSnapshot2Timer(old);
                        timerMap.put(win, timer);
                    }
                    old.set_ts(snapshot.get_ts());
                    old.set_m1(old.get_m1() + snapshot.get_m1());
                    old.set_m5(old.get_m5() + snapshot.get_m5());
                    old.set_m15(old.get_m15() + snapshot.get_m15());
                    // update points
                    MetricUtils.updateTimerPoints(timer, snapshot.get_points());
                }
            }
        }
        updateMetricCounters(meta, metaCounters);
    }

    /**
     * computes occurrences of specified metric name
     */
    protected void updateMetricCounters(String metricName, Map<String, Integer> metricNameCounters) {
        Integer seen = metricNameCounters.get(metricName);
        metricNameCounters.put(metricName, seen == null ? 1 : seen + 1);
    }

    /**
     * Post-processes merged histograms and timers: task, stream, netty and worker metrics only
     * get their points cleared, component and topology metrics additionally get their
     * statistics recomputed from the accumulated points when more than one report was merged.
     */
    protected void adjustHistogramTimerMetrics(TopologyMetric tpMetric, Map<String, Integer> metaCounters,
                                               Map<String, Map<Integer, Histogram>> histograms,
                                               Map<String, Map<Integer, Timer>> timers) {
        resetPoints(tpMetric.get_taskMetric().get_metrics());
        resetPoints(tpMetric.get_streamMetric().get_metrics());
        resetPoints(tpMetric.get_nettyMetric().get_metrics());
        resetPoints(tpMetric.get_workerMetric().get_metrics());

        adjustMetrics(tpMetric.get_componentMetric().get_metrics(), metaCounters, histograms, timers);
        adjustMetrics(tpMetric.get_topologyMetric().get_metrics(), metaCounters, histograms, timers);
    }

    /**
     * Recomputes histogram/timer statistics of every metric in {@code metrics} that was merged
     * from more than one report, then clears the points (topology-level histograms keep theirs).
     */
    private void adjustMetrics(Map<String, Map<Integer, MetricSnapshot>> metrics, Map<String, Integer> metaCounters,
                               Map<String, Map<Integer, Histogram>> histograms, Map<String, Map<Integer, Timer>> timers) {
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) {
            String meta = metricEntry.getKey();
            MetricType metricType = MetricUtils.metricType(meta);
            boolean isHistogram = metricType == MetricType.HISTOGRAM;
            if (!isHistogram && metricType != MetricType.TIMER) {
                continue;
            }

            boolean topologyLevel = MetricUtils.metaType(meta) == MetaType.TOPOLOGY;
            // the report count is per metric name, so look it up once rather than per window
            Integer cnt = metaCounters.get(meta);
            boolean mergedFromSeveral = cnt != null && cnt > 1;

            for (Map.Entry<Integer, MetricSnapshot> dataEntry : metricEntry.getValue().entrySet()) {
                Integer win = dataEntry.getKey();
                MetricSnapshot snapshot = dataEntry.getValue();
                if (isHistogram) {
                    // only touch the accumulator when it is actually needed
                    Histogram histogram = mergedFromSeveral ? lookup(histograms, meta, win) : null;
                    adjustHistogram(snapshot, histogram, topologyLevel);
                } else {
                    Timer timer = mergedFromSeveral ? lookup(timers, meta, win) : null;
                    adjustTimer(snapshot, timer);
                }
            }
        }
    }

    /**
     * Overwrites the statistics of {@code snapshot} with those of {@code histogram} (when given)
     * and clears the points unless the metric is topology-level.
     */
    private void adjustHistogram(MetricSnapshot snapshot, Histogram histogram, boolean topologyLevel) {
        if (histogram != null) {
            Snapshot merged = histogram.getSnapshot();
            snapshot.set_mean(merged.getMean());
            copyDistribution(merged, snapshot);
            if (topologyLevel) {
                snapshot.set_points(Arrays.asList(ArrayUtils.toObject(merged.getValues())));
            }
        }
        if (!topologyLevel) {
            snapshot.set_points(new ArrayList<Long>(0));
        }
    }

    /**
     * Overwrites the distribution statistics of {@code snapshot} with those of {@code timer}
     * (when given; the mean is left untouched) and clears the points.
     */
    private void adjustTimer(MetricSnapshot snapshot, Timer timer) {
        if (timer != null) {
            copyDistribution(timer.getSnapshot(), snapshot);
        }
        snapshot.set_points(new ArrayList<Long>(0));
    }

    /**
     * Copies percentiles, standard deviation, min and max from {@code from} to {@code to}.
     */
    private void copyDistribution(Snapshot from, MetricSnapshot to) {
        to.set_p50(from.getMedian());
        to.set_p75(from.get75thPercentile());
        to.set_p95(from.get95thPercentile());
        to.set_p98(from.get98thPercentile());
        to.set_p99(from.get99thPercentile());
        to.set_p999(from.get999thPercentile());
        to.set_stddev(from.getStdDev());
        to.set_min(from.getMin());
        to.set_max(from.getMax());
    }

    /**
     * Clears the points of every histogram and timer snapshot in {@code metrics}.
     */
    private void resetPoints(Map<String, Map<Integer, MetricSnapshot>> metrics) {
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) {
            MetricType metricType = MetricUtils.metricType(metricEntry.getKey());
            if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
                for (MetricSnapshot snapshot : metricEntry.getValue().values()) {
                    snapshot.set_points(new ArrayList<Long>(0));
                }
            }
        }
    }

    /**
     * Adds the counter values of {@code oldMetric} onto the stream, task, component, worker and
     * netty counters of {@code tpMetric}; the topology-level bucket is not touched.
     * Does nothing when {@code oldMetric} is {@code null}.
     */
    protected void adjustCounterMetrics(TopologyMetric tpMetric, TopologyMetric oldMetric) {
        if (oldMetric == null) {
            return;
        }
        mergeCounters(tpMetric.get_streamMetric().get_metrics(),
                oldMetric.get_streamMetric().get_metrics());
        mergeCounters(tpMetric.get_taskMetric().get_metrics(),
                oldMetric.get_taskMetric().get_metrics());
        mergeCounters(tpMetric.get_componentMetric().get_metrics(),
                oldMetric.get_componentMetric().get_metrics());
        mergeCounters(tpMetric.get_workerMetric().get_metrics(),
                oldMetric.get_workerMetric().get_metrics());
        mergeCounters(tpMetric.get_nettyMetric().get_metrics(),
                oldMetric.get_nettyMetric().get_metrics());
    }

    /**
     * sum old counter snapshots and new counter snapshots, sums are stored in new snapshots.
     */
    private void mergeCounters(Map<String, Map<Integer, MetricSnapshot>> newCounters,
                               Map<String, Map<Integer, MetricSnapshot>> oldCounters) {
        for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : newCounters.entrySet()) {
            Map<Integer, MetricSnapshot> oldSnapshots = oldCounters.get(entry.getKey());
            if (oldSnapshots == null || oldSnapshots.isEmpty()) {
                continue;
            }
            for (Map.Entry<Integer, MetricSnapshot> snapshotEntry : entry.getValue().entrySet()) {
                MetricSnapshot snapshot = snapshotEntry.getValue();
                MetricSnapshot oldSnapshot = oldSnapshots.get(snapshotEntry.getKey());
                if (oldSnapshot != null) {
                    snapshot.set_longValue(snapshot.get_longValue() + oldSnapshot.get_longValue());
                }
            }
        }
    }

    /**
     * Reports (and logs) whether a metric has no bucket to be merged into, which happens when
     * {@link #getMetricInfoByType(TopologyMetric, MetaType)} does not handle its meta type.
     */
    private boolean isMissingBucket(MetricInfo bucket, String meta) {
        if (bucket != null) {
            return false;
        }
        LOG.warn("topology:{}, no metric bucket for metric:{}, skip...", topologyId, meta);
        return true;
    }

    /**
     * Returns the per-window map registered for {@code meta}, registering an empty one first
     * when there is none.
     */
    private static <T> Map<Integer, T> windowMap(Map<String, Map<Integer, T>> byMeta, String meta) {
        Map<Integer, T> perWindow = byMeta.get(meta);
        if (perWindow == null) {
            perWindow = new HashMap<>();
            byMeta.put(meta, perWindow);
        }
        return perWindow;
    }

    /**
     * Null-safe two-level lookup: metric name, then window.
     */
    private static <T> T lookup(Map<String, Map<Integer, T>> byMeta, String meta, Integer win) {
        Map<Integer, T> perWindow = byMeta.get(meta);
        return perWindow == null ? null : perWindow.get(win);
    }

    /**
     * @return the metric sample rate read from the topology configuration
     */
    private double getSampleRate() {
        return ConfigExtension.getMetricSampleRate(conf);
    }
}