blob: 67fa998915675ec26081899ed87354bfbe8a6142 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.metrics;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
import org.apache.falcon.FalconException;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.StartupProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* Service for metrics notification.
*/
public class MetricNotificationService implements FalconService {
private static final Logger LOG = LoggerFactory.getLogger(MetricNotificationService.class);
public static final String SERVICE_NAME = MetricNotificationService.class.getSimpleName();
private static final MetricNotificationService METRIC_NOTIFICATION_SERVICE = new MetricNotificationService();
private final GraphiteReporter graphiteReporter;
private final MetricRegistry metricRegistry;
private Map<String, MetricGauge> metricMap = new ConcurrentHashMap<>();
public static MetricNotificationService get(){
return METRIC_NOTIFICATION_SERVICE;
}
public MetricNotificationService(){
Graphite graphite = new Graphite(new InetSocketAddress(StartupProperties
.get().getProperty("falcon.graphite.hostname"), Integer.parseInt(StartupProperties.get()
.getProperty("falcon.graphite.port"))));
metricRegistry=new MetricRegistry();
this.graphiteReporter = GraphiteReporter.forRegistry(metricRegistry)
.convertDurationsTo(TimeUnit.SECONDS)
.filter(MetricFilter.ALL)
.build(graphite);
}
@Override
public String getName() {
return SERVICE_NAME;
}
@Override
public void init() throws FalconException {
LOG.info("Starting Graphite Service");
graphiteReporter.start(Long.parseLong(StartupProperties.get().getProperty("falcon.graphite.frequency")),
TimeUnit.SECONDS);
}
@Override
public void destroy() throws FalconException {
try {
// reporting final metrics into graphite before stopping
graphiteReporter.report();
} finally {
graphiteReporter.stop();
}
}
private MetricGauge createMetric(final String metricName){
if (!metricMap.containsKey(metricName)) {
MetricGauge metricGauge = new MetricGauge();
metricMap.put(metricName, metricGauge);
metricRegistry.register(metricName, metricGauge);
}
return metricMap.get(metricName);
}
public void publish(String metricsName, Long value){
synchronized(this){
createMetric(metricsName).setValue(value);
}
}
public void deleteMetric(String metricName){
synchronized (this){
SortedMap<String, Gauge> gaugeMap = metricRegistry.getGauges();
if (gaugeMap.get(metricName) != null){
metricRegistry.remove(metricName);
metricMap.remove(metricName);
}
}
}
private static class MetricGauge implements Gauge<Long> {
private Long value=0L;
public void setValue(Long value){
this.value=value;
}
@Override
public Long getValue() {
return value;
}
}
}