/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.daemon.nimbus;
import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricSnapshot;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.MetricMeta;
import com.alibaba.jstorm.daemon.nimbus.metric.uploader.DefaultMetricUploader;
import com.alibaba.jstorm.daemon.nimbus.metric.uploader.MetricUploader;
import com.alibaba.jstorm.metric.*;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import com.codahale.metrics.Gauge;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
* Topology metrics thread which resides in nimbus.
* This class is responsible for generating metric ids and uploading metrics to the underlying storage system.
*
* @author Cody (weiyue.wy@alibaba-inc.com)
* @since 2.0.5
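* <p>
* Typical lifecycle (a minimal sketch based on this class's public API):
* <pre>
* TopologyMetricsRunnable runnable = new TopologyMetricsRunnable(nimbusData);
* runnable.init();            // load MetricUploader/MetricQueryClient plugins, start worker threads
* runnable.start();           // start the event dispatch loop (this class extends Thread)
* runnable.pushEvent(event);  // producers enqueue Update/Remove/Refresh/... events
* runnable.shutdown();        // clean up the metric uploader
* </pre>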
*/
public class TopologyMetricsRunnable extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(TopologyMetricsRunnable.class);
protected JStormMetricCache metricCache;
/**
 * map<topologyId, TopologyMetricContext>; a local in-memory cache where each context
 * holds map<worker, metricInfo> and keeps only the latest snapshot of metrics.
 */
protected final ConcurrentMap<String, TopologyMetricContext> topologyMetricContexts =
new ConcurrentHashMap<>();
protected final BlockingDeque<TopologyMetricsRunnable.Event> queue = new LinkedBlockingDeque<>();
private static final String PENDING_UPLOAD_METRIC_DATA = "__pending.upload.metrics__";
private static final String PENDING_UPLOAD_METRIC_DATA_INFO = "__pending.upload.metrics.info__";
// the slot is empty
private static final int UNSET = 0;
// the slot is ready for uploading
private static final int SET = 1;
// the slot is being uploaded
private static final int UPLOADING = 2;
// the slot is reserved and about to be marked ready for uploading
private static final int PRE_SET = 3;
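// slot lifecycle: UNSET -> PRE_SET (reserved via CAS, see getAndPresetFirstEmptyIndex)
// -> SET (data saved, visible to the upload thread) -> UPLOADING -> UNSET (markUploaded)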
protected final AtomicIntegerArray metricStat;
protected StormClusterState stormClusterState;
protected MetricUploader metricUploader;
protected AtomicBoolean isShutdown;
protected String clusterName;
protected int maxPendingUploadMetrics;
private final boolean localMode;
private final NimbusData nimbusData;
private MetricQueryClient metricQueryClient;
private ScheduledExecutorService clusterMetricsUpdateExecutor;
/**
* refreshes alive topologies on startup and then every minute.
*/
protected AsyncLoopThread refreshTopologiesThread;
/**
* the thread for metric sending; polls pending upload slots continuously, sleeping a few ms between checks.
*/
private final Thread uploadThread = new MetricsUploadThread();
/**
* asynchronously flushes metric meta to remote storage and zk, every 15 seconds
*/
private final Thread flushMetricMetaThread = new FlushMetricMetaThread();
/**
* use default UUID generator
*/
private final MetricIDGenerator metricIDGenerator = new DefaultMetricIDGenerator();
public TopologyMetricsRunnable(final NimbusData nimbusData) {
setName(getClass().getSimpleName());
this.nimbusData = nimbusData;
this.localMode = nimbusData.isLocalMode();
if (localMode) {
this.metricStat = new AtomicIntegerArray(1);
return;
}
LOG.info("create topology metrics runnable.");
this.metricCache = nimbusData.getMetricCache();
this.stormClusterState = nimbusData.getStormClusterState();
this.isShutdown = nimbusData.getIsShutdown();
clusterName = ConfigExtension.getClusterName(nimbusData.getConf());
if (clusterName == null) {
throw new RuntimeException("cluster.name property must be set in storm.yaml!");
}
this.maxPendingUploadMetrics = ConfigExtension.getMaxPendingMetricNum(nimbusData.getConf());
this.metricStat = new AtomicIntegerArray(this.maxPendingUploadMetrics);
int cnt = 0;
for (int i = 0; i < maxPendingUploadMetrics; i++) {
TopologyMetricDataInfo obj = getMetricDataInfoFromCache(i);
if (obj != null) {
this.metricStat.set(i, SET);
cnt++;
}
}
LOG.info("pending upload metrics: {}", cnt);
// init alive topologies from zk
this.refreshTopologies();
this.refreshTopologiesThread = new AsyncLoopThread(new RefreshTopologiesThread());
this.clusterMetricsUpdateExecutor = Executors.newSingleThreadScheduledExecutor();
this.clusterMetricsUpdateExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
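// align each run to roughly the 55th second of the minute, presumably so that
// cluster metrics are merged right before minute-boundary uploads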
int secOffset = TimeUtils.secOffset();
int offset = 55;
if (secOffset < offset) {
JStormUtils.sleepMs((offset - secOffset) * 1000);
} else if (secOffset == offset) {
// do nothing
} else {
JStormUtils.sleepMs((60 - secOffset + offset) * 1000);
}
LOG.info("cluster metrics force upload.");
mergeAndUploadClusterMetrics();
}
}, 5, 60, TimeUnit.SECONDS);
// track nimbus JVM heap
JStormMetrics.registerWorkerGauge(JStormMetrics.NIMBUS_METRIC_KEY, MetricDef.MEMORY_USED,
new AsmGauge(new Gauge<Double>() {
@Override
public Double getValue() {
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
return (double) memoryUsage.getUsed();
}
}));
}
/**
* init metric uploader
*/
public void init() {
String metricUploadClass = ConfigExtension.getMetricUploaderClass(nimbusData.getConf());
if (StringUtils.isBlank(metricUploadClass)) {
metricUploadClass = DefaultMetricUploader.class.getName();
}
// init metric uploader
LOG.info("metric uploader class:{}", metricUploadClass);
Object instance = Utils.newInstance(metricUploadClass);
if (!(instance instanceof MetricUploader)) {
throw new RuntimeException(metricUploadClass + " is not an instance of MetricUploader!");
}
this.metricUploader = (MetricUploader) instance;
try {
metricUploader.init(nimbusData);
} catch (Exception e) {
throw new RuntimeException(e);
}
LOG.info("Successfully init {}", metricUploadClass);
// init metric query client
String metricQueryClientClass = ConfigExtension.getMetricQueryClientClass(nimbusData.getConf());
if (!StringUtils.isBlank(metricQueryClientClass)) {
LOG.info("metric query client class:{}", metricQueryClientClass);
this.metricQueryClient = (MetricQueryClient) Utils.newInstance(metricQueryClientClass);
} else {
LOG.warn("use default metric query client class.");
this.metricQueryClient = new DefaultMetricQueryClient();
}
try {
metricQueryClient.init(nimbusData.getConf());
} catch (Exception e) {
throw new RuntimeException(e);
}
this.uploadThread.start();
this.flushMetricMetaThread.start();
LOG.info("init topology metric runnable done.");
}
public void shutdown() {
LOG.info("Begin to shutdown");
// metricUploader is null in local mode and before init(), guard against NPE
if (metricUploader != null) {
metricUploader.cleanup();
}
LOG.info("Successfully shutdown");
}
@Override
public void run() {
// there's nothing to dispatch in local mode
if (localMode) {
return;
}
while (!isShutdown.get()) {
try {
// wait for metricUploader to be ready; for external plugins like databases, init may take a few seconds
if (this.metricUploader == null) {
JStormUtils.sleepMs(100);
continue;
}
// block briefly instead of busy-spinning when the queue is empty
Event event = queue.poll(1, TimeUnit.SECONDS);
if (event == null) {
continue;
}
if (event instanceof Remove) {
handleRemoveEvent((Remove) event);
} else if (event instanceof Update) {
handleUpdateEvent((Update) event);
} else if (event instanceof Refresh) {
handleRefreshEvent((Refresh) event);
} else if (event instanceof KillTopologyEvent) {
handleKillTopologyEvent((KillTopologyEvent) event);
} else if (event instanceof StartTopologyEvent) {
handleStartTopologyEvent((StartTopologyEvent) event);
} else if (event instanceof TaskDeadEvent) {
handleTaskDeadEvent((TaskDeadEvent) event);
} else if (event instanceof TaskStartEvent) {
handleTaskStartEvent((TaskStartEvent) event);
} else {
LOG.error("Unknown event type:{}", event.getClass());
}
} catch (Exception e) {
if (!isShutdown.get()) {
LOG.error(e.getMessage(), e);
}
}
}
}
public boolean isTopologyAlive(String topologyId) {
return topologyMetricContexts.containsKey(topologyId);
}
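/**
 * scans for the first empty slot and atomically reserves it (UNSET -> PRE_SET) via CAS,
 * so concurrent callers can never claim the same slot.
 *
 * @return the reserved slot index, or -1 if all slots are taken
 */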
private int getAndPresetFirstEmptyIndex() {
for (int i = 0; i < maxPendingUploadMetrics; i++) {
if (metricStat.get(i) == UNSET) {
if (metricStat.compareAndSet(i, UNSET, PRE_SET)) {
return i;
}
}
}
return -1;
}
private int getFirstPendingUploadIndex() {
for (int i = 0; i < maxPendingUploadMetrics; i++) {
if (metricStat.get(i) == SET) {
return i;
}
}
return -1;
}
public void markUploaded(int idx) {
this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA + idx);
this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA_INFO + idx);
this.metricStat.set(idx, UNSET);
}
public void markUploading(int idx) {
this.metricStat.set(idx, UPLOADING);
}
public void markSet(int idx) {
this.metricStat.set(idx, SET);
}
public TopologyMetric getMetricDataFromCache(int idx) {
return (TopologyMetric) metricCache.get(PENDING_UPLOAD_METRIC_DATA + idx);
}
public TopologyMetricDataInfo getMetricDataInfoFromCache(int idx) {
return (TopologyMetricDataInfo) metricCache.get(PENDING_UPLOAD_METRIC_DATA_INFO + idx);
}
public void pushEvent(Event cmd) {
queue.offer(cmd);
}
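/**
 * assigns a stable id to each metric name of the given topology. valid existing ids are
 * reused; otherwise a new id is generated and registered via putIfAbsent, so concurrent
 * registrations of the same name resolve to a single id.
 */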
public Map<String, Long> registerMetrics(String topologyId, Set<String> metricNames) {
TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true);
ConcurrentMap<String, Long> memMeta = topologyMetricContexts.get(topologyId).getMemMeta();
Map<String, Long> ret = new HashMap<>();
for (String metricName : metricNames) {
Long id = memMeta.get(metricName);
if (id != null && MetricUtils.isValidId(id)) {
ret.put(metricName, id);
} else {
id = metricIDGenerator.genMetricId(metricName);
Long old = memMeta.putIfAbsent(metricName, id);
if (old == null) {
ret.put(metricName, id);
} else {
ret.put(metricName, old);
}
}
}
long cost = ticker.stop();
LOG.info("register metrics, topology:{}, size:{}, cost:{}", topologyId, metricNames.size(), cost);
return ret;
}
public void handleRemoveEvent(Remove event) {
String topologyId = event.topologyId;
if (topologyId != null) {
removeTopology(topologyId);
}
LOG.info("remove topology:{}.", topologyId);
}
private void removeTopology(String topologyId) {
metricCache.removeTopology(topologyId);
metricCache.removeSampleRate(topologyId);
topologyMetricContexts.remove(topologyId);
}
public void refreshTopologies() {
if (!topologyMetricContexts.containsKey(JStormMetrics.NIMBUS_METRIC_KEY)) {
LOG.info("adding __nimbus__ to metric context.");
Set<ResourceWorkerSlot> workerSlot = Sets.newHashSet(new ResourceWorkerSlot());
TopologyMetricContext metricContext = new TopologyMetricContext(workerSlot);
topologyMetricContexts.putIfAbsent(JStormMetrics.NIMBUS_METRIC_KEY, metricContext);
syncMetaFromCache(JStormMetrics.NIMBUS_METRIC_KEY, topologyMetricContexts.get(JStormMetrics.NIMBUS_METRIC_KEY));
}
if (!topologyMetricContexts.containsKey(JStormMetrics.CLUSTER_METRIC_KEY)) {
LOG.info("adding __cluster__ to metric context.");
Set<ResourceWorkerSlot> workerSlot = Sets.newHashSet(new ResourceWorkerSlot());
Map conf = new HashMap();
// there's no need to consider the sample rate when merging cluster metrics
conf.put(ConfigExtension.TOPOLOGY_METRIC_SAMPLE_RATE, 1.0);
TopologyMetricContext metricContext = new TopologyMetricContext(
JStormMetrics.CLUSTER_METRIC_KEY, workerSlot, conf);
topologyMetricContexts.putIfAbsent(JStormMetrics.CLUSTER_METRIC_KEY, metricContext);
syncMetaFromCache(JStormMetrics.CLUSTER_METRIC_KEY, topologyMetricContexts.get(JStormMetrics.CLUSTER_METRIC_KEY));
}
Map<String, Assignment> assignMap;
try {
assignMap = Cluster.get_all_assignment(stormClusterState, null);
for (String topologyId : assignMap.keySet()) {
if (!topologyMetricContexts.containsKey(topologyId)) {
Assignment assignment = assignMap.get(topologyId);
TopologyMetricContext metricContext =
new TopologyMetricContext(assignment.getWorkers());
metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
syncMetaFromCache(topologyId, metricContext);
LOG.info("adding {} to metric context.", topologyId);
topologyMetricContexts.put(topologyId, metricContext);
}
}
} catch (Exception e1) {
LOG.warn("failed to get assignments");
return;
}
List<String> removing = new ArrayList<>();
for (String topologyId : topologyMetricContexts.keySet()) {
if (!JStormMetrics.NIMBUS_METRIC_KEY.equals(topologyId)
&& !JStormMetrics.CLUSTER_METRIC_KEY.equals(topologyId)
&& !assignMap.containsKey(topologyId)) {
removing.add(topologyId);
}
}
for (String topologyId : removing) {
LOG.info("removing topology:{}", topologyId);
removeTopology(topologyId);
}
}
/**
* sync topology metric meta from external storage like TDDL/OTS.
* the nimbus leader skips syncing; only followers do this (see handleRefreshEvent).
*/
public void syncTopologyMeta() {
String nimbus = JStormMetrics.NIMBUS_METRIC_KEY;
if (topologyMetricContexts.containsKey(nimbus)) {
syncMetaFromRemote(nimbus, topologyMetricContexts.get(nimbus));
}
String cluster = JStormMetrics.CLUSTER_METRIC_KEY;
if (topologyMetricContexts.containsKey(cluster)) {
syncMetaFromRemote(cluster, topologyMetricContexts.get(cluster));
}
Map<String, Assignment> assignMap;
try {
assignMap = Cluster.get_all_assignment(stormClusterState, null);
for (String topologyId : assignMap.keySet()) {
if (topologyMetricContexts.containsKey(topologyId)) {
Assignment assignment = assignMap.get(topologyId);
TopologyMetricContext metricContext =
new TopologyMetricContext(assignment.getWorkers());
metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
syncMetaFromCache(topologyId, metricContext);
syncMetaFromRemote(topologyId, metricContext);
}
}
} catch (Exception e1) {
LOG.warn("failed to get assignments");
}
}
/**
* sync metric meta from rocks db into mem cache on startup
*/
private void syncMetaFromCache(String topologyId, TopologyMetricContext context) {
if (!context.syncMeta()) {
Map<String, Long> meta = metricCache.getMeta(topologyId);
if (meta != null) {
context.getMemMeta().putAll(meta);
}
context.setSyncMeta(true);
}
}
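/**
 * reconciles in-memory metric meta with remote storage: if the local meta size differs from
 * the count recorded in zk, pulls meta for every MetaType via the metric query client,
 * merges it into memory and writes the merged meta back to the metric cache.
 */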
private void syncMetaFromRemote(String topologyId, TopologyMetricContext context) {
try {
int memSize = context.getMemMeta().size();
Integer zkSize = (Integer) stormClusterState.get_topology_metric(topologyId);
// guard against a missing zk record to avoid an unboxing NPE
if (zkSize != null && memSize != zkSize) {
ConcurrentMap<String, Long> memMeta = context.getMemMeta();
for (MetaType metaType : MetaType.values()) {
List<MetricMeta> metaList = metricQueryClient.getMetricMeta(clusterName, topologyId, metaType);
if (metaList != null) {
LOG.info("get remote metric meta, topology:{}, metaType:{}, mem:{}, zk:{}, new size:{}",
topologyId, metaType, memSize, zkSize, metaList.size());
for (MetricMeta meta : metaList) {
memMeta.putIfAbsent(meta.getFQN(), meta.getId());
}
}
}
metricCache.putMeta(topologyId, memMeta);
}
} catch (Exception ex) {
LOG.error("failed to sync remote meta", ex);
}
}
/**
* sends the topology kill event to the jstorm monitor via the metric uploader and removes local metric state
*/
protected void handleKillTopologyEvent(KillTopologyEvent event) {
metricUploader.sendEvent(this.clusterName, event);
removeTopology(event.topologyId);
}
private void handleStartTopologyEvent(StartTopologyEvent event) {
this.metricCache.putSampleRate(event.topologyId, event.sampleRate);
metricUploader.sendEvent(this.clusterName, event);
if (!topologyMetricContexts.containsKey(event.topologyId)) {
TopologyMetricContext metricContext = new TopologyMetricContext();
// note that workerNum is not set here.
this.topologyMetricContexts.put(event.topologyId, metricContext);
}
}
private void handleTaskDeadEvent(TaskDeadEvent event) {
metricUploader.sendEvent(this.clusterName, event);
// unregister dead workers
Set<ResourceWorkerSlot> workers = new HashSet<>();
workers.addAll(event.deadTasks.values());
for (ResourceWorkerSlot worker : workers) {
metricCache.unregisterWorker(event.topologyId, worker.getHostname(), worker.getPort());
}
}
private void handleTaskStartEvent(final TaskStartEvent event) {
Assignment assignment = event.newAssignment;
TopologyMetricContext metricContext = topologyMetricContexts.get(event.topologyId);
if (metricContext != null) {
metricContext.setWorkerSet(assignment.getWorkers());
} else {
metricContext = new TopologyMetricContext();
metricContext.setWorkerSet(assignment.getWorkers());
topologyMetricContexts.put(event.topologyId, metricContext);
}
metricUploader.sendEvent(this.clusterName, event);
}
/**
* refreshes alive topologies; on follower nimbuses, also syncs topology metric meta.
*/
public void handleRefreshEvent(Refresh dummy) {
TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true);
try {
refreshTopologies();
LOG.info("refresh topologies, cost:{}", ticker.stopAndRestart());
if (!nimbusData.isLeader()) {
syncTopologyMeta();
LOG.info("sync topology meta, cost:{}", ticker.stop());
}
} catch (Exception ex) {
LOG.error("handleRefreshEvent error:", ex);
}
}
private TopologyMetricContext getClusterTopologyMetricContext() {
return topologyMetricContexts.get(JStormMetrics.CLUSTER_METRIC_KEY);
}
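/**
 * merges the worker-level cluster metrics accumulated in the __cluster__ metric context,
 * normalizes snapshot metric ids, zero-fills registered metrics that reported no data,
 * then pushes an Update event so the merged result goes through the regular
 * cache-and-upload path.
 */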
private void mergeAndUploadClusterMetrics() {
TopologyMetricContext context = getClusterTopologyMetricContext();
TopologyMetric tpMetric = context.mergeMetrics();
if (tpMetric == null) {
tpMetric = MetricUtils.mkTopologyMetric();
tpMetric.set_topologyMetric(MetricUtils.mkMetricInfo());
}
// reset snapshot metric ids; clear points of histograms/timers before upload
MetricInfo clusterMetrics = tpMetric.get_topologyMetric();
Map<String, Long> metricNames = context.getMemMeta();
for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : clusterMetrics.get_metrics().entrySet()) {
String metricName = entry.getKey();
MetricType metricType = MetricUtils.metricType(metricName);
Long metricId = metricNames.get(metricName);
for (Map.Entry<Integer, MetricSnapshot> metric : entry.getValue().entrySet()) {
MetricSnapshot snapshot = metric.getValue();
snapshot.set_metricId(metricId);
if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
snapshot.set_points(new ArrayList<Long>(0));
}
}
}
// fill registered metrics that reported no data in this round with zero-valued snapshots
long ts = System.currentTimeMillis();
for (Map.Entry<String, Long> entry : metricNames.entrySet()) {
String name = entry.getKey();
if (!clusterMetrics.get_metrics().containsKey(name)) {
Map<Integer, MetricSnapshot> metric = new HashMap<>();
MetricType type = MetricUtils.metricType(name);
metric.put(AsmWindow.M1_WINDOW, new MetricSnapshot(entry.getValue(), ts, type.getT()));
clusterMetrics.put_to_metrics(name, metric);
}
}
// push an Update event so the merged cluster metrics are cached and later uploaded
Update event = new Update();
event.timestamp = System.currentTimeMillis();
event.topologyMetrics = tpMetric;
event.topologyId = JStormMetrics.CLUSTER_METRIC_KEY;
pushEvent(event);
LOG.info("send update event for cluster metrics, size : {}", clusterMetrics.get_metrics_size());
}
//update cluster metrics local cache
private void updateClusterMetrics(String topologyId, TopologyMetric tpMetric) {
if (tpMetric.get_topologyMetric().get_metrics_size() > 0) {
TopologyMetricContext context = getClusterTopologyMetricContext();
MetricInfo topologyMetrics = tpMetric.get_topologyMetric();
// make a new MetricInfo to save the topologyId's metric
MetricInfo clusterMetrics = MetricUtils.mkMetricInfo();
Set<String> metricNames = new HashSet<>();
for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : topologyMetrics.get_metrics().entrySet()) {
String metricName = MetricUtils.topo2clusterName(entry.getKey());
MetricType metricType = MetricUtils.metricType(metricName);
Map<Integer, MetricSnapshot> winData = new HashMap<>();
for (Map.Entry<Integer, MetricSnapshot> entryData : entry.getValue().entrySet()) {
MetricSnapshot snapshot = entryData.getValue().deepCopy();
winData.put(entryData.getKey(), snapshot);
if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
// reset topology metric points
entryData.getValue().set_points(new ArrayList<Long>(0));
}
}
clusterMetrics.put_to_metrics(metricName, winData);
metricNames.add(metricName);
}
// save to local cache, waiting for merging
context.addToMemCache(topologyId, clusterMetrics);
registerMetrics(JStormMetrics.CLUSTER_METRIC_KEY, metricNames);
}
}
/**
* put metric data to metric cache.
*/
public void handleUpdateEvent(Update event) {
TopologyMetric topologyMetrics = event.topologyMetrics;
final String topologyId = event.topologyId;
if (this.topologyMetricContexts.containsKey(topologyId)) {
if (!JStormMetrics.CLUSTER_METRIC_KEY.equals(topologyId)) {
updateClusterMetrics(topologyId, topologyMetrics);
}
// overwrite
metricCache.putMetricData(topologyId, topologyMetrics);
// the process below is kind of a transaction: first we lock an empty slot and mark it PRE_SET;
// at this point the slot is not yet ready for uploading, as the upload thread looks for SET slots only.
// once all metric data has been saved, we mark the slot SET, making it ready for uploading.
int idx = getAndPresetFirstEmptyIndex();
if (idx >= 0) {
TopologyMetricDataInfo summary = new TopologyMetricDataInfo();
summary.topologyId = topologyId;
summary.timestamp = event.timestamp;
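// classify the payload for the uploader: nimbus/cluster metrics are always topology-level;
// for ordinary topologies, the mix of topology/component metrics vs. task/stream/worker/netty
// metrics decides between METRIC_TYPE_TOPLOGY (sic, upstream constant), _ALL and _TASK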
if (topologyId.equals(JStormMetrics.NIMBUS_METRIC_KEY) ||
topologyId.equals(JStormMetrics.CLUSTER_METRIC_KEY)) {
summary.type = MetricUploader.METRIC_TYPE_TOPLOGY;
} else {
if (topologyMetrics.get_topologyMetric().get_metrics_size() > 0 ||
topologyMetrics.get_componentMetric().get_metrics_size() > 0) {
if (topologyMetrics.get_taskMetric().get_metrics_size() +
topologyMetrics.get_workerMetric().get_metrics_size() +
topologyMetrics.get_nettyMetric().get_metrics_size() +
topologyMetrics.get_streamMetric().get_metrics_size() > 0) {
summary.type = MetricUploader.METRIC_TYPE_ALL;
} else {
summary.type = MetricUploader.METRIC_TYPE_TOPLOGY;
}
} else {
summary.type = MetricUploader.METRIC_TYPE_TASK;
}
}
metricCache.put(PENDING_UPLOAD_METRIC_DATA_INFO + idx, summary);
metricCache.put(PENDING_UPLOAD_METRIC_DATA + idx, topologyMetrics);
markSet(idx);
LOG.info("put metric data to local cache, topology:{}, idx:{}", topologyId, idx);
} else {
LOG.error("exceeding maxPendingUploadMetrics, skip metrics data for topology:{}", topologyId);
}
} else {
LOG.warn("topology {} has been killed or has not started, skip update.", topologyId);
}
}
/**
* get topology metrics, note that only topology & component & worker metrics are returned
*/
public TopologyMetric getTopologyMetric(String topologyId) {
long start = System.nanoTime();
try {
TopologyMetric ret = new TopologyMetric();
List<MetricInfo> topologyMetrics = metricCache.getMetricData(topologyId, MetaType.TOPOLOGY);
List<MetricInfo> componentMetrics = metricCache.getMetricData(topologyId, MetaType.COMPONENT);
List<MetricInfo> workerMetrics = metricCache.getMetricData(topologyId, MetaType.WORKER);
MetricInfo dummy = MetricUtils.mkMetricInfo();
if (topologyMetrics.size() > 0) {
// take the most recent (last-minute) topology metric
ret.set_topologyMetric(topologyMetrics.get(topologyMetrics.size() - 1));
} else {
ret.set_topologyMetric(dummy);
}
if (componentMetrics.size() > 0) {
ret.set_componentMetric(componentMetrics.get(0));
} else {
ret.set_componentMetric(dummy);
}
if (workerMetrics.size() > 0) {
ret.set_workerMetric(workerMetrics.get(0));
} else {
ret.set_workerMetric(dummy);
}
ret.set_taskMetric(dummy);
ret.set_streamMetric(dummy);
ret.set_nettyMetric(dummy);
return ret;
} finally {
long end = System.nanoTime();
SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", (end - start) / TimeUtils.NS_PER_US);
}
}
public static String getWorkerSlotName(String hostname, Integer port) {
return hostname + ":" + port;
}
class RefreshTopologiesThread extends RunnableCallback {
@Override
public void run() {
if (!isShutdown.get()) {
pushEvent(new Refresh());
}
}
@Override
public Object getResult() {
return TimeUtils.SEC_PER_MIN;
}
@Override
public String getThreadName() {
return "RefreshThread";
}
}
class MetricsUploadThread extends Thread {
public MetricsUploadThread() {
setName("main-upload-thread");
}
@Override
public void run() {
while (!isShutdown.get()) {
try {
if (metricUploader != null && nimbusData.isLeader()) {
final int idx = getFirstPendingUploadIndex();
if (idx >= 0) {
markUploading(idx);
upload(clusterName, idx);
}
}
JStormUtils.sleepMs(5);
} catch (Exception ex) {
LOG.error("Error", ex);
}
}
}
public boolean upload(final String clusterName, final int idx) {
final TopologyMetricDataInfo summary = getMetricDataInfoFromCache(idx);
if (summary == null) {
LOG.warn("metric summary is null from cache idx:{}", idx);
markUploaded(idx);
return true;
}
final String topologyId = summary.topologyId;
if (!isTopologyAlive(topologyId)) {
LOG.warn("topology {} is not alive, skip sending metrics.", topologyId);
markUploaded(idx);
return true;
}
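// hand the slot over to the uploader; the slot index is passed along, presumably so the
// MetricUploader implementation can call markUploaded(idx) (or markSet(idx) to retry)
// once its possibly-async upload finishes, otherwise the slot would stay UPLOADING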
return metricUploader.upload(clusterName, topologyId, idx, summary.toMap());
}
}
class FlushMetricMetaThread extends Thread {
public FlushMetricMetaThread() {
setName("FlushMetricMetaThread");
}
@Override
public void run() {
while (!isShutdown.get()) {
long start = System.currentTimeMillis();
try {
// skip this round if metricUploader is not yet initialized or this nimbus is not the leader
if (nimbusData.isLeader() && metricUploader != null) {
for (Map.Entry<String, TopologyMetricContext> entry : topologyMetricContexts.entrySet()) {
String topologyId = entry.getKey();
TopologyMetricContext metricContext = entry.getValue();
Map<String, Long> cachedMeta = metricCache.getMeta(topologyId);
if (cachedMeta == null) {
cachedMeta = new HashMap<>();
}
Map<String, Long> memMeta = metricContext.getMemMeta();
if (memMeta.size() > cachedMeta.size()) {
cachedMeta.putAll(memMeta);
}
metricCache.putMeta(topologyId, cachedMeta);
int curSize = cachedMeta.size();
if (curSize != metricContext.getFlushedMetaNum()) {
metricContext.setFlushedMetaNum(curSize);
metricUploader.registerMetrics(clusterName, topologyId, cachedMeta);
LOG.info("flush metric meta, topology:{}, total:{}, cost:{}.",
topologyId, curSize, System.currentTimeMillis() - start);
}
stormClusterState.set_topology_metric(topologyId, curSize);
}
}
JStormUtils.sleepMs(15000);
} catch (Exception ex) {
LOG.error("Error", ex);
}
}
}
}
public static class TopologyMetricDataInfo implements Serializable {
private static final long serialVersionUID = 1303262512351757610L;
public String topologyId;
public String type; // "tp" for tp/comp metrics OR "task" for task/stream/worker/netty metrics
public long timestamp; // metrics report time
public Map<String, Object> toMap() {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(MetricUploader.METRIC_TIME, timestamp);
ret.put(MetricUploader.METRIC_TYPE, type);
return ret;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
// ==============================================
// =================== events ===================
// ==============================================
public static class Event {
protected Event() {
}
public String clusterName;
public String topologyId;
public long timestamp;
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
public static class Update extends Event {
public TopologyMetric topologyMetrics;
}
public static class Remove extends Event {
}
public static class Refresh extends Event {
}
public static class KillTopologyEvent extends Event {
}
public static class StartTopologyEvent extends Event {
public double sampleRate;
}
public static class TaskDeadEvent extends Event {
public Map<Integer, ResourceWorkerSlot> deadTasks;
}
public static class TaskStartEvent extends Event {
public Assignment oldAssignment;
public Assignment newAssignment;
public Map<Integer, String> task2Component;
}
}