/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.sink.kafka;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Metered;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricProcessor;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.Summarizable;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.stats.Snapshot;
import kafka.metrics.KafkaMetricsConfig;
import kafka.metrics.KafkaMetricsReporter;
import kafka.utils.VerifiableProperties;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.MetricType;
import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
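/**
 * Kafka metrics reporter that forwards the broker's Yammer metrics to the
 * Ambari Metrics Collector (timeline service). It plugs into Kafka through the
 * {@link kafka.metrics.KafkaMetricsReporter} SPI (typically registered via the
 * broker's {@code kafka.metrics.reporters} property), polls
 * {@code Metrics.defaultRegistry()} on a fixed schedule, buffers datapoints in
 * a {@link TimelineMetricsCache}, and posts them through
 * {@link AbstractTimelineMetricsSink#emitMetrics}.
 *
 * <p>A minimal broker configuration sketch (property names match the constants
 * defined in this class; the values shown are the defaults and purely
 * illustrative):
 *
 * <pre>
 * kafka.timeline.metrics.reporter.enabled=true
 * kafka.timeline.metrics.hosts=localhost
 * kafka.timeline.metrics.port=6188
 * kafka.timeline.metrics.protocol=http
 * # optional filtering (comma-separated prefixes / regexes):
 * external.kafka.metrics.exclude.prefix=kafka.network
 * external.kafka.metrics.include.prefix=kafka.network.RequestMetrics
 * </pre>
 */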
public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
implements KafkaMetricsReporter, KafkaTimelineMetricsReporterMBean {
  private static final Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class);
private static final String TIMELINE_METRICS_KAFKA_PREFIX = "kafka.timeline.metrics.";
private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "sendInterval";
private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "maxRowCacheSize";
private static final String TIMELINE_HOSTS_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "hosts";
private static final String TIMELINE_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "port";
private static final String TIMELINE_PROTOCOL_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "protocol";
private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + "reporter.enabled";
private static final String TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PATH_PROPERTY;
private static final String TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_TYPE_PROPERTY;
private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + INSTANCE_ID_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY;
private static final String TIMELINE_DEFAULT_HOST = "localhost";
private static final String TIMELINE_DEFAULT_PORT = "6188";
private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix";
private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix";
private static final String INCLUDED_METRICS_REGEX_PROPERTY = "external.kafka.metrics.include.regex";
private volatile boolean initialized = false;
private boolean running = false;
private final Object lock = new Object();
private String hostname;
private String metricCollectorPort;
private Collection<String> collectorHosts;
private String metricCollectorProtocol;
private TimelineScheduledReporter reporter;
private TimelineMetricsCache metricsCache;
private int timeoutSeconds = 10;
private String zookeeperQuorum = null;
private boolean setInstanceId;
private String instanceId;
private String[] excludedMetricsPrefixes;
private String[] includedMetricsPrefixes;
private String[] includedMetricsRegex;
  // Local cache to avoid prefix matching every time
private Set<String> excludedMetrics = new HashSet<>();
private boolean hostInMemoryAggregationEnabled;
private int hostInMemoryAggregationPort;
private String hostInMemoryAggregationProtocol;
@Override
protected String getCollectorUri(String host) {
return constructTimelineMetricUri(metricCollectorProtocol, host, metricCollectorPort);
}
@Override
protected String getCollectorProtocol() {
return metricCollectorProtocol;
}
@Override
protected String getCollectorPort() {
return metricCollectorPort;
}
@Override
protected int getTimeoutSeconds() {
return timeoutSeconds;
}
@Override
protected String getZookeeperQuorum() {
return zookeeperQuorum;
}
@Override
protected Collection<String> getConfiguredCollectorHosts() {
return collectorHosts;
}
@Override
protected String getHostname() {
return hostname;
}
@Override
protected boolean isHostInMemoryAggregationEnabled() {
return hostInMemoryAggregationEnabled;
}
@Override
protected int getHostInMemoryAggregationPort() {
return hostInMemoryAggregationPort;
}
@Override
protected String getHostInMemoryAggregationProtocol() {
return hostInMemoryAggregationProtocol;
}
public void setMetricsCache(TimelineMetricsCache metricsCache) {
this.metricsCache = metricsCache;
}
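  /**
   * One-time setup, guarded by {@code lock}. Resolves the local hostname
   * (falling back to the canonical DNS name when it is not an FQDN), reads the
   * collector endpoints, cache sizing, instance id and in-memory aggregation
   * settings from the supplied properties, loads the SSL truststore when an
   * https protocol is configured, parses the exclude/include filters, and
   * creates (and optionally starts) the scheduled reporter.
   */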
@Override
public void init(VerifiableProperties props) {
synchronized (lock) {
if (!initialized) {
LOG.info("Initializing Kafka Timeline Metrics Sink");
try {
hostname = InetAddress.getLocalHost().getHostName();
          // If the hostname is not an FQDN, fall back to the canonical (DNS) name
if ((hostname == null) || (!hostname.contains("."))) {
hostname = InetAddress.getLocalHost().getCanonicalHostName();
}
} catch (UnknownHostException e) {
LOG.error("Could not identify hostname.");
throw new RuntimeException("Could not identify hostname.", e);
}
// Initialize the collector write strategy
super.init();
KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
timeoutSeconds = props.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS);
int metricsSendInterval = props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS);
int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT);
zookeeperQuorum = props.containsKey(COLLECTOR_ZOOKEEPER_QUORUM) ?
props.getString(COLLECTOR_ZOOKEEPER_QUORUM) : props.getString("zookeeper.connect");
metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
collectorHosts = parseHostsStringIntoCollection(props.getString(TIMELINE_HOSTS_PROPERTY, TIMELINE_DEFAULT_HOST));
metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY, null);
setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY, false);
hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
hostInMemoryAggregationProtocol = props.getString(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
if (metricCollectorProtocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
}
// Exclusion policy
String excludedMetricsStr = props.getString(EXCLUDED_METRICS_PROPERTY, "");
if (!StringUtils.isEmpty(excludedMetricsStr.trim())) {
excludedMetricsPrefixes = excludedMetricsStr.trim().split(",");
}
// Inclusion override
String includedMetricsStr = props.getString(INCLUDED_METRICS_PROPERTY, "");
if (!StringUtils.isEmpty(includedMetricsStr.trim())) {
includedMetricsPrefixes = includedMetricsStr.trim().split(",");
}
        // Inclusion override by regex
String includedMetricsRegexStr = props.getString(INCLUDED_METRICS_REGEX_PROPERTY, "");
if (!StringUtils.isEmpty(includedMetricsRegexStr.trim())) {
LOG.info("Including metrics which match the following regex patterns : " + includedMetricsRegexStr);
includedMetricsRegex = includedMetricsRegexStr.trim().split(",");
}
initializeReporter();
if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
startReporter(metricsConfig.pollingIntervalSecs());
}
if (LOG.isDebugEnabled()) {
LOG.debug("MetricsSendInterval = " + metricsSendInterval);
LOG.debug("MaxRowCacheSize = " + maxRowCacheSize);
LOG.debug("Excluded metrics prefixes = " + excludedMetricsStr);
LOG.debug("Included metrics prefixes = " + includedMetricsStr);
}
}
}
}
public String getMBeanName() {
return "kafka:type=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter";
}
  public void startReporter(long period) {
synchronized (lock) {
if (initialized && !running) {
reporter.start(period, TimeUnit.SECONDS);
running = true;
LOG.info(String.format("Started Kafka Timeline metrics reporter with polling period %d seconds", period));
}
}
}
  public void stopReporter() {
synchronized (lock) {
if (initialized && running) {
reporter.stop();
running = false;
LOG.info("Stopped Kafka Timeline metrics reporter");
initializeReporter();
}
}
}
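  /**
   * Creates a fresh {@link TimelineScheduledReporter} over the default Yammer
   * registry and marks the sink initialized. Also invoked from
   * {@link #stopReporter()} so the sink can be restarted with a new reporter
   * instance.
   */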
private void initializeReporter() {
reporter = new TimelineScheduledReporter(Metrics.defaultRegistry(), "timeline-scheduled-reporter",
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
initialized = true;
}
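  /** Hands metric processors the list being assembled for the current report cycle. */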
  interface Context {
    List<TimelineMetric> getTimelineMetricList();
  }
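  /**
   * Filtering policy: a metric is dropped when its name matches an excluded
   * prefix and is not rescued by an include prefix or an include regex. Names
   * excluded once are remembered in {@code excludedMetrics} so the prefix and
   * regex matching is not repeated on every report cycle.
   */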
protected boolean isExcludedMetric(String metricName) {
if (excludedMetrics.contains(metricName)) {
return true;
}
if (LOG.isTraceEnabled()) {
LOG.trace("metricName => " + metricName +
", exclude: " + StringUtils.startsWithAny(metricName, excludedMetricsPrefixes) +
", include: " + StringUtils.startsWithAny(metricName, includedMetricsPrefixes));
}
if (StringUtils.startsWithAny(metricName, excludedMetricsPrefixes)) {
      if (!(StringUtils.startsWithAny(metricName, includedMetricsPrefixes)
          || (includedMetricsRegex != null
              && Arrays.stream(includedMetricsRegex).anyMatch(metricName::matches)))) {
excludedMetrics.add(metricName);
return true;
}
}
return false;
}
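  /**
   * Scheduled reporter that walks the registry through the Yammer
   * {@link MetricProcessor} callbacks, expands meters, histograms and timers
   * into per-statistic datapoints (count, rates, min/max/mean/stddev,
   * percentiles) and posts each batch to the collector as one
   * {@link TimelineMetrics} payload.
   */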
class TimelineScheduledReporter extends ScheduledReporter implements MetricProcessor<Context> {
private static final String APP_ID = "kafka_broker";
private static final String COUNT_SUFIX = ".count";
private static final String ONE_MINUTE_RATE_SUFIX = ".1MinuteRate";
private static final String MEAN_SUFIX = ".mean";
private static final String MEAN_RATE_SUFIX = ".meanRate";
private static final String FIVE_MINUTE_RATE_SUFIX = ".5MinuteRate";
private static final String FIFTEEN_MINUTE_RATE_SUFIX = ".15MinuteRate";
private static final String MIN_SUFIX = ".min";
private static final String MAX_SUFIX = ".max";
private static final String MEDIAN_SUFIX = ".median";
    private static final String STD_DEV_SUFIX = ".stddev";
private static final String SEVENTY_FIFTH_PERCENTILE_SUFIX = ".75percentile";
private static final String NINETY_FIFTH_PERCENTILE_SUFIX = ".95percentile";
private static final String NINETY_EIGHTH_PERCENTILE_SUFIX = ".98percentile";
private static final String NINETY_NINTH_PERCENTILE_SUFIX = ".99percentile";
private static final String NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX = ".999percentile";
protected TimelineScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) {
super(registry, name, rateUnit, durationUnit);
}
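    /**
     * Called on the polling schedule by the ScheduledReporter base class.
     * Each metric is dispatched to the matching process* callback via
     * {@code processWith}; failures are logged rather than propagated so one
     * bad metric cannot abort the whole cycle.
     */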
@Override
public void report(Set<Entry<MetricName, Metric>> metrics) {
final List<TimelineMetric> metricsList = new ArrayList<TimelineMetric>();
try {
for (Entry<MetricName, Metric> entry : metrics) {
final MetricName metricName = entry.getKey();
final Metric metric = entry.getValue();
Context context = new Context() {
public List<TimelineMetric> getTimelineMetricList() {
return metricsList;
}
};
metric.processWith(this, metricName, context);
}
} catch (Throwable t) {
LOG.error("Exception processing Kafka metric", t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Metrics List size: " + metricsList.size());
LOG.debug("Metics Set size: " + metrics.size());
LOG.debug("Excluded metrics set: " + excludedMetrics);
}
if (!metricsList.isEmpty()) {
TimelineMetrics timelineMetrics = new TimelineMetrics();
timelineMetrics.setMetrics(metricsList);
try {
emitMetrics(timelineMetrics);
} catch (Throwable t) {
LOG.error("Exception emitting metrics", t);
}
}
}
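    /**
     * Builds a single-datapoint {@link TimelineMetric} stamped with this host,
     * the given app id and the current timestamp.
     */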
private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName,
Number attributeValue) {
if (LOG.isTraceEnabled()) {
LOG.trace("Creating timeline metric: " + attributeName + " = " + attributeValue + " time = "
+ currentTimeMillis + " app_id = " + component);
}
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(attributeName);
timelineMetric.setHostName(hostname);
if (setInstanceId) {
timelineMetric.setInstanceId(instanceId);
}
timelineMetric.setAppId(component);
timelineMetric.setStartTime(currentTimeMillis);
timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue());
return timelineMetric;
}
@Override
public void processMeter(MetricName name, Metered meter, Context context) throws Exception {
final long currentTimeMillis = System.currentTimeMillis();
final String sanitizedName = sanitizeName(name);
String[] metricNames = cacheKafkaMetered(currentTimeMillis, sanitizedName, meter);
populateMetricsList(context, MetricType.GAUGE, metricNames);
}
@Override
public void processCounter(MetricName name, Counter counter, Context context) throws Exception {
final long currentTimeMillis = System.currentTimeMillis();
final String sanitizedName = sanitizeName(name);
final String metricCountName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
COUNT_SUFIX, counter.count());
populateMetricsList(context, MetricType.COUNTER, metricCountName);
}
@Override
public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception {
final long currentTimeMillis = System.currentTimeMillis();
final Snapshot snapshot = histogram.getSnapshot();
final String sanitizedName = sanitizeName(name);
String[] metricHNames = cacheKafkaSummarizable(currentTimeMillis, sanitizedName, histogram);
String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, sanitizedName, snapshot);
String[] metricNames = (String[]) ArrayUtils.addAll(metricHNames, metricSNames);
populateMetricsList(context, MetricType.GAUGE, metricNames);
}
@Override
public void processTimer(MetricName name, Timer timer, Context context) throws Exception {
final long currentTimeMillis = System.currentTimeMillis();
final Snapshot snapshot = timer.getSnapshot();
final String sanitizedName = sanitizeName(name);
String[] metricMNames = cacheKafkaMetered(currentTimeMillis, sanitizedName, timer);
String[] metricTNames = cacheKafkaSummarizable(currentTimeMillis, sanitizedName, timer);
String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, sanitizedName, snapshot);
String[] metricNames = (String[]) ArrayUtils.addAll(metricMNames, metricTNames);
metricNames = (String[]) ArrayUtils.addAll(metricNames, metricSNames);
populateMetricsList(context, MetricType.GAUGE, metricNames);
}
@Override
public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {
final long currentTimeMillis = System.currentTimeMillis();
final String sanitizedName = sanitizeName(name);
try {
if (!isExcludedMetric(sanitizedName)) {
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", Double.parseDouble(String.valueOf(gauge.value())));
populateMetricsList(context, MetricType.GAUGE, sanitizedName);
}
} catch (NumberFormatException ex) {
LOG.debug(ex.getMessage());
}
}
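    /**
     * Caches the five datapoints a {@link Metered} exposes (count plus the
     * mean and 1/5/15-minute rates) and returns the generated metric names.
     */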
private String[] cacheKafkaMetered(long currentTimeMillis, String sanitizedName, Metered meter) {
final String meterCountName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
COUNT_SUFIX, meter.count());
final String meterOneMinuteRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
ONE_MINUTE_RATE_SUFIX, meter.oneMinuteRate());
final String meterMeanRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
MEAN_RATE_SUFIX, meter.meanRate());
final String meterFiveMinuteRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
FIVE_MINUTE_RATE_SUFIX, meter.fiveMinuteRate());
final String meterFifteenMinuteRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
FIFTEEN_MINUTE_RATE_SUFIX, meter.fifteenMinuteRate());
return new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName,
meterFiveMinuteRateName, meterFifteenMinuteRateName };
}
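    /**
     * Caches min/max/mean/stddev for a {@link Summarizable} (histograms and
     * timers) and returns the generated metric names.
     */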
private String[] cacheKafkaSummarizable(long currentTimeMillis, String sanitizedName, Summarizable summarizable) {
final String minName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
MIN_SUFIX, summarizable.min());
final String maxName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
MAX_SUFIX, summarizable.max());
final String meanName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
MEAN_SUFIX, summarizable.mean());
final String stdDevName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
STD_DEV_SUFIX, summarizable.stdDev());
return new String[] { maxName, meanName, minName, stdDevName };
}
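    /**
     * Caches the median and the 75th/95th/98th/99th/99.9th percentiles from a
     * histogram or timer {@link Snapshot} and returns the generated metric
     * names.
     */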
private String[] cacheKafkaSnapshot(long currentTimeMillis, String sanitizedName, Snapshot snapshot) {
final String medianName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
MEDIAN_SUFIX, snapshot.getMedian());
final String seventyFifthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
SEVENTY_FIFTH_PERCENTILE_SUFIX, snapshot.get75thPercentile());
final String ninetyFifthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
NINETY_FIFTH_PERCENTILE_SUFIX, snapshot.get95thPercentile());
final String ninetyEighthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
NINETY_EIGHTH_PERCENTILE_SUFIX, snapshot.get98thPercentile());
final String ninetyNinthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
NINETY_NINTH_PERCENTILE_SUFIX, snapshot.get99thPercentile());
final String ninetyNinePointNinePercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, snapshot.get999thPercentile());
return new String[] { medianName,
ninetyEighthPercentileName, ninetyFifthPercentileName, ninetyNinePointNinePercentileName,
ninetyNinthPercentileName, seventyFifthPercentileName };
}
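    /**
     * Appends the suffix to the sanitized name and caches the resulting
     * datapoint unless the name is excluded. The name is returned either way,
     * since exclusion can only be decided after the full name is known.
     */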
private String cacheSanitizedTimelineMetric(long currentTimeMillis, String sanitizedName, String suffix, Number metricValue) {
final String meterName = sanitizedName + suffix;
final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, meterName, metricValue);
// Skip cache if we decide not to include the metric
// Cannot do this before calculations of percentiles
if (!isExcludedMetric(meterName)) {
metricsCache.putTimelineMetric(metric);
}
return meterName;
}
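    /**
     * Looks up each name in the metrics cache and adds whatever the cache
     * returns to the context's metric list, tagged with the given type; names
     * the cache has no entry for (still buffering, or excluded) are skipped.
     */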
private void populateMetricsList(Context context, MetricType type, String... metricNames) {
for (String metricName : metricNames) {
TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName);
if (cachedMetric != null) {
cachedMetric.setType(type.name());
context.getTimelineMetricList().add(cachedMetric);
}
}
}
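    /**
     * Flattens a {@link MetricName} into {@code group.type.name[.scope]};
     * letters, digits, {@code _}, {@code -}, {@code .} and NUL pass through,
     * every other character is replaced with an underscore.
     */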
protected String sanitizeName(MetricName name) {
if (name == null) {
return "";
}
final String qualifiedTypeName = name.getGroup() + "." + name.getType() + "." + name.getName();
final String metricName = name.hasScope() ? qualifiedTypeName + '.' + name.getScope() : qualifiedTypeName;
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < metricName.length(); i++) {
final char p = metricName.charAt(i);
if (!(p >= 'A' && p <= 'Z') && !(p >= 'a' && p <= 'z') && !(p >= '0' && p <= '9') && (p != '_') && (p != '-')
&& (p != '.') && (p != '\0')) {
sb.append('_');
} else {
sb.append(p);
}
}
return sb.toString();
}
}
}