blob: cd40ac80d2611611b61ffaf8cbb131bbf5a44e66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.impl;
import com.google.common.collect.Lists;
import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricType;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.MetricsVisitor;
import org.apache.hadoop.metrics2.sink.KafkaSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringJoiner;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* This tests that the KafkaSink properly formats the Kafka message.
*/
public class TestKafkaMetrics {
private static final Logger LOG =
LoggerFactory.getLogger(TestKafkaMetrics.class);
private KafkaSink kafkaSink;
enum KafkaMetricsInfo implements MetricsInfo {
KafkaMetrics("Kafka related metrics etc."), KafkaCounter(
"Kafka counter."), KafkaTag("Kafka tag.");
// metrics
private final String desc;
KafkaMetricsInfo(String desc) {
this.desc = desc;
}
@Override
public String description() {
return desc;
}
@Override
public String toString() {
return new StringJoiner(", ", this.getClass().getSimpleName() + "{", "}")
.add("name=" + name())
.add("description=" + desc)
.toString();
}
}
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testPutMetrics() throws Exception {
// Create a record by mocking MetricsRecord class.
MetricsRecord record = mock(MetricsRecord.class);
when(record.tags()).thenReturn(Lists
.newArrayList(new MetricsTag(KafkaMetricsInfo.KafkaTag, "test_tag")));
when(record.timestamp()).thenReturn(System.currentTimeMillis());
// Create a metric using AbstractMetric class.
AbstractMetric metric = new AbstractMetric(KafkaMetricsInfo.KafkaCounter) {
@Override
public Number value() {
return new Integer(123);
}
@Override
public MetricType type() {
return null;
}
@Override
public void visit(MetricsVisitor visitor) {
}
};
// Create a list of metrics.
Iterable<AbstractMetric> metrics = Lists.newArrayList(metric);
when(record.name()).thenReturn("Kafka record name");
when(record.metrics()).thenReturn(metrics);
SubsetConfiguration conf = mock(SubsetConfiguration.class);
when(conf.getString(KafkaSink.BROKER_LIST)).thenReturn("localhost:9092");
String topic = "myTestKafkaTopic";
when(conf.getString(KafkaSink.TOPIC)).thenReturn(topic);
// Create the KafkaSink object and initialize it.
kafkaSink = new KafkaSink();
kafkaSink.init(conf);
// Create a mock KafkaProducer as a producer for KafkaSink.
Producer<Integer, byte[]> mockProducer = mock(KafkaProducer.class);
kafkaSink.setProducer(mockProducer);
// Create the json object from the record.
StringBuilder jsonLines = recordToJson(record);
if (LOG.isDebugEnabled()) {
LOG.debug("kafka message: " + jsonLines.toString());
}
// Send the record and store the result in a mock Future.
Future<RecordMetadata> f = mock(Future.class);
when(mockProducer.send((ProducerRecord) anyObject())).thenReturn(f);
kafkaSink.putMetrics(record);
// Get the argument and verity it.
ArgumentCaptor<ProducerRecord> argument =
ArgumentCaptor.forClass(ProducerRecord.class);
verify(mockProducer).send(argument.capture());
// Compare the received data with the original one.
ProducerRecord<Integer, byte[]> data = (argument.getValue());
String jsonResult = new String(data.value());
if (LOG.isDebugEnabled()) {
LOG.debug("kafka result: " + jsonResult);
}
assertEquals(jsonLines.toString(), jsonResult);
}
StringBuilder recordToJson(MetricsRecord record) {
// Create a json object from a metrics record.
StringBuilder jsonLines = new StringBuilder();
Long timestamp = record.timestamp();
Date currDate = new Date(timestamp);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
String date = dateFormat.format(currDate);
SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
String time = timeFormat.format(currDate);
String hostname = new String("null");
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
LOG.warn("Error getting Hostname, going to continue");
}
jsonLines.append("{\"hostname\": \"" + hostname);
jsonLines.append("\", \"timestamp\": " + timestamp);
jsonLines.append(", \"date\": \"" + date);
jsonLines.append("\",\"time\": \"" + time);
jsonLines.append("\",\"name\": \"" + record.name() + "\" ");
for (MetricsTag tag : record.tags()) {
jsonLines.append(
", \"" + tag.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
jsonLines.append(" \"" + tag.value().toString() + "\"");
}
for (AbstractMetric m : record.metrics()) {
jsonLines.append(
", \"" + m.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
jsonLines.append(" \"" + m.value().toString() + "\"");
}
jsonLines.append("}");
return jsonLines;
}
}