blob: 1a5bd456dc7b769f16c4b1d0e03a77b0e34b4f74 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.metrics2;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import org.apache.storm.Config;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.metrics2.reporters.StormReporter;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
 * Static facade over one shared {@link MetricRegistry}.
 *
 * <p>It builds metric names, hands out gauges, meters and counters registered under those
 * names, and starts/stops the {@link StormReporter}s listed under
 * {@link Config#STORM_METRICS_REPORTERS}.
 *
 * <p>The host segment of every metric name comes from a static field that is only assigned by
 * {@link #start(Map, DaemonType)}; names built before that call contain the literal text
 * {@code null} in the host position.
 */
public class StormMetricRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);

    private static final MetricRegistry REGISTRY = new MetricRegistry();

    private static final List<StormReporter> REPORTERS = new ArrayList<>();

    /** Host segment used when the local hostname cannot be resolved. */
    private static final String FALLBACK_HOST_NAME = "localhost";

    /** Sanitized local hostname; stays {@code null} until {@link #start(Map, DaemonType)} runs. */
    private static String hostName = null;

    /**
     * Returns the gauge registered under the worker-level metric name, creating and registering
     * it with {@code initialValue} if no gauge has that name yet.
     *
     * <p>The method is synchronized so that two callers asking for the same new name cannot both
     * reach {@code register}; the second caller finds the gauge the first one registered.
     *
     * @param initialValue value given to a newly created gauge; ignored if the gauge exists
     * @param name         short metric name, appended after a {@code '-'}
     * @param topologyId   topology id segment
     * @param componentId  component id segment (dots are replaced by underscores)
     * @param taskId       task id segment
     * @param port         worker port segment
     * @return the existing or newly registered gauge
     */
    @SuppressWarnings("unchecked")
    public static synchronized <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId,
                                                        String componentId, Integer taskId, Integer port) {
        String metricName = metricName(name, topologyId, componentId, taskId, port);
        // Query the registry once instead of once for the check and again for the fetch.
        Object existing = REGISTRY.getGauges().get(metricName);
        if (existing != null) {
            // Unchecked: the registry does not retain the gauge's value type.
            return (SimpleGauge<T>) existing;
        }
        return REGISTRY.register(metricName, new SimpleGauge<>(initialValue));
    }

    /**
     * Creates a {@link DisruptorMetrics} backed by eight gauges whose names are {@code name}
     * plus a fixed suffix ({@code -capacity}, {@code -population}, ...).
     *
     * @param name        common prefix of the eight gauge names
     * @param topologyId  topology id segment
     * @param componentId component id segment
     * @param taskId      task id segment
     * @param port        worker port segment
     * @return a new {@link DisruptorMetrics} wrapping the gauges
     */
    public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId,
                                                    Integer taskId, Integer port) {
        return new DisruptorMetrics(
            StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, taskId, port),
            StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, taskId, port),
            StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, taskId, port),
            StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, taskId, port),
            StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, taskId, port),
            StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, taskId, port),
            StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, taskId, port),
            StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, taskId, port)
        );
    }

    /**
     * Returns the meter registered under the stream-qualified worker metric name, taking the
     * topology id and worker port from {@code context}.
     */
    public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId,
                              String streamId) {
        String metricName = metricName(name, context.getStormId(), componentId, streamId, taskId,
            context.getThisWorkerPort());
        return REGISTRY.meter(metricName);
    }

    /**
     * Returns the counter registered under the stream-qualified worker metric name, taking the
     * topology id and worker port from {@code context}.
     */
    public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId,
                                  String streamId) {
        String metricName = metricName(name, context.getStormId(), componentId, streamId, taskId,
            context.getThisWorkerPort());
        return REGISTRY.counter(metricName);
    }

    /**
     * Returns the counter registered under the stream-qualified worker metric name built from
     * the explicitly supplied topology id and worker port.
     */
    public static Counter counter(String name, String topologyId, String componentId, Integer taskId,
                                  Integer workerPort, String streamId) {
        String metricName = metricName(name, topologyId, componentId, streamId, taskId, workerPort);
        return REGISTRY.counter(metricName);
    }

    /**
     * Resolves the local hostname and starts every configured reporter whose {@code "daemons"}
     * list names {@code type}.
     *
     * <p>If the hostname cannot be resolved, {@code "localhost"} is used as the host segment of
     * metric names, as the warning message states. A reporter entry without a {@code "daemons"}
     * list is skipped with a warning.
     *
     * @param stormConfig configuration holding the reporter list under
     *                    {@link Config#STORM_METRICS_REPORTERS}
     * @param type        the daemon type on whose behalf reporters are started
     * @throws IllegalArgumentException if a {@code "daemons"} entry is not a {@link DaemonType} name
     */
    public static void start(Map<String, Object> stormConfig, DaemonType type) {
        try {
            hostName = dotToUnderScore(Utils.localHostname());
        } catch (UnknownHostException e) {
            LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported"
                + " as 'localhost'.", e);
            // Make the behavior match the message; otherwise names would contain "null".
            hostName = FALLBACK_HOST_NAME;
        }

        LOG.info("Starting metrics reporters...");
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> reporterList =
            (List<Map<String, Object>>) stormConfig.get(Config.STORM_METRICS_REPORTERS);
        if (reporterList == null || reporterList.isEmpty()) {
            return;
        }
        for (Map<String, Object> reporterConfig : reporterList) {
            // only start those requested
            if (isRequestedFor(reporterConfig, type)) {
                startReporter(stormConfig, reporterConfig);
            }
        }
    }

    /**
     * Tells whether the reporter's {@code "daemons"} list contains {@code type}, compared
     * case-insensitively against the {@link DaemonType} constant names.
     */
    private static boolean isRequestedFor(Map<String, Object> reporterConfig, DaemonType type) {
        @SuppressWarnings("unchecked")
        List<String> daemons = (List<String>) reporterConfig.get("daemons");
        if (daemons == null) {
            LOG.warn("Metrics reporter {} has no 'daemons' entry and will not be started.",
                reporterConfig.get("class"));
            return false;
        }
        for (String daemon : daemons) {
            // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
            if (DaemonType.valueOf(daemon.toUpperCase(Locale.ROOT)) == type) {
                // First match is enough; a repeated entry must not start the reporter twice.
                return true;
            }
        }
        return false;
    }

    /** Returns the shared {@link MetricRegistry} backing all metrics created by this class. */
    public static MetricRegistry registry() {
        return REGISTRY;
    }

    /**
     * Instantiates the reporter named by the {@code "class"} entry of {@code reporterConfig},
     * prepares and starts it, and remembers it so {@link #stop()} can stop it later.
     */
    private static void startReporter(Map<String, Object> stormConfig, Map<String, Object> reporterConfig) {
        String clazz = (String) reporterConfig.get("class");
        LOG.info("Attempting to instantiate reporter class: {}", clazz);
        StormReporter reporter = Utils.newInstance(clazz);
        if (reporter != null) {
            reporter.prepare(REGISTRY, stormConfig, reporterConfig);
            reporter.start();
            REPORTERS.add(reporter);
        }
    }

    /**
     * Stops every reporter started through {@link #start(Map, DaemonType)} and forgets them, so
     * calling this method again does not stop the same reporter a second time.
     */
    public static void stop() {
        for (StormReporter sr : REPORTERS) {
            sr.stop();
        }
        REPORTERS.clear();
    }

    /**
     * Builds a stream-qualified worker metric name of the form
     * {@code storm.worker.<stormId>.<host>.<component>.<stream>.<taskId>.<workerPort>-<name>}.
     * Dots in {@code componentId} and {@code streamId} are replaced by underscores.
     */
    public static String metricName(String name, String stormId, String componentId, String streamId,
                                    Integer taskId, Integer workerPort) {
        StringBuilder sb = new StringBuilder("storm.worker.");
        sb.append(stormId).append(".");
        sb.append(hostName).append(".");
        sb.append(dotToUnderScore(componentId)).append(".");
        sb.append(dotToUnderScore(streamId)).append(".");
        sb.append(taskId).append(".");
        sb.append(workerPort).append("-");
        sb.append(name);
        return sb.toString();
    }

    /**
     * Builds a worker metric name of the form
     * {@code storm.worker.<stormId>.<host>.<component>.<taskId>.<workerPort>-<name>}.
     * Dots in {@code componentId} are replaced by underscores.
     */
    public static String metricName(String name, String stormId, String componentId, Integer taskId,
                                    Integer workerPort) {
        StringBuilder sb = new StringBuilder("storm.worker.");
        sb.append(stormId).append(".");
        sb.append(hostName).append(".");
        sb.append(dotToUnderScore(componentId)).append(".");
        sb.append(taskId).append(".");
        sb.append(workerPort).append("-");
        sb.append(name);
        return sb.toString();
    }

    /**
     * Builds a topology metric name of the form
     * {@code storm.topology.<stormId>.<host>.<component>.<taskId>.<workerPort>-<name>},
     * reading every segment except the host and {@code name} from {@code context}.
     */
    public static String metricName(String name, TopologyContext context) {
        StringBuilder sb = new StringBuilder("storm.topology.");
        sb.append(context.getStormId()).append(".");
        sb.append(hostName).append(".");
        sb.append(dotToUnderScore(context.getThisComponentId())).append(".");
        sb.append(context.getThisTaskId()).append(".");
        sb.append(context.getThisWorkerPort()).append("-");
        sb.append(name);
        return sb.toString();
    }

    /** Replaces every {@code '.'} with {@code '_'} so the value fits in one dotted name segment. */
    private static String dotToUnderScore(String str) {
        return str.replace('.', '_');
    }
}