blob: 70c894f4a14bdf73d6bdf26cd7713256ffbd4555 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.oozie.util;
import com.codahale.metrics.Counter;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ganglia.GangliaReporter;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
import com.codahale.metrics.json.MetricsModule;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import info.ganglia.gmetric4j.gmetric.GMetric;
import org.apache.oozie.service.ConfigurationService;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Instrumentation framework that is mostly compatible with {@link Instrumentation} but is backed by Codahale Metrics. This class
 * was designed to minimize the changes required to switch from {@link Instrumentation} to {@link MetricsInstrumentation} by keeping
 * the same API. However, certain operations are implemented differently or are no longer needed, and the output format is a
 * little different. Internally, this class maps Cron to {@link com.codahale.metrics.Timer}, Variable to {@link Gauge},
 * Counter to {@link Counter}, and Sampler to {@link Histogram}.
 */
public class MetricsInstrumentation extends Instrumentation {
/** Registry that owns every metric exposed by this instrumentation. */
private final MetricRegistry metricRegistry;
/** Jackson mapper (with the Metrics JSON module) used by {@link #toString()} and {@link #writeJSONResponse}. */
private transient ObjectMapper jsonMapper;
/** Executor that runs the samplers; {@link #addSampler} throws until {@link #setScheduler} has been called. */
private ScheduledExecutorService scheduler;
/** Counters by metric key; a missing counter is created and registered on first lookup. */
private final LoadingCache<String, Counter> counters;
/** Gauges (Variables) by metric key. */
private final Map<String, Gauge> gauges;
/** Timers (Crons) by metric key; a missing timer is created and registered on first lookup. */
private final LoadingCache<String, com.codahale.metrics.Timer> timers;
/** Histograms (Samplers) by metric key. */
private final Map<String, Histogram> histograms;
// One lock per metric kind. Each is assigned exactly once, in the constructor, so they are final.
private final Lock timersLock;
private final Lock gaugesLock;
private final Lock countersLock;
private final Lock histogramsLock;

/** Boolean property: whether metrics are also published to an external monitoring server. */
public static final String EXTERNAL_MONITORING_ENABLE = "oozie.external_monitoring.enable";
/** Type of the external monitoring server: {@value #GRAPHITE} or {@value #GANGLIA} (trimmed, case-insensitive). */
public static final String EXTERNAL_MONITORING_TYPE = "oozie.external_monitoring.type";
/** URL of the external monitoring server; only its host and port are used. */
public static final String EXTERNAL_MONITORING_ADDRESS = "oozie.external_monitoring.address";
/** Prefix the external reporter prepends to every metric name. */
public static final String EXTERNAL_MONITORING_PREFIX = "oozie.external_monitoring.metricPrefix";
/** Interval, in seconds, between reports to the external monitoring server. */
public static final String EXTERNAL_MONITORING_INTERVAL = "oozie.external_monitoring.reporterIntervalSecs";
/** Accepted value of {@value #EXTERNAL_MONITORING_TYPE} selecting Graphite. */
public static final String GRAPHITE = "graphite";
/** Accepted value of {@value #EXTERNAL_MONITORING_TYPE} selecting Ganglia. */
public static final String GANGLIA = "ganglia";

// External-monitoring settings; populated by the constructor only when external monitoring is enabled.
private String metricsAddress;
private String metricsHost;
private String metricsPrefix;
private String metricsServerName;
private int metricsPort;
// The started reporter, or null if none was started; both are reported and stopped by stop().
private GraphiteReporter graphiteReporter = null;
private GangliaReporter gangliaReporter = null;
private long metricsReportIntervalSec;
private boolean isExternalMonitoringEnabled;

// Units used by the JSON serialization module for rates and durations.
private static final TimeUnit RATE_UNIT = TimeUnit.MILLISECONDS;
private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS;

/** Logger named after the runtime class of this instance. */
protected XLog LOG = XLog.getLog(getClass());
/**
 * Creates the MetricsInstrumentation and starts taking some metrics.
 * <p>
 * The JVM memory gauges are always registered, with their keys prefixed by {@code jvm} and {@code memory}. When
 * {@value #EXTERNAL_MONITORING_ENABLE} is true, a Graphite or Ganglia reporter is also started, as selected by
 * {@value #EXTERNAL_MONITORING_TYPE}.
 *
 * @throws RuntimeException if external monitoring is enabled and its type is missing or unsupported, its address is
 *         not a valid URL with an explicit port, or the Ganglia client cannot be created.
 */
public MetricsInstrumentation() {
    metricRegistry = new MetricRegistry();

    isExternalMonitoringEnabled = ConfigurationService.getBoolean(EXTERNAL_MONITORING_ENABLE);
    if (isExternalMonitoringEnabled) {
        startExternalReporter();
    }

    timersLock = new ReentrantLock();
    gaugesLock = new ReentrantLock();
    countersLock = new ReentrantLock();
    histogramsLock = new ReentrantLock();

    // Used for writing the json for the metrics (see com.codahale.metrics.servlets.MetricsServlet)
    // The "false" is to prevent it from printing out all of the values used in the histograms and timers
    this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(RATE_UNIT, DURATION_UNIT, false));

    // Register the JVM memory gauges and prefix the keys. The metrics map is fetched once instead of calling
    // getMetrics() again for every key.
    Map<String, com.codahale.metrics.Metric> memoryMetrics = new MemoryUsageGaugeSet().getMetrics();
    for (Map.Entry<String, com.codahale.metrics.Metric> entry : memoryMetrics.entrySet()) {
        metricRegistry.register(MetricRegistry.name("jvm", "memory", entry.getKey()), entry.getValue());
    }

    // By setting this up as a cache, if a counter doesn't exist when we try to retrieve it, it will automatically be created
    counters = CacheBuilder.newBuilder().build(
            new CacheLoader<String, Counter>() {
                @Override
                public Counter load(String key) throws Exception {
                    Counter counter = new Counter();
                    metricRegistry.register(key, counter);
                    return counter;
                }
            });
    // Same lazy create-and-register behavior for timers
    timers = CacheBuilder.newBuilder().build(
            new CacheLoader<String, com.codahale.metrics.Timer>() {
                @Override
                public com.codahale.metrics.Timer load(String key) throws Exception {
                    com.codahale.metrics.Timer timer
                            = new com.codahale.metrics.Timer(new ExponentiallyDecayingReservoir());
                    metricRegistry.register(key, timer);
                    return timer;
                }
            });
    gauges = new ConcurrentHashMap<String, Gauge>();
    histograms = new ConcurrentHashMap<String, Histogram>();
}

/**
 * Reads the external-monitoring configuration and starts the Graphite or Ganglia reporter it selects.
 *
 * @throws RuntimeException if the configuration is missing or invalid, or the Ganglia client cannot be created.
 */
private void startExternalReporter() {
    metricsServerName = ConfigurationService.get(EXTERNAL_MONITORING_TYPE);
    if (metricsServerName == null) {
        throw new RuntimeException("Metrics Server Name is not specified");
    }
    // Locale.ROOT keeps the comparison stable under locales (e.g. Turkish) whose case mapping of 'I' differs
    String modifiedServerName = metricsServerName.trim().toLowerCase(Locale.ROOT);
    if (!modifiedServerName.equals(GRAPHITE) && !modifiedServerName.equals(GANGLIA)) {
        throw new RuntimeException("Metrics Server Name should be either graphite or ganglia");
    }

    metricsAddress = ConfigurationService.get(EXTERNAL_MONITORING_ADDRESS);
    metricsPrefix = ConfigurationService.get(EXTERNAL_MONITORING_PREFIX);
    metricsReportIntervalSec = ConfigurationService.getLong(EXTERNAL_MONITORING_INTERVAL);
    LOG.debug("Publishing external monitoring to [{0}] at host [{1}] every [{2}] seconds with prefix " +
            "[{3}]", metricsServerName, metricsAddress, metricsReportIntervalSec, metricsPrefix);
    try {
        URL url = new URL(metricsAddress);
        metricsHost = url.getHost();
        metricsPort = url.getPort();
    } catch (MalformedURLException e) {
        LOG.error("Exception, ", e);
        // Fail fast: without a host there is nothing to report to
        throw new RuntimeException("Invalid " + EXTERNAL_MONITORING_ADDRESS + " [" + metricsAddress + "]", e);
    }
    // URL.getPort() returns -1 when the address does not specify a port
    if (metricsPort == -1) {
        throw new RuntimeException("No port specified in " + EXTERNAL_MONITORING_ADDRESS + " [" + metricsAddress + "]");
    }

    if (modifiedServerName.equals(GRAPHITE)) {
        Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, metricsPort));
        graphiteReporter = GraphiteReporter.forRegistry(metricRegistry).prefixedWith(metricsPrefix)
                .convertDurationsTo(DURATION_UNIT).filter(MetricFilter.ALL).build(graphite);
        graphiteReporter.start(metricsReportIntervalSec, TimeUnit.SECONDS);
    } else {
        GMetric ganglia;
        try {
            ganglia = new GMetric(metricsHost, metricsPort, GMetric.UDPAddressingMode.MULTICAST, 1);
        } catch (IOException e) {
            LOG.error("Exception, ", e);
            throw new RuntimeException(e);
        }
        gangliaReporter = GangliaReporter.forRegistry(metricRegistry).prefixedWith(metricsPrefix)
                .convertDurationsTo(DURATION_UNIT).filter(MetricFilter.ALL).build(ganglia);
        gangliaReporter.start(metricsReportIntervalSec, TimeUnit.SECONDS);
    }
}
* Reporting final metrics into the server before stopping
public void stop() {
if (graphiteReporter != null) {
try {
// reporting final metrics into graphite before stopping;
} finally {
if (gangliaReporter != null) {
try {
// reporting final metrics into ganglia before stopping;
} finally {
* Add a cron to an instrumentation timer. The timer is created if it does not exists. <p>
* Internally, this is backed by a {@link com.codahale.metrics.Timer}.
* @param group timer group.
* @param name timer name.
* @param cron cron to add to the timer.
public void addCron(String group, String name, Cron cron) {
String key =, name, "timer");
try {
com.codahale.metrics.Timer timer = timers.get(key);
timer.update(cron.getOwn(), TimeUnit.MILLISECONDS);
} catch(ExecutionException ee) {
throw new RuntimeException(ee);
} finally {
* Add an instrumentation variable. <p>
* Internally, this is backed by a {@link Gauge}.
* @param group counter group.
* @param name counter name.
* @param variable variable to add.
public void addVariable(String group, String name, final Variable variable) {
Gauge gauge = new Gauge() {
public Object getValue() {
return variable.getValue();
String key =, name);
try {
gauges.put(key, gauge);
// Metrics throws an Exception if we don't do this when the key already exists
if (metricRegistry.getGauges().containsKey(key)) {
XLog.getLog(MetricsInstrumentation.class).debug("A Variable with name [" + key + "] already exists. "
+ " The old Variable will be overwritten, but this is not recommended");
metricRegistry.register(key, gauge);
} finally {
* Increment an instrumentation counter. The counter is created if it does not exists. <p>
* Internally, this is backed by a {@link Counter}.
* @param group counter group.
* @param name counter name.
* @param count increment to add to the counter.
public void incr(String group, String name, long count) {
String key =, name);
try {
} catch(ExecutionException ee) {
throw new RuntimeException(ee);
} finally {
* Add a sampling variable. <p>
* Internally, this is backed by a biased (decaying) {@link Histogram}.
* @param group timer group.
* @param name timer name.
* @param period (ignored)
* @param interval sampling frequency, how often the variable is probed.
* @param variable variable to sample.
public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
if (scheduler == null) {
throw new IllegalStateException("scheduler not set, cannot sample");
Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir());
Sampler sampler = new Sampler(variable, histogram);
scheduler.scheduleAtFixedRate(sampler, 0, interval, TimeUnit.SECONDS);
String key =, name, "histogram");
try {
histograms.put(key, histogram);
// Metrics throws an Exception if we don't do this when the key already exists
if (metricRegistry.getHistograms().containsKey(key)) {
XLog.getLog(MetricsInstrumentation.class).debug("A Sampler with name [" + key + "] already exists. "
+ " The old Sampler will be overwritten, but this is not recommended");
metricRegistry.register(key, histogram);
} finally {
public static class Sampler implements Runnable {
private final Variable<Long> variable;
private final Histogram histogram;
public Sampler(Variable<Long> variable, Histogram histogram) {
this.variable = variable;
this.histogram = histogram;
public void run() {
* Set the scheduler instance to handle the samplers.
* @param scheduler scheduler instance.
public void setScheduler(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
* Return the string representation of the instrumentation. It does a JSON pretty-print.
* @return the string representation of the instrumentation.
public String toString() {
try {
return jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
} catch (JsonProcessingException jpe) {
throw new RuntimeException(jpe);
* Converts the current state of the metrics and writes them to the OutputStream.
* @param os The OutputStream to write the metrics to
* @throws IOException
public void writeJSONResponse(OutputStream os) throws IOException {
jsonMapper.writer().writeValue(os, metricRegistry);
* Returns the MetricRegistry: public for unit tests -- do not use.
* @return the MetricRegistry
MetricRegistry getMetricRegistry() {
return metricRegistry;
* Not Supported: throws {@link UnsupportedOperationException}
* @return nothing
public Map<String, Map<String, Map<String, Object>>> getAll() {
throw new UnsupportedOperationException();
* Not Supported: throws {@link UnsupportedOperationException}
* @return nothing
public Map<String, Map<String, Element<Long>>> getCounters() {
throw new UnsupportedOperationException();
* Not Supported: throws {@link UnsupportedOperationException}
* @return nothing
public Map<String, Map<String, Element<Double>>> getSamplers() {
throw new UnsupportedOperationException();
* Not Supported: throws {@link UnsupportedOperationException}
* @return nothing
public Map<String, Map<String, Element<Timer>>> getTimers() {
throw new UnsupportedOperationException();
* Not Supported: throws {@link UnsupportedOperationException}
* @return nothing
public Map<String, Map<String, Element<Variable>>> getVariables() {
throw new UnsupportedOperationException();