/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.metric;

import backtype.storm.Config;
import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.generated.WorkerUploadMetrics;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Values;
import backtype.storm.utils.NimbusClient;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.Update;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;

/**
* Reports metrics from a worker to the nimbus server. An instance of this class lives in Worker/Nimbus/Supervisor:
* inside a worker it reports data via netty transport; otherwise it reports via thrift.
* <p/>
* There are 2 threads:
* 1. flush thread: checks every second; when the current time is aligned to 1 min, flushes all metrics to snapshots and uploads them;
* 2. check meta thread: uses thrift to fetch metric ids from the nimbus server.
*
* @author Cody (weiyue.wy@alibaba-inc.com)
* @since 2.0.5
*/
public class JStormMetricsReporter {
private static final Logger LOG = LoggerFactory.getLogger(JStormMetricsReporter.class);
private Map conf;
protected String clusterName;
protected String topologyId;
protected String host;
protected int port;
protected boolean localMode = false;
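// background threads and their loop intervals (in seconds)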
private AsyncLoopThread checkMetricMetaThread;
protected final int checkMetaThreadCycle;
private AsyncLoopThread flushMetricThread;
protected final int flushMetricThreadCycle;
private boolean test = false;
private boolean isInWorker = false;
private SpoutOutputCollector spoutOutput;
private OutputCollector boltOutput;
private boolean enableMetrics;
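// thrift client to nimbus; lazily created, closed and re-created after failures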
private NimbusClient client = null;
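
/**
* Builds a reporter from its owner role: a WorkerData when running inside a worker,
* or a NimbusData when running inside nimbus (the topology id is then the nimbus metric key).
*/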
public JStormMetricsReporter(Object role) {
LOG.info("starting jstorm metrics reporter");
if (role instanceof WorkerData) {
WorkerData workerData = (WorkerData) role;
this.conf = workerData.getStormConf();
this.topologyId = (String) conf.get(Config.TOPOLOGY_ID);
this.port = workerData.getPort();
this.isInWorker = true;
} else if (role instanceof NimbusData) {
NimbusData nimbusData = (NimbusData) role;
this.conf = nimbusData.getConf();
this.topologyId = JStormMetrics.NIMBUS_METRIC_KEY;
}
this.host = JStormMetrics.getHost();
this.enableMetrics = JStormMetrics.isEnabled();
if (!enableMetrics) {
LOG.warn("***** topology metrics is disabled! *****");
} else {
LOG.info("topology metrics is enabled.");
}
this.checkMetaThreadCycle = 30;
// flush metric snapshots when time is aligned, check every sec.
this.flushMetricThreadCycle = 1;
LOG.info("check meta thread freq:{}, flush metrics thread freq:{}", checkMetaThreadCycle, flushMetricThreadCycle);
this.localMode = StormConfig.local_mode(conf);
this.clusterName = ConfigExtension.getClusterName(conf);
LOG.info("done.");
}
@VisibleForTesting
JStormMetricsReporter() {
LOG.info("Successfully started jstorm metrics reporter for test.");
this.test = true;
this.flushMetricThreadCycle = 1;
this.checkMetaThreadCycle = 30;
}
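
/**
* Starts the flush thread and the meta-check thread.
* Neither thread is started in local mode or when metrics are disabled.
*/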
public void init() {
if (!localMode && enableMetrics) {
this.checkMetricMetaThread = new AsyncLoopThread(new CheckMetricMetaThread());
this.flushMetricThread = new AsyncLoopThread(new FlushMetricThread());
}
}
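
/**
* Registers metric names with nimbus via thrift and returns the name-to-id mapping.
* Returns an empty map in test mode or when metrics are disabled, and null if the thrift call fails
* (the client is re-created so the next attempt starts from a fresh connection).
*/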
private Map<String, Long> registerMetrics(Set<String> names) {
if (test || !enableMetrics) {
return new HashMap<>();
}
try {
if (client == null) {
client = NimbusClient.getConfiguredClient(conf);
}
return client.getClient().registerMetrics(topologyId, names);
} catch (Exception e) {
LOG.error("Failed to gen metric ids", e);
if (client != null) {
client.close();
client = NimbusClient.getConfiguredClient(conf);
}
}
return null;
}
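
/**
* Stops both background threads (no-op in local mode or when metrics are disabled).
*/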
public void shutdown() {
if (!localMode && enableMetrics) {
this.checkMetricMetaThread.cleanup();
this.flushMetricThread.cleanup();
}
}
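
/**
* Computes snapshots of all metrics registered in this process and uploads them,
* skipping the upload entirely when there is nothing to report.
*/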
public void doUpload() {
if (test) {
return;
}
try {
long start = System.currentTimeMillis();
MetricInfo workerMetricInfo = JStormMetrics.computeAllMetrics();
WorkerUploadMetrics upload = new WorkerUploadMetrics();
upload.set_topologyId(topologyId);
upload.set_supervisorId(host);
upload.set_port(port);
upload.set_allMetrics(workerMetricInfo);
if (workerMetricInfo.get_metrics_size() > 0) {
uploadMetric(upload);
LOG.info("Successfully upload worker metrics, size:{}, cost:{}",
workerMetricInfo.get_metrics_size(), System.currentTimeMillis() - start);
} else {
LOG.info("No metrics to upload.");
}
} catch (Exception e) {
LOG.error("Failed to upload worker metrics", e);
}
}
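
/**
* Uploads one batch of worker metrics. Inside a worker the metrics are emitted to the topology master
* on the metrics stream through the bolt/spout collector (netty transport); in nimbus/supervisor they
* are wrapped into a TopologyMetric and sent to nimbus via thrift.
*/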
public void uploadMetric(WorkerUploadMetrics metrics) {
if (isInWorker) {
// in Worker, we upload data via netty transport
if (boltOutput != null) {
LOG.info("emit metrics through bolt collector.");
boltOutput.emit(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID,
new Values(JStormServerUtils.getName(host, port), metrics));
} else if (spoutOutput != null) {
LOG.info("emit metrics through spout collector.");
spoutOutput.emit(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID,
new Values(JStormServerUtils.getName(host, port), metrics));
}
} else {
// in supervisor or nimbus, we upload metric data via thrift
LOG.info("emit metrics through nimbus client.");
Update event = new Update();
TopologyMetric tpMetric = MetricUtils.mkTopologyMetric();
tpMetric.set_workerMetric(metrics.get_allMetrics());
event.topologyMetrics = tpMetric;
event.topologyId = topologyId;
try {
if (client == null) {
client = NimbusClient.getConfiguredClient(conf);
}
client.getClient().uploadTopologyMetrics(topologyId, tpMetric);
} catch (Exception ex) {
LOG.error("upload metric error:", ex);
if (client != null) {
client.close();
client = NimbusClient.getConfiguredClient(conf);
}
}
}
//MetricUtils.logMetrics(metrics.get_allMetrics());
}
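
/**
* Remembers the task's collector so that worker-side metrics can be emitted on
* Common.TOPOLOGY_MASTER_METRICS_STREAM_ID instead of going through thrift.
*/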
public void setOutputCollector(Object outputCollector) {
if (outputCollector instanceof OutputCollector) {
this.boltOutput = (OutputCollector) outputCollector;
} else if (outputCollector instanceof SpoutOutputCollector) {
this.spoutOutput = (SpoutOutputCollector) outputCollector;
}
}
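
/**
* Runs every flushMetricThreadCycle seconds; when the wall clock is aligned to the minute,
* flushes every registered metric into its snapshot and triggers an upload.
*/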
class FlushMetricThread extends RunnableCallback {
@Override
public void run() {
if (TimeUtils.isTimeAligned()) {
int cnt = 0;
try {
for (AsmMetricRegistry registry : JStormMetrics.allRegistries) {
for (Map.Entry<String, AsmMetric> entry : registry.getMetrics().entrySet()) {
entry.getValue().flush();
cnt++;
}
}
LOG.info("flush metrics, total:{}.", cnt);
doUpload();
} catch (Exception ex) {
LOG.error("Error", ex);
}
}
}
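
// the value returned here is the loop interval, in seconds, for the enclosing AsyncLoopThread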
@Override
public Object getResult() {
return flushMetricThreadCycle;
}
}
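
/**
* Runs every checkMetaThreadCycle seconds. Collects the names of reportable metrics that have not yet
* been assigned a metric id and registers them with nimbus to obtain ids. Skips the first 30-45 seconds
* after startup (random initial delay) and skips a run if the previous one is still in progress.
*/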
class CheckMetricMetaThread extends RunnableCallback {
private volatile boolean processing = false;
private final long start = TimeUtils.current_time_secs();
private final long initialDelay = 30 + new Random().nextInt(15);
@Override
public void run() {
if (TimeUtils.current_time_secs() - start < initialDelay) {
return;
}
if (processing) {
LOG.info("still processing, skip...");
} else {
processing = true;
long begin = System.currentTimeMillis();
try {
Set<String> names = new HashSet<>();
for (AsmMetricRegistry registry : JStormMetrics.allRegistries) {
Map<String, AsmMetric> metricMap = registry.getMetrics();
for (Map.Entry<String, AsmMetric> metricEntry : metricMap.entrySet()) {
AsmMetric metric = metricEntry.getValue();
if (((metric.getOp() & AsmMetric.MetricOp.REPORT) == AsmMetric.MetricOp.REPORT) &&
metric.getMetricId() == 0L) {
names.add(metricEntry.getKey());
}
}
}
if (names.size() > 0) {
Map<String, Long> nameIdMap = registerMetrics(names);
if (nameIdMap != null) {
for (Map.Entry<String, Long> idEntry : nameIdMap.entrySet()) {
String name = idEntry.getKey();
AsmMetric metric = JStormMetrics.find(name);
if (metric != null) {
long id = idEntry.getValue();
metric.setMetricId(id);
LOG.info("set metric id, {}:{}", name, id);
}
}
}
LOG.info("register metrics, size:{}, cost:{}", names.size(), System.currentTimeMillis() - start);
}
} catch (Exception ex) {
LOG.error("Error", ex);
} finally {
processing = false;
}
}
}
@Override
public Object getResult() {
return checkMetaThreadCycle;
}
}
}