blob: 6531c9c45e1235dcb97e96e555ae945c0522deaf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.metric;
import backtype.storm.generated.MetricInfo;
import com.alibaba.jstorm.common.metric.*;
import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
import com.alibaba.jstorm.utils.NetWorkUtils;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.*;
/**
* @author Cody (weiyue.wy@alibaba-inc.com)
* @since 2.0.5
*/
public class JStormMetrics implements Serializable {
private static final long serialVersionUID = -2580242512743243267L;
public static final String NIMBUS_METRIC_KEY = "__NIMBUS__";
public static final String CLUSTER_METRIC_KEY = "__CLUSTER__";
public static final String SUPERVISOR_METRIC_KEY = "__SUPERVISOR__";
protected static final Logger LOG = LoggerFactory.getLogger(JStormMetrics.class);
/**
* Metrics in this object will be uploaded to nimbus
*/
protected static final AsmMetricRegistry workerMetrics = new AsmMetricRegistry();
protected static final AsmMetricRegistry nettyMetrics = new AsmMetricRegistry();
protected static final AsmMetricRegistry componentMetrics = new AsmMetricRegistry();
protected static final AsmMetricRegistry taskMetrics = new AsmMetricRegistry();
protected static final AsmMetricRegistry streamMetrics = new AsmMetricRegistry();
protected static final AsmMetricRegistry topologyMetrics = new AsmMetricRegistry();
protected static final AsmMetricRegistry[] allRegistries = {
streamMetrics, taskMetrics, componentMetrics, workerMetrics, nettyMetrics, topologyMetrics};
protected static String topologyId;
protected static String host;
protected static int port;
protected static boolean debug;
public static final String DEFAULT_GROUP = "sys";
public static final String NETTY_GROUP = "netty";
protected static Set<String> debugMetricNames = new HashSet<String>();
static {
host = NetWorkUtils.ip();
}
private static boolean enabled = true;
public static int getPort() {
return port;
}
public static void setPort(int port) {
JStormMetrics.port = port;
}
public static String getHost() {
return host;
}
public static void setHost(String host) {
JStormMetrics.host = host;
}
public static String getTopologyId() {
return topologyId;
}
public static void setTopologyId(String topologyId) {
JStormMetrics.topologyId = topologyId;
}
public static boolean isDebug() {
return debug;
}
public static void setDebug(boolean debug) {
JStormMetrics.debug = debug;
LOG.info("topology metrics debug enabled:{}", debug);
}
public static void setEnabled(boolean enabled) {
JStormMetrics.enabled = enabled;
}
public static boolean isEnabled() {
return enabled;
}
public static String workerMetricName(String name, MetricType type) {
return MetricUtils.workerMetricName(topologyId, host, port, name, type);
}
public static void addDebugMetrics(String names) {
String[] metrics = names.split(",");
for (String metric : metrics) {
metric = metric.trim();
if (!StringUtils.isBlank(metric)) {
debugMetricNames.add(metric);
}
}
LOG.info("debug metric names:{}", Joiner.on(",").join(debugMetricNames));
}
/**
* reserve for debug purposes
*/
public static AsmMetric find(String name) {
for (AsmMetricRegistry registry : allRegistries) {
AsmMetric metric = registry.getMetric(name);
if (metric != null) {
return metric;
}
}
return null;
}
public static AsmMetric registerStreamMetric(String name, AsmMetric metric, boolean mergeTopology) {
name = fixNameIfPossible(name);
LOG.info("register stream metric:{}", name);
AsmMetric ret = streamMetrics.register(name, metric);
if (metric.isAggregate()) {
List<AsmMetric> assocMetrics = new ArrayList<>();
String taskMetricName = MetricUtils.stream2taskName(name);
AsmMetric taskMetric = taskMetrics.register(taskMetricName, metric.clone());
assocMetrics.add(taskMetric);
String compMetricName = MetricUtils.task2compName(taskMetricName);
AsmMetric componentMetric = componentMetrics.register(compMetricName, taskMetric.clone());
assocMetrics.add(componentMetric);
String metricName = MetricUtils.getMetricName(name);
if (metricName.contains(".")){
compMetricName = MetricUtils.task2MergeCompName(taskMetricName);
AsmMetric mergeCompMetric = componentMetrics.register(compMetricName, taskMetric.clone());
assocMetrics.add(mergeCompMetric);
}
if (mergeTopology){
String topologyMetricName = MetricUtils.comp2topologyName(compMetricName);
AsmMetric topologyMetric = topologyMetrics.register(topologyMetricName, ret.clone());
assocMetrics.add(topologyMetric);
}
ret.addAssocMetrics(assocMetrics.toArray(new AsmMetric[assocMetrics.size()]));
}
return ret;
}
public static AsmMetric registerTaskMetric(String name, AsmMetric metric) {
name = fixNameIfPossible(name);
AsmMetric ret = taskMetrics.register(name, metric);
if (metric.isAggregate()) {
String compMetricName = MetricUtils.task2compName(name);
AsmMetric componentMetric = componentMetrics.register(compMetricName, ret.clone());
ret.addAssocMetrics(componentMetric);
}
return ret;
}
// public static AsmMetric registerStreamTopologyMetric(String name, AsmMetric metric) {
// name = fixNameIfPossible(name);
// LOG.info("register stream metric:{}", name);
// AsmMetric ret = streamMetrics.register(name, metric);
//
// if (metric.isAggregate()) {
// String taskMetricName = MetricUtils.stream2taskName(name);
// AsmMetric taskMetric = taskMetrics.register(taskMetricName, ret.clone());
//
// String compMetricName = MetricUtils.task2compName(taskMetricName);
// AsmMetric componentMetric = componentMetrics.register(compMetricName, ret.clone());
//
// String topologyMetricName = MetricUtils.comp2topologyName(compMetricName);
// AsmMetric topologyMetric = topologyMetrics.register(topologyMetricName, ret.clone());
//
// ret.addAssocMetrics(taskMetric, componentMetric, topologyMetric);
// }
//
// return ret;
// }
public static AsmMetric registerWorkerMetric(String name, AsmMetric metric) {
name = fixNameIfPossible(name);
return workerMetrics.register(name, metric);
}
public static AsmMetric registerWorkerTopologyMetric(String name, AsmMetric metric) {
name = fixNameIfPossible(name);
AsmMetric ret = workerMetrics.register(name, metric);
String topologyMetricName = MetricUtils.worker2topologyName(name);
AsmMetric topologyMetric = topologyMetrics.register(topologyMetricName, ret.clone());
ret.addAssocMetrics(topologyMetric);
return ret;
}
public static AsmMetric registerNettyMetric(String name, AsmMetric metric) {
name = fixNameIfPossible(name, NETTY_GROUP);
return nettyMetrics.register(name, metric);
}
/**
* simplified helper method to register a worker histogram
*
* @param topologyId topology id
* @param name metric name, NOTE it's not a full-qualified name.
* @param histogram histogram
* @return registered histogram
*/
public static AsmHistogram registerWorkerHistogram(String topologyId, String name, AsmHistogram histogram) {
return (AsmHistogram) registerWorkerMetric(
MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.HISTOGRAM), histogram);
}
/**
* simplified helper method to register a worker gauge
*/
public static AsmGauge registerWorkerGauge(String topologyId, String name, AsmGauge gauge) {
return (AsmGauge) registerWorkerMetric(
MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.GAUGE), gauge);
}
/**
* simplified helper method to register a worker meter
*/
public static AsmMeter registerWorkerMeter(String topologyId, String name, AsmMeter meter) {
return (AsmMeter) registerWorkerMetric(
MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.METER), meter);
}
/**
* simplified helper method to register a worker counter
*/
public static AsmCounter registerWorkerCounter(String topologyId, String name, AsmCounter counter) {
return (AsmCounter) registerWorkerMetric(
MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.COUNTER), counter);
}
/**
* simplified helper method to register a worker timer
*/
public static AsmTimer registerWorkerTimer(String topologyId, String name, AsmTimer timer) {
return (AsmTimer) registerWorkerMetric(
MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.TIMER), timer);
}
public static AsmMetric getStreamMetric(String name) {
name = fixNameIfPossible(name);
return streamMetrics.getMetric(name);
}
public static AsmMetric getTaskMetric(String name) {
name = fixNameIfPossible(name);
return taskMetrics.getMetric(name);
}
public static AsmMetric getComponentMetric(String name) {
name = fixNameIfPossible(name);
return componentMetrics.getMetric(name);
}
public static AsmMetric getWorkerMetric(String name) {
name = fixNameIfPossible(name);
return workerMetrics.getMetric(name);
}
public static void unregisterWorkerMetric(String name) {
name = fixNameIfPossible(name);
workerMetrics.remove(name);
}
public static void unregisterNettyMetric(String name) {
name = fixNameIfPossible(name, NETTY_GROUP);
nettyMetrics.remove(name);
}
public static void unregisterTaskMetric(String name) {
name = fixNameIfPossible(name);
taskMetrics.remove(name);
}
public static AsmMetricRegistry getNettyMetrics() {
return nettyMetrics;
}
public static AsmMetricRegistry getWorkerMetrics() {
return workerMetrics;
}
public static AsmMetricRegistry getComponentMetrics() {
return componentMetrics;
}
public static AsmMetricRegistry getTaskMetrics() {
return taskMetrics;
}
public static AsmMetricRegistry getStreamMetrics() {
return streamMetrics;
}
/**
* convert snapshots to thrift objects, note that timestamps are aligned to min during the conversion,
* so nimbus server will get snapshots with aligned timestamps (still in ms as TDDL will use it).
*/
public static MetricInfo computeAllMetrics() {
long start = System.currentTimeMillis();
MetricInfo metricInfo = MetricUtils.mkMetricInfo();
List<Map.Entry<String, AsmMetric>> entries = Lists.newArrayList();
entries.addAll(streamMetrics.metrics.entrySet());
entries.addAll(taskMetrics.metrics.entrySet());
entries.addAll(componentMetrics.metrics.entrySet());
entries.addAll(workerMetrics.metrics.entrySet());
entries.addAll(nettyMetrics.metrics.entrySet());
entries.addAll(topologyMetrics.metrics.entrySet());
for (Map.Entry<String, AsmMetric> entry : entries) {
String name = entry.getKey();
AsmMetric metric = entry.getValue();
Map<Integer, AsmSnapshot> snapshots = metric.getSnapshots();
int op = metric.getOp();
if ((op & AsmMetric.MetricOp.LOG) == AsmMetric.MetricOp.LOG) {
MetricUtils.printMetricSnapshot(metric, snapshots);
}
if ((op & AsmMetric.MetricOp.REPORT) == AsmMetric.MetricOp.REPORT) {
MetaType metaType = MetricUtils.metaType(metric.getMetricName());
try {
if (metric instanceof AsmCounter) {
Map data = MetricUtils.toThriftCounterSnapshots(snapshots);
putIfNotEmpty(metricInfo.get_metrics(), name, data);
} else if (metric instanceof AsmGauge) {
Map data = MetricUtils.toThriftGaugeSnapshots(snapshots);
putIfNotEmpty(metricInfo.get_metrics(), name, data);
} else if (metric instanceof AsmMeter) {
Map data = MetricUtils.toThriftMeterSnapshots(snapshots);
putIfNotEmpty(metricInfo.get_metrics(), name, data);
} else if (metric instanceof AsmHistogram) {
Map data = MetricUtils.toThriftHistoSnapshots(metaType, snapshots);
putIfNotEmpty(metricInfo.get_metrics(), name, data);
} else if (metric instanceof AsmTimer) {
Map data = MetricUtils.toThriftTimerSnapshots(metaType, snapshots);
putIfNotEmpty(metricInfo.get_metrics(), name, data);
}
} catch (Exception ex) {
LOG.error("Error", ex);
}
}
}
if (debug) {
MetricUtils.printMetricInfo(metricInfo, debugMetricNames);
}
LOG.info("compute all metrics, cost:{}", System.currentTimeMillis() - start);
return metricInfo;
}
@SuppressWarnings("unchecked")
public static <T extends Map> void putIfNotEmpty(Map base, String name, T data) {
if (data != null && data.size() > 0) {
base.put(name, data);
}
}
public static String fixNameIfPossible(String name) {
return fixNameIfPossible(name, DEFAULT_GROUP);
}
public static String fixNameIfPossible(String name, String group) {
MetaType type = MetricUtils.metaType(name);
String[] parts = name.split(MetricUtils.DELIM);
if (parts[1].equals("")) {
parts[1] = topologyId;
}
if (type != MetaType.WORKER && parts[5].equals("")) {
parts[5] = group;
} else if (parts[2].equals("")) {
parts[2] = host;
parts[3] = port + "";
if (parts[4].equals("")) {
parts[4] = group;
}
}
return MetricUtils.concat(parts);
}
public static void main(String[] args) throws Exception {
JStormMetrics.topologyId = "topologyId";
JStormMetrics.host = "127.0.0.1";
JStormMetrics.port = 6800;
String tpId = "test";
String compName = "bolt";
int taskId = 1;
String streamId = "defaultStream";
String type = MetaType.STREAM.getV() + MetricType.COUNTER.getV();
String metricName = "counter1";
String group = "udf";
String name = MetricUtils.metricName(type, tpId, compName, taskId, streamId, group, metricName);
System.out.println(name);
AsmCounter counter = new AsmCounter();
AsmMetric ret1 = JStormMetrics.registerStreamMetric(name, counter, false);
AsmMetric ret2 = JStormMetrics.registerStreamMetric(name, counter, false);
System.out.println(ret1 == ret2);
counter.update(1L);
metricName = MetricUtils.workerMetricName("metric1", MetricType.COUNTER);
System.out.println(metricName);
metricName = fixNameIfPossible(metricName);
System.out.println(metricName);
System.out.println(fixNameIfPossible(metricName));
}
}