| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.sling.commons.metrics.rrd4j.impl; |
| |
| import com.codahale.metrics.Counter; |
| import com.codahale.metrics.Gauge; |
| import com.codahale.metrics.Histogram; |
| import com.codahale.metrics.Meter; |
| import com.codahale.metrics.MetricFilter; |
| import com.codahale.metrics.MetricRegistry; |
| import com.codahale.metrics.ScheduledReporter; |
| import com.codahale.metrics.Timer; |
| |
| import org.rrd4j.core.Archive; |
| import org.rrd4j.core.RrdDb; |
| import org.rrd4j.core.RrdDef; |
| import org.rrd4j.core.Sample; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.concurrent.TimeUnit; |
| |
| import static java.lang.String.join; |
| import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; |
| |
| class RRD4JReporter extends ScheduledReporter { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(RRD4JReporter.class); |
| |
| private static final String PROPERTIES_SUFFIX = ".properties"; |
| static final int DEFAULT_STEP = 5; |
| static final String DEFAULT_PATH = "metrics/metrics.rrd"; |
| |
| private final Map<String, Integer> dictionary = new HashMap<>(); |
| private final RrdDb rrdDB; |
| private long lastSampleTime; |
| |
| static Builder forRegistry(MetricRegistry metricRegistry) { |
| return new Builder(metricRegistry); |
| } |
| |
| static class Builder { |
| private MetricRegistry metricRegistry; |
| private TimeUnit ratesUnit = TimeUnit.SECONDS; |
| private TimeUnit durationUnit = TimeUnit.MICROSECONDS; |
| private File path = new File("."); |
| private final List<String> indexedDS = new ArrayList<>(); |
| private final Map<String, Integer> dictionary = new HashMap<>(); |
| private final List<String> archives = new ArrayList<>(); |
| private int step = DEFAULT_STEP; |
| |
| Builder(MetricRegistry metricRegistry ) { |
| this.metricRegistry = metricRegistry; |
| } |
| |
| Builder withPath(File path) { |
| if (path == null) { |
| LOGGER.warn("Illegal path value, will use default({}).", DEFAULT_PATH); |
| path = new File(DEFAULT_PATH); |
| } |
| this.path = path; |
| return this; |
| } |
| |
| Builder withDatasources(String[] datasources) { |
| if (datasources == null) { |
| datasources = new String[0]; |
| } |
| |
| this.indexedDS.clear(); |
| this.dictionary.clear(); |
| |
| int i = 0; |
| for (String ds : datasources) { |
| String[] tokens = ds.split(":"); |
| if (tokens.length == 6) { |
| String key = normalize(tokens[1]); |
| tokens[1] = String.valueOf(i); |
| try { |
| indexedDS.add(checkDataSource(join(":", tokens))); |
| dictionary.put(key, i); |
| } catch (IllegalArgumentException ex) { |
| LOGGER.warn("Ignoring malformed datasource {}.", ds); |
| } |
| } else { |
| LOGGER.warn("Ignoring malformed datasource {}.", ds); |
| } |
| i++; |
| } |
| return this; |
| } |
| |
| Builder withArchives(String[] archives) { |
| if (archives == null) { |
| archives = new String[0]; |
| } |
| this.archives.clear(); |
| |
| for (String archive : archives) { |
| try { |
| this.archives.add(checkArchive(archive)); |
| } catch (IllegalArgumentException ex) { |
| LOGGER.warn("Ignoring malformed archive {}.", archive); |
| } |
| } |
| return this; |
| } |
| |
| Builder withStep(int step) { |
| if (step <= 0) { |
| LOGGER.warn("Illegal step value, will use default({}).", DEFAULT_STEP); |
| step = DEFAULT_STEP; |
| } |
| this.step = step; |
| return this; |
| } |
| |
| Builder convertRatesTo(TimeUnit ratesUnit) { |
| this.ratesUnit = ratesUnit; |
| return this; |
| } |
| |
| Builder convertDurationsTo(TimeUnit durationUnit) { |
| this.durationUnit = durationUnit; |
| return this; |
| } |
| |
| RRD4JReporter build() throws IOException { |
| if (indexedDS.isEmpty() || archives.isEmpty()) { |
| return null; |
| } |
| return new RRD4JReporter(metricRegistry, "RRD4JReporter", MetricFilter.ALL, ratesUnit, durationUnit, |
| dictionary, createDef()); |
| } |
| |
| private String checkDataSource(String ds) throws IllegalArgumentException { |
| new RrdDef("path").addDatasource(ds); |
| return ds; |
| } |
| |
| private String checkArchive(String arch) throws IllegalArgumentException { |
| new RrdDef("path").addArchive(arch); |
| return arch; |
| } |
| |
| private RrdDef createDef() { |
| RrdDef def = new RrdDef(path.getPath(), step); |
| for (String ds : indexedDS) { |
| def.addDatasource(ds); |
| } |
| for (String rra : archives) { |
| def.addArchive(rra); |
| } |
| return def; |
| } |
| } |
| |
| RRD4JReporter(MetricRegistry registry, |
| String name, |
| MetricFilter filter, |
| TimeUnit rateUnit, |
| TimeUnit durationUnit, |
| Map<String, Integer> dictionary, |
| RrdDef rrdDef) throws IOException { |
| super(registry, name, filter, rateUnit, durationUnit); |
| this.dictionary.putAll(dictionary); |
| this.rrdDB = createDB(rrdDef); |
| storeDictionary(rrdDef.getPath() + PROPERTIES_SUFFIX); |
| } |
| |
| @Override |
| public void close() { |
| try { |
| rrdDB.close(); |
| } catch (IOException e) { |
| LOGGER.warn("Closing RRD failed", e); |
| } |
| super.close(); |
| } |
| |
| @Override |
| public void report(SortedMap<String, Gauge> gauges, |
| SortedMap<String, Counter> counters, |
| SortedMap<String, Histogram> histograms, |
| SortedMap<String, Meter> meters, |
| SortedMap<String, Timer> timers) { |
| long sampleTime = System.currentTimeMillis() / 1000; |
| if (sampleTime <= lastSampleTime) { |
| // sample at most once a second |
| return; |
| } |
| long time = System.nanoTime(); |
| int total = gauges.size() + counters.size() + histograms.size() + meters.size() + timers.size(); |
| int reported = 0; |
| try { |
| Sample sample = rrdDB.createSample(sampleTime); |
| for (Map.Entry<String, Gauge> entry : gauges.entrySet()) { |
| reported += update(sample, entry.getKey(), entry.getValue()); |
| } |
| |
| for (Map.Entry<String, Counter> entry : counters.entrySet()) { |
| reported += update(sample, entry.getKey(), entry.getValue()); |
| } |
| |
| for (Map.Entry<String, Histogram> entry : histograms.entrySet()) { |
| reported += update(sample, entry.getKey(), entry.getValue()); |
| } |
| |
| for (Map.Entry<String, Meter> entry : meters.entrySet()) { |
| reported += update(sample, entry.getKey(), entry.getValue()); |
| } |
| |
| for (Map.Entry<String, Timer> entry : timers.entrySet()) { |
| reported += update(sample, entry.getKey(), entry.getValue()); |
| } |
| sample.update(); |
| } catch (IOException e) { |
| LOGGER.warn("Unable to write sample to RRD", e); |
| } finally { |
| lastSampleTime = sampleTime; |
| time = System.nanoTime() - time; |
| LOGGER.debug("{} out of {} metrics reported in {} \u03bcs", |
| reported, total, TimeUnit.NANOSECONDS.toMicros(time)); |
| } |
| } |
| |
| private int indexForName(String name) { |
| Integer idx = dictionary.get(normalize(name)); |
| return idx != null ? idx : -1; |
| } |
| |
| private static String normalize(String name) { |
| return name.replaceAll(":", "_"); |
| } |
| |
| private static void log(String key, String type, Number value) { |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Sample: {} ({}) = {}", key, type, value); |
| } |
| } |
| |
| private int update(Sample sample, String key, Gauge g) { |
| int nameIdx = indexForName(key); |
| if (nameIdx < 0) { |
| return 0; |
| } |
| Object value = g.getValue(); |
| if (value instanceof Number) { |
| double val = ((Number) value).doubleValue(); |
| sample.setValue(nameIdx, val); |
| log(key, "gauge", val); |
| return 1; |
| } |
| return 0; |
| } |
| |
| private int update(Sample sample, String key, Counter c) { |
| int nameIdx = indexForName(key); |
| if (nameIdx < 0) { |
| return 0; |
| } |
| long val = c.getCount(); |
| sample.setValue(nameIdx, val); |
| log(key, "counter", val); |
| return 1; |
| } |
| |
| private int update(Sample sample, String key, Histogram h) { |
| int nameIdx = indexForName(key); |
| if (nameIdx < 0) { |
| return 0; |
| } |
| long val = h.getCount(); |
| sample.setValue(nameIdx, val); |
| log(key, "histogram", val); |
| return 1; |
| } |
| |
| private int update(Sample sample, String key, Timer t) { |
| int nameIdx = indexForName(key); |
| if (nameIdx < 0) { |
| return 0; |
| } |
| long val = t.getCount(); |
| sample.setValue(nameIdx, val); |
| log(key, "timer", val); |
| return 1; |
| } |
| |
| private int update(Sample sample, String key, Meter m) { |
| int nameIdx = indexForName(key); |
| if (nameIdx < 0) { |
| return 0; |
| } |
| long val = m.getCount(); |
| sample.setValue(nameIdx, val); |
| log(key, "meter", val); |
| return 1; |
| } |
| |
| private void storeDictionary(String path) throws IOException { |
| File dictFile = new File(path); |
| if (dictFile.exists() && ! dictFile.delete()) { |
| throw new IOException("Unable to delete dictionary file: " + dictFile.getPath()); |
| } |
| Properties dict = new Properties(); |
| for (Map.Entry<String, Integer> entry : dictionary.entrySet()) { |
| dict.put(String.valueOf(entry.getValue()), entry.getKey()); |
| } |
| try (FileOutputStream out = new FileOutputStream(dictFile)) { |
| dict.store(out, "RRD4JReporter dictionary"); |
| } |
| } |
| |
| private static RrdDb createDB(RrdDef definition) throws IOException { |
| File dbFile = new File(definition.getPath()); |
| if (!dbFile.getParentFile().exists()) { |
| if (!dbFile.getParentFile().mkdirs()) { |
| throw new IOException("Unable to create directory for RRD file: " + dbFile.getParent()); |
| } |
| } |
| RrdDb db = null; |
| if (dbFile.exists()) { |
| db = new RrdDb(definition.getPath()); |
| if (!db.getRrdDef().equals(definition)) { |
| // definition changed -> re-create DB |
| db.close(); |
| File renamed = renameDB(dbFile); |
| LOGGER.info("Configuration changed, renamed existing RRD file to: {}", |
| renamed.getPath()); |
| db = null; |
| } |
| } |
| if (db == null) { |
| db = new RrdDb(definition); |
| } |
| return db; |
| } |
| |
| private static File renameDB(File dbFile) throws IOException { |
| // find a suitable suffix |
| int idx = 0; |
| while (new File(dbFile.getPath() + suffix(idx)).exists()) { |
| idx++; |
| } |
| // rename rrd file |
| rename(dbFile.toPath(), dbFile.getName() + suffix(idx)); |
| // rename properties file |
| rename(dbFile.toPath().resolveSibling(dbFile.getName() + PROPERTIES_SUFFIX), |
| dbFile.getName() + suffix(idx) + PROPERTIES_SUFFIX); |
| |
| return new File(dbFile.getParentFile(), dbFile.getName() + suffix(idx)); |
| } |
| |
| private static String suffix(int idx) { |
| return "." + idx; |
| } |
| |
| private static void rename(Path path, String newName) throws IOException { |
| if (!Files.exists(path)) { |
| // nothing to rename |
| return; |
| } |
| Path target = path.resolveSibling(newName); |
| Files.move(path, target, REPLACE_EXISTING); |
| } |
| |
| long getStep() { |
| try { |
| return rrdDB.getHeader().getStep(); |
| } catch (IOException e) { |
| LOGGER.error(e.getMessage(), e); |
| } |
| return -1; |
| } |
| |
| String getPath() { |
| try { |
| return rrdDB.getCanonicalPath(); |
| } catch (IOException e) { |
| LOGGER.error(e.getMessage(), e); |
| } |
| return ""; |
| } |
| |
| Set<String> getDatasources() { |
| return dictionary.keySet(); |
| } |
| |
| Set<String> getArchives() { |
| Set<String> archives = new HashSet<>(); |
| for (int i = 0; i < rrdDB.getArcCount(); i++) { |
| Archive ar = rrdDB.getArchive(i); |
| archives.add(ar.toString()); |
| } |
| return archives; |
| } |
| |
| @Override |
| public String toString() { |
| return "RRD4JReporter [path=" + getPath() + ", datasources=" + getDatasources() + ", archives=" + getArchives() |
| + ", step=" + getStep() + ", dictionary=" + dictionary + "]"; |
| } |
| } |