blob: b13ba7e69e44d7aab2c56c6a916841a66c282331 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.metrics;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
* This class allows testing of the publishing to the hadoop metrics system by processing a file for
* metric records (written as a line.) The file should be configured using the hadoop metrics
* properties as a file based sink with the prefix that is provided on instantiation of the
* instance.
*
* This class will simulate tail-ing a file and is intended to be run in a separate thread. When the
* underlying file has data written, the vaule returned by getLastUpdate will change, and the last
* line can be retrieved with getLast().
*/
public class MetricsFileTailer implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(MetricsFileTailer.class);
private static final int BUFFER_SIZE = 4;
private final String metricsPrefix;
private final Lock lock = new ReentrantLock();
private final AtomicBoolean running = new AtomicBoolean(Boolean.FALSE);
private AtomicLong lastUpdate = new AtomicLong(0);
private long startTime = System.nanoTime();
private int lineCounter = 0;
private String[] lineBuffer = new String[BUFFER_SIZE];
private final String metricsFilename;
/**
* Create an instance that will tail a metrics file. The filename / path is determined by the
* hadoop-metrics-accumulo.properties sink configuration for the metrics prefix that is provided.
*
* @param metricsPrefix
* the prefix in the metrics configuration.
*/
public MetricsFileTailer(final String metricsPrefix) {
this.metricsPrefix = metricsPrefix;
Configuration sub = loadMetricsConfig();
// dump received configuration keys received.
if (log.isTraceEnabled()) {
Iterator<String> keys = sub.getKeys();
while (keys.hasNext()) {
log.trace("configuration key:{}", keys.next());
}
}
if (sub.containsKey("filename")) {
metricsFilename = sub.getString("filename");
} else {
metricsFilename = "";
}
}
/**
* Create an instance by specifying a file directly instead of using the metrics configuration -
* mainly for testing.
*
* @param metricsPrefix
* generally can be ignored.
* @param filename
* the path / file to be monitored.
*/
MetricsFileTailer(final String metricsPrefix, final String filename) {
this.metricsPrefix = metricsPrefix;
metricsFilename = filename;
}
/**
* Look for the accumulo metrics configuration file on the classpath and return the subset for the
* http sink.
*
* @return a configuration with http sink properties.
*/
@SuppressFBWarnings(value = "URLCONNECTION_SSRF_FD",
justification = "url specified by test code, not unchecked user input")
private Configuration loadMetricsConfig() {
final URL propUrl =
getClass().getClassLoader().getResource(MetricsTestSinkProperties.METRICS_PROP_FILENAME);
if (propUrl == null) {
throw new IllegalStateException(
"Could not find " + MetricsTestSinkProperties.METRICS_PROP_FILENAME + " on classpath");
}
// Read data from this file
var config = new PropertiesConfiguration();
try (var reader = new InputStreamReader(propUrl.openStream(), UTF_8)) {
config.read(reader);
} catch (ConfigurationException | IOException e) {
throw new IllegalStateException(
String.format("Could not find configuration file \'%s\' on classpath",
MetricsTestSinkProperties.METRICS_PROP_FILENAME));
}
final Configuration sub = config.subset(metricsPrefix);
if (log.isTraceEnabled()) {
log.trace("Config {}", config);
Iterator<String> iterator = sub.getKeys();
while (iterator.hasNext()) {
String key = iterator.next();
log.trace("'{}'='{}'", key, sub.getProperty(key));
}
}
return sub;
}
/**
* Creates a marker value that increases each time a new line is detected. Clients can use this to
* determine if a call to getLast() will return a new value. However, this value is <b>NOT</b> a
* timestamp and should not be interpreted as such. Furthermore, it does not indicate that the
* metrics being reported have changed, only that a new metrics poll took place and was written to
* the file. So, if clients need to observe new metrics from a new event, they need to parse the
* line themselves to look for changed metrics values.
*
* @return a marker value set when a new line is available.
*/
public long getLastUpdate() {
return lastUpdate.get();
}
/**
* Get the last line seen in the file.
*
* @return the last line from the file.
*/
public String getLast() {
lock.lock();
try {
int last = (lineCounter % BUFFER_SIZE) - 1;
if (last < 0) {
last = BUFFER_SIZE - 1;
}
return lineBuffer[last];
} finally {
lock.unlock();
}
}
/**
* The hadoop metrics file sink published records as a line with comma separated key=value pairs.
* This method parses the line and extracts the key, value pair from metrics that start with an
* optional prefix and returns them in a sort map. If the prefix is null or empty, all keys are
* accepted.
*
* @param prefix
* optional filter - include metrics that start with provided value..
* @return a map of the metrics that start with AccGc
*/
public Map<String,String> parseLine(final String prefix) {
String line = getLast();
if (line == null) {
return Collections.emptyMap();
}
Map<String,String> m = new TreeMap<>();
String[] csvTokens = line.split(",");
for (String token : csvTokens) {
token = token.trim();
if (filter(prefix, token)) {
String[] parts = token.split("=");
m.put(parts[0], parts[1]);
}
}
return m;
}
private boolean filter(final String prefix, final String candidate) {
if (candidate == null) {
return false;
}
if (prefix == null || prefix.isEmpty()) {
return true;
}
return candidate.startsWith(prefix);
}
/**
* A loop that polls for changes and when the file changes, put the last line in a buffer that can
* be retrieved by clients using getLast().
*/
private void run() {
long filePos = 0;
File f = new File(metricsFilename);
while (running.get()) {
try {
Thread.sleep(5_000);
} catch (InterruptedException ex) {
running.set(Boolean.FALSE);
Thread.currentThread().interrupt();
return;
}
long len = f.length();
try {
// file truncated? reset position
if (len < filePos) {
filePos = 0;
lock.lock();
try {
for (int i = 0; i < BUFFER_SIZE; i++) {
lineBuffer[i] = "";
}
lineCounter = 0;
} finally {
lock.unlock();
}
}
if (len > filePos) {
// File must have had something added to it!
RandomAccessFile raf = new RandomAccessFile(f, "r");
raf.seek(filePos);
String line;
lock.lock();
try {
while ((line = raf.readLine()) != null) {
lineBuffer[lineCounter++ % BUFFER_SIZE] = line;
}
lastUpdate.set(System.nanoTime() - startTime);
} finally {
lock.unlock();
}
filePos = raf.getFilePointer();
raf.close();
}
} catch (Exception ex) {
log.info("Error processing metrics file {}", metricsFilename, ex);
}
}
}
public void startDaemonThread() {
if (running.compareAndSet(false, true)) {
Thread t = new Thread(() -> this.run());
t.setDaemon(true);
t.start();
}
}
@Override
public void close() {
running.set(Boolean.FALSE);
}
// utilities to block, waiting for update - call from process thread
public static class LineUpdate {
private final long lastUpdate;
private final String line;
public LineUpdate(long lastUpdate, String line) {
this.lastUpdate = lastUpdate;
this.line = line;
}
public long getLastUpdate() {
return lastUpdate;
}
public String getLine() {
return line;
}
@Override
public String toString() {
return "LineUpdate{" + "lastUpdate=" + lastUpdate + ", line='" + line + '\'' + '}';
}
}
public LineUpdate waitForUpdate(final long prevUpdate, final int maxAttempts, final long delay) {
for (int count = 0; count < maxAttempts; count++) {
String line = getLast();
long currUpdate = getLastUpdate();
if (line != null && (currUpdate != prevUpdate)) {
return new LineUpdate(getLastUpdate(), line);
}
try {
Thread.sleep(delay);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
}
// not found - throw exception.
throw new IllegalStateException(
String.format("File source update not received after %d tries in %d sec", maxAttempts,
TimeUnit.MILLISECONDS.toSeconds(delay * maxAttempts)));
}
}