blob: 2eb813a0ccb60100ae66637f64e0f2460c40dc58 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.sink.ganglia;
import java.io.IOException;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.util.Servers;
import org.apache.hadoop.net.DNS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This the base class for Ganglia sink classes using metrics2. Lot of the code
* has been derived from org.apache.hadoop.metrics.ganglia.GangliaContext.
* As per the documentation, sink implementations doesn't have to worry about
* thread safety. Hence the code wasn't written for thread safety and should
* be modified in case the above assumption changes in the future.
*/
public abstract class AbstractGangliaSink implements MetricsSink {
public final Logger LOG = LoggerFactory.getLogger(this.getClass());
/*
* Output of "gmetric --help" showing allowable values
* -t, --type=STRING
* Either string|int8|uint8|int16|uint16|int32|uint32|float|double
* -u, --units=STRING Unit of measure for the value e.g. Kilobytes, Celcius
* (default='')
* -s, --slope=STRING Either zero|positive|negative|both
* (default='both')
* -x, --tmax=INT The maximum time in seconds between gmetric calls
* (default='60')
*/
public static final String DEFAULT_UNITS = "";
public static final int DEFAULT_TMAX = 60;
public static final int DEFAULT_DMAX = 0;
public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
public static final int DEFAULT_PORT = 8649;
public static final boolean DEFAULT_MULTICAST_ENABLED = false;
public static final int DEFAULT_MULTICAST_TTL = 1;
public static final String SERVERS_PROPERTY = "servers";
public static final String MULTICAST_ENABLED_PROPERTY = "multicast";
public static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
public static final int BUFFER_SIZE = 1500; // as per libgmond.c
public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
public static final String EQUAL = "=";
private String hostName = "UNKNOWN.example.com";
private DatagramSocket datagramSocket;
private List<? extends SocketAddress> metricsServers;
private boolean multicastEnabled;
private int multicastTtl;
private byte[] buffer = new byte[BUFFER_SIZE];
private int offset;
private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
/**
* Used for visiting Metrics
*/
protected final GangliaMetricVisitor gangliaMetricVisitor =
new GangliaMetricVisitor();
private SubsetConfiguration conf;
private Map<String, GangliaConf> gangliaConfMap;
private GangliaConf DEFAULT_GANGLIA_CONF = new GangliaConf();
/**
* ganglia slope values which equal the ordinal
*/
public enum GangliaSlope {
zero, // 0
positive, // 1
negative, // 2
both // 3
};
/**
* define enum for various type of conf
*/
public enum GangliaConfType {
slope, units, dmax, tmax
};
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.metrics2.MetricsPlugin#init(org.apache.commons.configuration
* .SubsetConfiguration)
*/
public void init(SubsetConfiguration conf) {
LOG.debug("Initializing the GangliaSink for Ganglia metrics.");
this.conf = conf;
// Take the hostname from the DNS class.
if (conf.getString("slave.host.name") != null) {
hostName = conf.getString("slave.host.name");
} else {
try {
hostName = DNS.getDefaultHost(
conf.getString("dfs.datanode.dns.interface", "default"),
conf.getString("dfs.datanode.dns.nameserver", "default"));
} catch (UnknownHostException uhe) {
LOG.error(uhe.toString());
hostName = "UNKNOWN.example.com";
}
}
// load the gannglia servers from properties
metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
DEFAULT_PORT);
multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY,
DEFAULT_MULTICAST_ENABLED);
multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL);
// extract the Ganglia conf per metrics
gangliaConfMap = new HashMap<String, GangliaConf>();
loadGangliaConf(GangliaConfType.units);
loadGangliaConf(GangliaConfType.tmax);
loadGangliaConf(GangliaConfType.dmax);
loadGangliaConf(GangliaConfType.slope);
try {
if (multicastEnabled) {
LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
datagramSocket = new MulticastSocket();
((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
} else {
datagramSocket = new DatagramSocket();
}
} catch (IOException e) {
LOG.error(e.toString());
}
// see if sparseMetrics is supported. Default is false
supportSparseMetrics = conf.getBoolean(SUPPORT_SPARSE_METRICS_PROPERTY,
SUPPORT_SPARSE_METRICS_DEFAULT);
}
/*
* (non-Javadoc)
*
* @see org.apache.hadoop.metrics2.MetricsSink#flush()
*/
public void flush() {
// nothing to do as we are not buffering data
}
// Load the configurations for a conf type
private void loadGangliaConf(GangliaConfType gtype) {
String propertyarr[] = conf.getStringArray(gtype.name());
if (propertyarr != null && propertyarr.length > 0) {
for (String metricNValue : propertyarr) {
String metricNValueArr[] = metricNValue.split(EQUAL);
if (metricNValueArr.length != 2 || metricNValueArr[0].length() == 0) {
LOG.error("Invalid propertylist for " + gtype.name());
}
String metricName = metricNValueArr[0].trim();
String metricValue = metricNValueArr[1].trim();
GangliaConf gconf = gangliaConfMap.get(metricName);
if (gconf == null) {
gconf = new GangliaConf();
gangliaConfMap.put(metricName, gconf);
}
switch (gtype) {
case units:
gconf.setUnits(metricValue);
break;
case dmax:
gconf.setDmax(Integer.parseInt(metricValue));
break;
case tmax:
gconf.setTmax(Integer.parseInt(metricValue));
break;
case slope:
gconf.setSlope(GangliaSlope.valueOf(metricValue));
break;
}
}
}
}
/**
* Lookup GangliaConf from cache. If not found, return default values
*
* @param metricName
* @return looked up GangliaConf
*/
protected GangliaConf getGangliaConfForMetric(String metricName) {
GangliaConf gconf = gangliaConfMap.get(metricName);
return gconf != null ? gconf : DEFAULT_GANGLIA_CONF;
}
/**
* @return the hostName
*/
protected String getHostName() {
return hostName;
}
/**
* Puts a string into the buffer by first writing the size of the string as an
* int, followed by the bytes of the string, padded if necessary to a multiple
* of 4.
* @param s the string to be written to buffer at offset location
*/
protected void xdr_string(String s) {
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
int len = bytes.length;
xdr_int(len);
System.arraycopy(bytes, 0, buffer, offset, len);
offset += len;
pad();
}
// Pads the buffer with zero bytes up to the nearest multiple of 4.
private void pad() {
int newOffset = ((offset + 3) / 4) * 4;
while (offset < newOffset) {
buffer[offset++] = 0;
}
}
/**
* Puts an integer into the buffer as 4 bytes, big-endian.
*/
protected void xdr_int(int i) {
buffer[offset++] = (byte) ((i >> 24) & 0xff);
buffer[offset++] = (byte) ((i >> 16) & 0xff);
buffer[offset++] = (byte) ((i >> 8) & 0xff);
buffer[offset++] = (byte) (i & 0xff);
}
/**
* Sends Ganglia Metrics to the configured hosts
* @throws IOException
*/
protected void emitToGangliaHosts() throws IOException {
try {
for (SocketAddress socketAddress : metricsServers) {
if (socketAddress == null || !(socketAddress instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported Address type");
InetSocketAddress inetAddress = (InetSocketAddress)socketAddress;
if(inetAddress.isUnresolved()) {
throw new UnknownHostException("Unresolved host: " + inetAddress);
}
DatagramPacket packet =
new DatagramPacket(buffer, offset, socketAddress);
datagramSocket.send(packet);
}
} finally {
// reset the buffer for the next metric to be built
offset = 0;
}
}
/**
* Reset the buffer for the next metric to be built
*/
void resetBuffer() {
offset = 0;
}
/**
* @return whether sparse metrics are supported
*/
protected boolean isSupportSparseMetrics() {
return supportSparseMetrics;
}
/**
* Used only by unit test
* @param datagramSocket the datagramSocket to set.
*/
void setDatagramSocket(DatagramSocket datagramSocket) {
this.datagramSocket = datagramSocket;
}
/**
* Used only by unit tests
* @return the datagramSocket for this sink
*/
DatagramSocket getDatagramSocket() {
return datagramSocket;
}
}