blob: 6efd193fc9c6794ab17764c6db78d30165d27696 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.sink;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
/**
* A metrics sink that writes to a Graphite server
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class GraphiteSink implements MetricsSink, Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(GraphiteSink.class);
private static final String SERVER_HOST_KEY = "server_host";
private static final String SERVER_PORT_KEY = "server_port";
private static final String METRICS_PREFIX = "metrics_prefix";
private String metricsPrefix = null;
private Graphite graphite = null;
@Override
public void init(SubsetConfiguration conf) {
// Get Graphite host configurations.
final String serverHost = conf.getString(SERVER_HOST_KEY);
final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));
// Get Graphite metrics graph prefix.
metricsPrefix = conf.getString(METRICS_PREFIX);
if (metricsPrefix == null)
metricsPrefix = "";
graphite = new Graphite(serverHost, serverPort);
graphite.connect();
}
@Override
public void putMetrics(MetricsRecord record) {
StringBuilder lines = new StringBuilder();
StringBuilder metricsPathPrefix = new StringBuilder();
// Configure the hierarchical place to display the graph.
metricsPathPrefix.append(metricsPrefix).append(".")
.append(record.context()).append(".").append(record.name());
for (MetricsTag tag : record.tags()) {
if (tag.value() != null) {
metricsPathPrefix.append(".");
metricsPathPrefix.append(tag.name());
metricsPathPrefix.append("=");
metricsPathPrefix.append(tag.value());
}
}
// The record timestamp is in milliseconds while Graphite expects an epoc time in seconds.
long timestamp = record.timestamp() / 1000L;
// Collect datapoints.
for (AbstractMetric metric : record.metrics()) {
lines.append(
metricsPathPrefix.toString() + "."
+ metric.name().replace(' ', '.')).append(" ")
.append(metric.value()).append(" ").append(timestamp)
.append("\n");
}
try {
graphite.write(lines.toString());
} catch (Exception e) {
LOG.warn("Error sending metrics to Graphite", e);
try {
graphite.close();
} catch (Exception e1) {
throw new MetricsException("Error closing connection to Graphite", e1);
}
}
}
@Override
public void flush() {
try {
graphite.flush();
} catch (Exception e) {
LOG.warn("Error flushing metrics to Graphite", e);
try {
graphite.close();
} catch (Exception e1) {
throw new MetricsException("Error closing connection to Graphite", e1);
}
}
}
@Override
public void close() throws IOException {
graphite.close();
}
public static class Graphite {
private final static int MAX_CONNECTION_FAILURES = 5;
private String serverHost;
private int serverPort;
private Writer writer = null;
private Socket socket = null;
private int connectionFailures = 0;
public Graphite(String serverHost, int serverPort) {
this.serverHost = serverHost;
this.serverPort = serverPort;
}
public void connect() {
if (isConnected()) {
throw new MetricsException("Already connected to Graphite");
}
if (tooManyConnectionFailures()) {
// return silently (there was ERROR in logs when we reached limit for the first time)
return;
}
try {
// Open a connection to Graphite server.
socket = new Socket(serverHost, serverPort);
writer = new OutputStreamWriter(socket.getOutputStream(),
StandardCharsets.UTF_8);
} catch (Exception e) {
connectionFailures++;
if (tooManyConnectionFailures()) {
// first time when connection limit reached, report to logs
LOG.error("Too many connection failures, would not try to connect again.");
}
throw new MetricsException("Error creating connection, "
+ serverHost + ":" + serverPort, e);
}
}
public void write(String msg) throws IOException {
if (!isConnected()) {
connect();
}
if (isConnected()) {
writer.write(msg);
}
}
public void flush() throws IOException {
if (isConnected()) {
writer.flush();
}
}
public boolean isConnected() {
return socket != null && socket.isConnected() && !socket.isClosed();
}
public void close() throws IOException {
try {
if (writer != null) {
writer.close();
}
} catch (IOException ex) {
if (socket != null) {
socket.close();
}
} finally {
socket = null;
writer = null;
}
}
private boolean tooManyConnectionFailures() {
return connectionFailures > MAX_CONNECTION_FAILURES;
}
}
}