blob: 40dbd7cc4fb47d60630df71735981addfb1b3abb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.commons.metrics.rrd4j.impl;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Timer;
import org.rrd4j.core.Archive;
import org.rrd4j.core.RrdDb;
import org.rrd4j.core.RrdDef;
import org.rrd4j.core.Sample;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import static java.lang.String.join;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
class RRD4JReporter extends ScheduledReporter {
private static final Logger LOGGER = LoggerFactory.getLogger(RRD4JReporter.class);
private static final String PROPERTIES_SUFFIX = ".properties";
static final int DEFAULT_STEP = 5;
static final String DEFAULT_PATH = "metrics/metrics.rrd";
private final Map<String, Integer> dictionary = new HashMap<>();
private final RrdDb rrdDB;
private long lastSampleTime;
static Builder forRegistry(MetricRegistry metricRegistry) {
return new Builder(metricRegistry);
}
static class Builder {
private MetricRegistry metricRegistry;
private TimeUnit ratesUnit = TimeUnit.SECONDS;
private TimeUnit durationUnit = TimeUnit.MICROSECONDS;
private File path = new File(".");
private final List<String> indexedDS = new ArrayList<>();
private final Map<String, Integer> dictionary = new HashMap<>();
private final List<String> archives = new ArrayList<>();
private int step = DEFAULT_STEP;
Builder(MetricRegistry metricRegistry ) {
this.metricRegistry = metricRegistry;
}
Builder withPath(File path) {
if (path == null) {
LOGGER.warn("Illegal path value, will use default({}).", DEFAULT_PATH);
path = new File(DEFAULT_PATH);
}
this.path = path;
return this;
}
Builder withDatasources(String[] datasources) {
if (datasources == null) {
datasources = new String[0];
}
this.indexedDS.clear();
this.dictionary.clear();
int i = 0;
for (String ds : datasources) {
String[] tokens = ds.split(":");
if (tokens.length == 6) {
String key = normalize(tokens[1]);
tokens[1] = String.valueOf(i);
try {
indexedDS.add(checkDataSource(join(":", tokens)));
dictionary.put(key, i);
} catch (IllegalArgumentException ex) {
LOGGER.warn("Ignoring malformed datasource {}.", ds);
}
} else {
LOGGER.warn("Ignoring malformed datasource {}.", ds);
}
i++;
}
return this;
}
Builder withArchives(String[] archives) {
if (archives == null) {
archives = new String[0];
}
this.archives.clear();
for (String archive : archives) {
try {
this.archives.add(checkArchive(archive));
} catch (IllegalArgumentException ex) {
LOGGER.warn("Ignoring malformed archive {}.", archive);
}
}
return this;
}
Builder withStep(int step) {
if (step <= 0) {
LOGGER.warn("Illegal step value, will use default({}).", DEFAULT_STEP);
step = DEFAULT_STEP;
}
this.step = step;
return this;
}
Builder convertRatesTo(TimeUnit ratesUnit) {
this.ratesUnit = ratesUnit;
return this;
}
Builder convertDurationsTo(TimeUnit durationUnit) {
this.durationUnit = durationUnit;
return this;
}
RRD4JReporter build() throws IOException {
if (indexedDS.isEmpty() || archives.isEmpty()) {
return null;
}
return new RRD4JReporter(metricRegistry, "RRD4JReporter", MetricFilter.ALL, ratesUnit, durationUnit,
dictionary, createDef());
}
private String checkDataSource(String ds) throws IllegalArgumentException {
new RrdDef("path").addDatasource(ds);
return ds;
}
private String checkArchive(String arch) throws IllegalArgumentException {
new RrdDef("path").addArchive(arch);
return arch;
}
private RrdDef createDef() {
RrdDef def = new RrdDef(path.getPath(), step);
for (String ds : indexedDS) {
def.addDatasource(ds);
}
for (String rra : archives) {
def.addArchive(rra);
}
return def;
}
}
RRD4JReporter(MetricRegistry registry,
String name,
MetricFilter filter,
TimeUnit rateUnit,
TimeUnit durationUnit,
Map<String, Integer> dictionary,
RrdDef rrdDef) throws IOException {
super(registry, name, filter, rateUnit, durationUnit);
this.dictionary.putAll(dictionary);
this.rrdDB = createDB(rrdDef);
storeDictionary(rrdDef.getPath() + PROPERTIES_SUFFIX);
}
@Override
public void close() {
try {
rrdDB.close();
} catch (IOException e) {
LOGGER.warn("Closing RRD failed", e);
}
super.close();
}
@Override
public void report(SortedMap<String, Gauge> gauges,
SortedMap<String, Counter> counters,
SortedMap<String, Histogram> histograms,
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
long sampleTime = System.currentTimeMillis() / 1000;
if (sampleTime <= lastSampleTime) {
// sample at most once a second
return;
}
long time = System.nanoTime();
int total = gauges.size() + counters.size() + histograms.size() + meters.size() + timers.size();
int reported = 0;
try {
Sample sample = rrdDB.createSample(sampleTime);
for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
reported += update(sample, entry.getKey(), entry.getValue());
}
for (Map.Entry<String, Counter> entry : counters.entrySet()) {
reported += update(sample, entry.getKey(), entry.getValue());
}
for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
reported += update(sample, entry.getKey(), entry.getValue());
}
for (Map.Entry<String, Meter> entry : meters.entrySet()) {
reported += update(sample, entry.getKey(), entry.getValue());
}
for (Map.Entry<String, Timer> entry : timers.entrySet()) {
reported += update(sample, entry.getKey(), entry.getValue());
}
sample.update();
} catch (IOException e) {
LOGGER.warn("Unable to write sample to RRD", e);
} finally {
lastSampleTime = sampleTime;
time = System.nanoTime() - time;
LOGGER.debug("{} out of {} metrics reported in {} \u03bcs",
reported, total, TimeUnit.NANOSECONDS.toMicros(time));
}
}
private int indexForName(String name) {
Integer idx = dictionary.get(normalize(name));
return idx != null ? idx : -1;
}
private static String normalize(String name) {
return name.replaceAll(":", "_");
}
private static void log(String key, String type, Number value) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sample: {} ({}) = {}", key, type, value);
}
}
private int update(Sample sample, String key, Gauge g) {
int nameIdx = indexForName(key);
if (nameIdx < 0) {
return 0;
}
Object value = g.getValue();
if (value instanceof Number) {
double val = ((Number) value).doubleValue();
sample.setValue(nameIdx, val);
log(key, "gauge", val);
return 1;
}
return 0;
}
private int update(Sample sample, String key, Counter c) {
int nameIdx = indexForName(key);
if (nameIdx < 0) {
return 0;
}
long val = c.getCount();
sample.setValue(nameIdx, val);
log(key, "counter", val);
return 1;
}
private int update(Sample sample, String key, Histogram h) {
int nameIdx = indexForName(key);
if (nameIdx < 0) {
return 0;
}
long val = h.getCount();
sample.setValue(nameIdx, val);
log(key, "histogram", val);
return 1;
}
private int update(Sample sample, String key, Timer t) {
int nameIdx = indexForName(key);
if (nameIdx < 0) {
return 0;
}
long val = t.getCount();
sample.setValue(nameIdx, val);
log(key, "timer", val);
return 1;
}
private int update(Sample sample, String key, Meter m) {
int nameIdx = indexForName(key);
if (nameIdx < 0) {
return 0;
}
long val = m.getCount();
sample.setValue(nameIdx, val);
log(key, "meter", val);
return 1;
}
private void storeDictionary(String path) throws IOException {
File dictFile = new File(path);
if (dictFile.exists() && ! dictFile.delete()) {
throw new IOException("Unable to delete dictionary file: " + dictFile.getPath());
}
Properties dict = new Properties();
for (Map.Entry<String, Integer> entry : dictionary.entrySet()) {
dict.put(String.valueOf(entry.getValue()), entry.getKey());
}
try (FileOutputStream out = new FileOutputStream(dictFile)) {
dict.store(out, "RRD4JReporter dictionary");
}
}
private static RrdDb createDB(RrdDef definition) throws IOException {
File dbFile = new File(definition.getPath());
if (!dbFile.getParentFile().exists()) {
if (!dbFile.getParentFile().mkdirs()) {
throw new IOException("Unable to create directory for RRD file: " + dbFile.getParent());
}
}
RrdDb db = null;
if (dbFile.exists()) {
db = new RrdDb(definition.getPath());
if (!db.getRrdDef().equals(definition)) {
// definition changed -> re-create DB
db.close();
File renamed = renameDB(dbFile);
LOGGER.info("Configuration changed, renamed existing RRD file to: {}",
renamed.getPath());
db = null;
}
}
if (db == null) {
db = new RrdDb(definition);
}
return db;
}
private static File renameDB(File dbFile) throws IOException {
// find a suitable suffix
int idx = 0;
while (new File(dbFile.getPath() + suffix(idx)).exists()) {
idx++;
}
// rename rrd file
rename(dbFile.toPath(), dbFile.getName() + suffix(idx));
// rename properties file
rename(dbFile.toPath().resolveSibling(dbFile.getName() + PROPERTIES_SUFFIX),
dbFile.getName() + suffix(idx) + PROPERTIES_SUFFIX);
return new File(dbFile.getParentFile(), dbFile.getName() + suffix(idx));
}
private static String suffix(int idx) {
return "." + idx;
}
private static void rename(Path path, String newName) throws IOException {
if (!Files.exists(path)) {
// nothing to rename
return;
}
Path target = path.resolveSibling(newName);
Files.move(path, target, REPLACE_EXISTING);
}
long getStep() {
try {
return rrdDB.getHeader().getStep();
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
return -1;
}
String getPath() {
try {
return rrdDB.getCanonicalPath();
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
return "";
}
Set<String> getDatasources() {
return dictionary.keySet();
}
Set<String> getArchives() {
Set<String> archives = new HashSet<>();
for (int i = 0; i < rrdDB.getArcCount(); i++) {
Archive ar = rrdDB.getArchive(i);
archives.add(ar.toString());
}
return archives;
}
@Override
public String toString() {
return "RRD4JReporter [path=" + getPath() + ", datasources=" + getDatasources() + ", archives=" + getArchives()
+ ", step=" + getStep() + ", dictionary=" + dictionary + "]";
}
}