/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.sink;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.google.common.base.Strings;
import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A metrics sink that writes to a Kafka broker. It requires a broker_list and
 * a topic to be configured in the metrics2 configuration file. broker_list
 * must be a comma-separated list of Kafka broker host:port pairs, and topic
 * names the single Kafka topic that all records are written to.
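 * <p>
 * A minimal, illustrative metrics2 configuration (the daemon prefix, broker
 * addresses, and topic name below are placeholders, not defaults of this
 * class):
 * <pre>
 *   *.sink.kafka.class=org.apache.hadoop.metrics2.sink.KafkaSink
 *   namenode.sink.kafka.broker_list=broker1:9092,broker2:9092
 *   namenode.sink.kafka.topic=hadoop-metrics
 * </pre>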
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KafkaSink implements MetricsSink, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
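
  // Names of the properties read from this sink's metrics2 configuration
  // subset.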
public static final String BROKER_LIST = "broker_list";
public static final String TOPIC = "topic";
private String hostname = null;
private String brokerList = null;
private String topic = null;
private Producer<Integer, byte[]> producer = null;
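
  // Formatters for the human-readable "date" and "time" fields emitted with
  // each record; timestamps are rendered in the JVM's default time zone.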
private final DateTimeFormatter dateFormat =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
private final DateTimeFormatter timeFormat =
DateTimeFormatter.ofPattern("HH:mm:ss");
private final ZoneId zoneId = ZoneId.systemDefault();
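
  /**
   * Visible for testing: lets a test inject a producer (for example Kafka's
   * {@code MockProducer}) in place of the {@link KafkaProducer} that
   * {@link #init} would otherwise create.
   */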
public void setProducer(Producer<Integer, byte[]> p) {
this.producer = p;
}
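
  /*
   * A minimal sketch of how a test might drive this sink through the hook
   * above, assuming kafka-clients' MockProducer and a MetricsRecord obtained
   * from a metrics2 test helper (the names here are illustrative, not part
   * of this class):
   *
   *   MockProducer<Integer, byte[]> mock = new MockProducer<>(
   *       true, new IntegerSerializer(), new ByteArraySerializer());
   *   KafkaSink sink = new KafkaSink();
   *   sink.setProducer(mock);
   *   sink.putMetrics(record);
   *   assertEquals(1, mock.history().size());
   */
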
@Override
public void init(SubsetConfiguration conf) {
// Get Kafka broker configuration.
    Properties props = new Properties();
    brokerList = conf.getString(BROKER_LIST);
    LOG.debug("Kafka brokers: {}", brokerList);
    if (Strings.isNullOrEmpty(brokerList)) {
      throw new MetricsException("Kafka broker list cannot be null or empty");
    }
    props.put("bootstrap.servers", brokerList);

    // Get Kafka topic configuration.
    topic = conf.getString(TOPIC);
    LOG.debug("Kafka topic: {}", topic);
    if (Strings.isNullOrEmpty(topic)) {
      throw new MetricsException("Kafka topic cannot be null or empty");
    }
    // Set the rest of the Kafka configuration. Keys are typed as Integer
    // (and are never actually set, so every record is sent key-less); values
    // are raw bytes. The old producer's "request.required.acks" property is
    // not recognized by the KafkaProducer API; "acks" is the equivalent.
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("acks", "0");
    // Set the hostname once and use it in every message. Fall back to the
    // literal string "null" if it cannot be resolved.
    hostname = "null";
    try {
      hostname = InetAddress.getLocalHost().getHostName();
    } catch (Exception e) {
      LOG.warn("Error getting hostname; continuing with placeholder", e);
    }
    try {
      // Create the producer object.
      producer = new KafkaProducer<>(props);
    } catch (Exception e) {
      throw new MetricsException("Error creating Producer, " + brokerList, e);
    }
  }

@Override
public void putMetrics(MetricsRecord record) {
if (producer == null) {
throw new MetricsException("Producer in KafkaSink is null!");
}
// Create the json object.
StringBuilder jsonLines = new StringBuilder();
long timestamp = record.timestamp();
Instant instant = Instant.ofEpochMilli(timestamp);
LocalDateTime ldt = LocalDateTime.ofInstant(instant, zoneId);
String date = ldt.format(dateFormat);
String time = ldt.format(timeFormat);
    // Collect datapoints and populate the json object. Note: tag and metric
    // values are embedded as-is; a value containing quotes or backslashes
    // would need escaping to keep the output valid JSON.
    jsonLines.append("{\"hostname\": \"").append(hostname);
    jsonLines.append("\", \"timestamp\": ").append(timestamp);
    jsonLines.append(", \"date\": \"").append(date);
    jsonLines.append("\",\"time\": \"").append(time);
    jsonLines.append("\",\"name\": \"").append(record.name()).append("\" ");
    for (MetricsTag tag : record.tags()) {
      jsonLines.append(", \"")
          .append(tag.name().replaceAll("[\\p{Cc}]", ""))
          .append("\": ");
      jsonLines.append(" \"").append(tag.value()).append("\"");
    }
    for (AbstractMetric metric : record.metrics()) {
      jsonLines.append(", \"")
          .append(metric.name().replaceAll("[\\p{Cc}]", ""))
          .append("\": ");
      jsonLines.append(" \"").append(metric.value()).append("\"");
    }
    jsonLines.append("}");
    LOG.debug("Kafka message: {}", jsonLines);
    // Create the record to be sent from the json.
    ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(
        topic, jsonLines.toString().getBytes(StandardCharsets.UTF_8));
// Send the data to the Kafka broker. Here is an example of this data:
// {"hostname": "...", "timestamp": 1436913651516,
// "date": "2015-6-14","time": "22:40:51","context": "yarn","name":
// "QueueMetrics, "running_0": "1", "running_60": "0", "running_300": "0",
// "running_1440": "0", "AppsSubmitted": "1", "AppsRunning": "1",
// "AppsPending": "0", "AppsCompleted": "0", "AppsKilled": "0",
// "AppsFailed": "0", "AllocatedMB": "134656", "AllocatedVCores": "132",
// "AllocatedContainers": "132", "AggregateContainersAllocated": "132",
// "AggregateContainersReleased": "0", "AvailableMB": "0",
// "AvailableVCores": "0", "PendingMB": "275456", "PendingVCores": "269",
// "PendingContainers": "269", "ReservedMB": "0", "ReservedVCores": "0",
// "ReservedContainers": "0", "ActiveUsers": "1", "ActiveApplications": "1"}
    Future<RecordMetadata> future = producer.send(data);
    try {
      future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag before surfacing the failure.
      Thread.currentThread().interrupt();
      throw new MetricsException("Error sending data", e);
    } catch (ExecutionException e) {
      throw new MetricsException("Error sending data", e);
    }
  }

  @Override
  public void flush() {
    // KafkaProducer has provided flush() since Kafka 0.9; block until all
    // buffered records have been sent.
    if (producer != null) {
      producer.flush();
    }
  }

@Override
  public void close() throws IOException {
    // Close the producer and set it to null. Guard against a producer that
    // was never created or has already been closed.
    if (producer == null) {
      return;
    }
    try {
      producer.close();
    } catch (RuntimeException e) {
      throw new MetricsException("Error closing producer", e);
    } finally {
      producer = null;
    }
}
}