blob: b33b75c71061a2b95d2c2d5eb58b67493356583e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* MetricsReporter that aggregates metrics data and reports it via HTTP requests to a configurable
* webhook endpoint in JSON format.
*
* This is an internal class used for system tests and does not provide any compatibility guarantees.
*/
public class PushHttpMetricsReporter implements MetricsReporter {
private static final Logger log = LoggerFactory.getLogger(PushHttpMetricsReporter.class);
// Common prefix of the reporter-specific configuration keys defined below.
private static final String METRICS_PREFIX = "metrics.";
// Key for the endpoint URL that reports are POSTed to.
static final String METRICS_URL_CONFIG = METRICS_PREFIX + "url";
// Key for the reporting period; configure() schedules the reporter with TimeUnit.SECONDS.
static final String METRICS_PERIOD_CONFIG = METRICS_PREFIX + "period";
// Key for the hostname attached to each report; configure() falls back to the local canonical hostname when empty.
static final String METRICS_HOST_CONFIG = METRICS_PREFIX + "host";
// Reuses the producer's client id key.
static final String CLIENT_ID_CONFIG = ProducerConfig.CLIENT_ID_CONFIG;
// NOTE(review): this map is populated but not read anywhere in the visible source; HttpReporter.run()
// sets the Content-Type request property directly.
private static final Map<String, String> HEADERS = new LinkedHashMap<>();
static {
HEADERS.put("Content-Type", "application/json");
}
// Guards every access to the metrics map below.
private final Object lock = new Object();
// Source of the timestamp placed in each report.
private final Time time;
// Runs the periodic HttpReporter task; shut down in close().
private final ScheduledExecutorService executor;
// The set of metrics is updated in init/metricChange/metricRemoval and snapshotted by HttpReporter.run()
private final Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
// Serializes each MetricsReport to JSON bytes.
private final ObjectMapper json = new ObjectMapper();
// Non-final because these are set via configure()
private URL url;
private String host;
private String clientId;
// Definition of the settings read by configure(). The URL and period are declared without a default
// value; the host and client id default to the empty string.
private static final ConfigDef CONFIG_DEF = new ConfigDef()
    .define(METRICS_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
        "The URL to report metrics to")
    .define(METRICS_PERIOD_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
        "The frequency at which metrics should be reported, in seconds")
    .define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
        // Trailing space matters: the two literals are concatenated into one sentence.
        "The hostname to report with each metric; if empty, defaults to the FQDN that can be automatically " +
        "determined")
    .define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
        "Client ID to identify the application, generally inherited from the " +
        "producer/consumer/streams/connect instance");
/**
 * Creates a reporter backed by the system clock and its own single-threaded scheduler.
 */
public PushHttpMetricsReporter() {
    this.time = new SystemTime();
    this.executor = Executors.newSingleThreadScheduledExecutor();
}
/**
 * Package-private constructor that lets the caller supply the clock and the scheduler.
 *
 * @param mockTime     clock used to timestamp reports
 * @param mockExecutor scheduler that will run the periodic reporting task
 */
PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService mockExecutor) {
    this.time = mockTime;
    this.executor = mockExecutor;
}
/**
 * Reads the reporter settings and schedules the periodic HTTP push.
 *
 * <p>Sets {@code url}, {@code clientId} and {@code host} from the supplied configuration. When no
 * host is configured, the local canonical hostname is used instead. The reporting task is then
 * scheduled to run every {@code metrics.period} seconds, with the first run one period from now.
 *
 * @param configs the raw configuration map
 * @throws ConfigException if the URL is malformed, the period is not positive, or the local
 *                         hostname cannot be determined
 */
@Override
public void configure(Map<String, ?> configs) {
    PushHttpMetricsReporterConfig config = new PushHttpMetricsReporterConfig(CONFIG_DEF, configs);

    String urlString = config.getString(METRICS_URL_CONFIG);
    try {
        url = new URL(urlString);
    } catch (MalformedURLException e) {
        // ConfigException has no (message, cause) constructor: passing the exception as the second
        // argument selects (name, value) and yields a garbled message with no cause attached.
        ConfigException malformed =
            new ConfigException(METRICS_URL_CONFIG, urlString, "Malformed URL: " + e.getMessage());
        malformed.initCause(e);
        throw malformed;
    }

    int period = config.getInteger(METRICS_PERIOD_CONFIG);
    if (period <= 0) {
        // scheduleAtFixedRate rejects non-positive periods; report it as a configuration problem instead.
        throw new ConfigException(METRICS_PERIOD_CONFIG, period, "Reporting period must be a positive number of seconds");
    }

    clientId = config.getString(CLIENT_ID_CONFIG);

    host = config.getString(METRICS_HOST_CONFIG);
    if (host == null || host.isEmpty()) {
        try {
            host = InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            ConfigException unresolved =
                new ConfigException("Failed to get canonical hostname: " + e.getMessage());
            unresolved.initCause(e);
            throw unresolved;
        }
    }

    // Scheduling comes last so the task never observes partially configured state.
    executor.scheduleAtFixedRate(new HttpReporter(), period, period, TimeUnit.SECONDS);
    log.info("Configured PushHttpMetricsReporter for {} to report every {} seconds", url, period);
}
/**
 * Registers the initial set of metrics, keyed by metric name, while holding the lock.
 *
 * @param initMetrics the metrics to start tracking
 */
@Override
public void init(List<KafkaMetric> initMetrics) {
    synchronized (lock) {
        for (KafkaMetric initial : initMetrics) {
            final MetricName key = initial.metricName();
            log.debug("Adding metric {}", key);
            metrics.put(key, initial);
        }
    }
}
/**
 * Adds the metric, or replaces the entry stored under the same name, while holding the lock.
 *
 * @param metric the new or updated metric
 */
@Override
public void metricChange(KafkaMetric metric) {
    synchronized (lock) {
        final MetricName key = metric.metricName();
        log.debug("Updating metric {}", key);
        metrics.put(key, metric);
    }
}
/**
 * Stops tracking the metric stored under the given metric's name, while holding the lock.
 *
 * @param metric the metric to drop
 */
@Override
public void metricRemoval(KafkaMetric metric) {
    synchronized (lock) {
        final MetricName key = metric.metricName();
        log.debug("Removing metric {}", key);
        metrics.remove(key);
    }
}
/**
 * Shuts down the reporting executor and waits up to 30 seconds for an in-flight report to finish.
 *
 * @throws KafkaException if the calling thread is interrupted while waiting; the thread's
 *                        interrupt status is restored before the exception is thrown
 */
@Override
public void close() {
    executor.shutdown();
    try {
        if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
            log.warn("Timed out waiting for PushHttpMetricsReporter executor to terminate");
        }
    } catch (InterruptedException e) {
        // Restore the flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new KafkaException("Interrupted when shutting down PushHttpMetricsReporter", e);
    }
}
/**
 * Periodic task that snapshots the currently registered metrics and POSTs them to {@code url}
 * as a JSON document.
 */
private class HttpReporter implements Runnable {
    /**
     * Builds one report and sends it. Failures are logged; ordinary exceptions are not propagated,
     * because an exception escaping a task scheduled with {@code scheduleAtFixedRate} suppresses
     * every subsequent execution, and one failed request must not disable reporting for good.
     */
    @Override
    public void run() {
        long now = time.milliseconds();
        final List<MetricValue> samples;
        // Copy the values out under the lock so the HTTP request itself runs without holding it.
        synchronized (lock) {
            samples = new ArrayList<>(metrics.size());
            for (KafkaMetric metric : metrics.values()) {
                MetricName name = metric.metricName();
                samples.add(new MetricValue(name.name(), name.group(), name.tags(), metric.metricValue()));
            }
        }
        MetricsReport report = new MetricsReport(new MetricClientInfo(host, clientId, now), samples);
        log.trace("Reporting {} metrics to {}", samples.size(), url);
        HttpURLConnection connection = null;
        try {
            connection = newHttpConnection(url);
            connection.setRequestMethod("POST");
            // connection.getResponseCode() implicitly calls getInputStream, so always set to true.
            connection.setDoInput(true);
            connection.setRequestProperty("Content-Type", "application/json");
            byte[] data = json.writeValueAsBytes(report);
            connection.setRequestProperty("Content-Length", Integer.toString(data.length));
            connection.setRequestProperty("Accept", "*/*");
            connection.setUseCaches(false);
            connection.setDoOutput(true);
            try (OutputStream os = connection.getOutputStream()) {
                os.write(data);
                os.flush();
            }
            int responseCode = connection.getResponseCode();
            if (responseCode >= 400) {
                InputStream is = connection.getErrorStream();
                String msg = readResponse(is);
                log.error("Error reporting metrics, {}: {}", responseCode, msg);
            } else if (responseCode >= 300) {
                log.error("PushHttpMetricsReporter does not currently support redirects, saw {}", responseCode);
            } else {
                log.info("Finished reporting metrics with response code {}", responseCode);
            }
        } catch (Throwable t) {
            log.error("Error reporting metrics", t);
            // Swallow ordinary failures so the next scheduled run still happens; only fatal
            // errors are allowed to escape and stop the schedule.
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
}
// Static package-private so unit tests can use a mock connection
/**
 * Opens a connection object for the given URL and returns it as an {@link HttpURLConnection}.
 *
 * @param url the endpoint to open
 * @return the connection object produced by {@code url.openConnection()}
 * @throws IOException if {@code url.openConnection()} fails
 */
static HttpURLConnection newHttpConnection(URL url) throws IOException {
    final Object opened = url.openConnection();
    return (HttpURLConnection) opened;
}
// Static package-private so unit tests can mock reading response
/**
 * Reads the whole stream as UTF-8 text and closes it.
 *
 * @param is the stream to drain; may be {@code null}, since
 *           {@link java.net.HttpURLConnection#getErrorStream()} returns {@code null} when the
 *           server sent no error body
 * @return the stream content, or the empty string when the stream is {@code null} or empty
 */
static String readResponse(InputStream is) {
    if (is == null) {
        return "";
    }
    // The "\\A" delimiter matches only at the start of input, so the first token is the entire content.
    try (Scanner s = new Scanner(is, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
        return s.hasNext() ? s.next() : "";
    }
}
/**
 * JSON payload root: the reporting client's identity plus the sampled metric values.
 */
private static class MetricsReport {
    private final MetricClientInfo client;
    private final Collection<MetricValue> metrics;

    MetricsReport(MetricClientInfo clientInfo, Collection<MetricValue> samples) {
        client = clientInfo;
        metrics = samples;
    }

    /** @return information about the client that produced this report */
    @JsonProperty
    public MetricClientInfo client() {
        return this.client;
    }

    /** @return the metric samples carried by this report */
    @JsonProperty
    public Collection<MetricValue> metrics() {
        return this.metrics;
    }
}
/**
 * Identifies the origin of a report: host name, client id and the time the report was taken.
 */
private static class MetricClientInfo {
    private final String host;
    private final String clientId;
    private final long time;

    MetricClientInfo(String hostName, String id, long timestamp) {
        host = hostName;
        clientId = id;
        time = timestamp;
    }

    /** @return the host name attached to the report */
    @JsonProperty
    public String host() {
        return this.host;
    }

    /** @return the client id, serialized under the {@code client_id} key */
    @JsonProperty("client_id")
    public String clientId() {
        return this.clientId;
    }

    /** @return the timestamp passed in at construction */
    @JsonProperty
    public long time() {
        return this.time;
    }
}
/**
 * One sampled metric: its name, group, tags and the value captured at sampling time.
 */
private static class MetricValue {
    private final String name;
    private final String group;
    private final Map<String, String> tags;
    private final Object value;

    MetricValue(String metricName, String metricGroup, Map<String, String> metricTags, Object sampled) {
        name = metricName;
        group = metricGroup;
        tags = metricTags;
        value = sampled;
    }

    /** @return the metric's name */
    @JsonProperty
    public String name() {
        return this.name;
    }

    /** @return the metric's group */
    @JsonProperty
    public String group() {
        return this.group;
    }

    /** @return the metric's tags */
    @JsonProperty
    public Map<String, String> tags() {
        return this.tags;
    }

    /** @return the sampled value */
    @JsonProperty
    public Object value() {
        return this.value;
    }
}
// AbstractConfig.getInt changed its return type from int to Integer between versions. To stay compatible
// with 0.8.2.2 jars in system tests, this subclass offers its own accessor that works for all versions.
private static class PushHttpMetricsReporterConfig extends AbstractConfig {
    public PushHttpMetricsReporterConfig(ConfigDef definition, Map<?, ?> originals) {
        super(definition, originals);
    }

    /**
     * Looks up the value stored under {@code key} and returns it cast to {@link Integer}.
     *
     * @param key the configuration key to read
     * @return the configured value as an {@code Integer}
     */
    public Integer getInteger(String key) {
        final Object raw = get(key);
        return (Integer) raw;
    }
}
}