/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.metrics.prometheus;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.MetricsServlet;
import io.prometheus.client.hotspot.DefaultExports;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Enumeration;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.zookeeper.metrics.Counter;
import org.apache.zookeeper.metrics.Gauge;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.metrics.MetricsProvider;
import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException;
import org.apache.zookeeper.metrics.Summary;
import org.apache.zookeeper.metrics.SummarySet;
import org.apache.zookeeper.server.RateLogger;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Metrics Provider implementation based on https://prometheus.io.
*
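* <p>
* The snippet below is only an illustrative sketch of how the provider is
* typically configured; the {@code metricsProvider.*} prefix is assumed to be
* the way ZooKeeper hands these properties to {@link #configure(Properties)},
* and the values shown are the defaults used by this class.
* </p>
* <pre>
* metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
* metricsProvider.httpHost=0.0.0.0
* metricsProvider.httpPort=7000
* metricsProvider.exportJvmInfo=true
* metricsProvider.numWorkerThreads=1
* metricsProvider.maxQueueSize=1000000
* metricsProvider.workerShutdownTimeoutMs=1000
* </pre>
*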
* @since 3.6.0
*/
public class PrometheusMetricsProvider implements MetricsProvider {
private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricsProvider.class);
private static final String LABEL = "key";
private static final String[] LABELS = {LABEL};
/**
* Number of worker threads for reporting Prometheus summary metrics.
* Default value is 1.
* If the number is less than 1, metrics are reported synchronously on the calling thread.
*/
static final String NUM_WORKER_THREADS = "numWorkerThreads";
/**
* The max queue size for the Prometheus summary metrics reporting tasks.
* Default value is 1000000.
*/
static final String MAX_QUEUE_SIZE = "maxQueueSize";
/**
* The timeout in milliseconds for shutting down the Prometheus worker threads.
* Default value is 1000ms.
*/
static final String WORKER_SHUTDOWN_TIMEOUT_MS = "workerShutdownTimeoutMs";
/**
* We are using the 'defaultRegistry'.
* <p>
* When you are running ZooKeeper (server or client) together with other
* libraries, all metrics will be exported as a single view.
* </p>
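* <p>
* A practical consequence: any collector that another library registers with
* {@code CollectorRegistry.defaultRegistry} is served by the same /metrics
* endpoint started by this provider.
* </p>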
*/
private final CollectorRegistry collectorRegistry = CollectorRegistry.defaultRegistry;
private final RateLogger rateLogger = new RateLogger(LOG, 60 * 1000);
private String host = "0.0.0.0";
private int port = 7000;
private boolean exportJvmInfo = true;
private Server server;
private final MetricsServletImpl servlet = new MetricsServletImpl();
private final Context rootContext = new Context();
private int numWorkerThreads = 1;
private int maxQueueSize = 1000000;
private long workerShutdownTimeoutMs = 1000;
private Optional<ExecutorService> executorOptional = Optional.empty();
@Override
public void configure(Properties configuration) throws MetricsProviderLifeCycleException {
LOG.info("Initializing metrics, configuration: {}", configuration);
this.host = configuration.getProperty("httpHost", "0.0.0.0");
this.port = Integer.parseInt(configuration.getProperty("httpPort", "7000"));
this.exportJvmInfo = Boolean.parseBoolean(configuration.getProperty("exportJvmInfo", "true"));
this.numWorkerThreads = Integer.parseInt(
configuration.getProperty(NUM_WORKER_THREADS, "1"));
this.maxQueueSize = Integer.parseInt(
configuration.getProperty(MAX_QUEUE_SIZE, "1000000"));
this.workerShutdownTimeoutMs = Long.parseLong(
configuration.getProperty(WORKER_SHUTDOWN_TIMEOUT_MS, "1000"));
}
@Override
public void start() throws MetricsProviderLifeCycleException {
this.executorOptional = createExecutor();
try {
LOG.info("Starting /metrics HTTP endpoint at host: {}, port: {}, exportJvmInfo: {}",
host, port, exportJvmInfo);
if (exportJvmInfo) {
DefaultExports.initialize();
}
server = new Server(new InetSocketAddress(host, port));
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
server.setHandler(context);
context.addServlet(new ServletHolder(servlet), "/metrics");
server.start();
} catch (Exception err) {
LOG.error("Cannot start /metrics server", err);
if (server != null) {
try {
server.stop();
} catch (Exception suppressed) {
err.addSuppressed(suppressed);
} finally {
server = null;
}
}
throw new MetricsProviderLifeCycleException(err);
}
}
// for tests
MetricsServletImpl getServlet() {
return servlet;
}
@Override
public MetricsContext getRootContext() {
return rootContext;
}
@Override
public void stop() {
shutdownExecutor();
if (server != null) {
try {
server.stop();
} catch (Exception err) {
LOG.error("Cannot safely stop Jetty server", err);
} finally {
server = null;
}
}
}
/**
* Dump all values to the 4lw interface and to the Admin server.
* <p>
* This method is not expected to be used to serve metrics to Prometheus. We
* are using the MetricsServlet provided by Prometheus for that, leaving the
* real representation to the Prometheus Java client.
* </p>
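* <p>
* Each sample is flattened into a single key: the metric name, followed by its
* labels rendered as {@code name{label="value"}}; the numeric value is passed
* to the sink separately. The metric names below are purely illustrative:
* </p>
* <pre>
* some_counter
* some_summary{quantile="0.5"}
* some_summary_set{key="value",quantile="0.5"}
* </pre>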
*
* @param sink the receiver of data (4lw interface, Admin server or tests)
*/
@Override
public void dump(BiConsumer<String, Object> sink) {
sampleGauges();
Enumeration<Collector.MetricFamilySamples> samplesFamilies = collectorRegistry.metricFamilySamples();
while (samplesFamilies.hasMoreElements()) {
Collector.MetricFamilySamples samples = samplesFamilies.nextElement();
samples.samples.forEach(sample -> {
String key = buildKeyForDump(sample);
sink.accept(key, sample.value);
});
}
}
private static String buildKeyForDump(Collector.MetricFamilySamples.Sample sample) {
StringBuilder keyBuilder = new StringBuilder();
keyBuilder.append(sample.name);
if (sample.labelNames.size() > 0) {
keyBuilder.append('{');
for (int i = 0; i < sample.labelNames.size(); ++i) {
if (i > 0) {
keyBuilder.append(',');
}
keyBuilder.append(sample.labelNames.get(i));
keyBuilder.append("=\"");
keyBuilder.append(sample.labelValues.get(i));
keyBuilder.append('"');
}
keyBuilder.append('}');
}
return keyBuilder.toString();
}
/**
* Update Gauges. In the ZooKeeper Metrics API, Gauges are callbacks served by
* internal components; the value is not held by Prometheus structures.
*/
private void sampleGauges() {
rootContext.gauges.values()
.forEach(PrometheusGaugeWrapper::sample);
}
@Override
public void resetAllValues() {
// not supported on Prometheus
}
private class Context implements MetricsContext {
private final ConcurrentMap<String, PrometheusGaugeWrapper> gauges = new ConcurrentHashMap<>();
private final ConcurrentMap<String, PrometheusCounter> counters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, PrometheusSummary> basicSummaries = new ConcurrentHashMap<>();
private final ConcurrentMap<String, PrometheusSummary> summaries = new ConcurrentHashMap<>();
private final ConcurrentMap<String, PrometheusLabelledSummary> basicSummarySets = new ConcurrentHashMap<>();
private final ConcurrentMap<String, PrometheusLabelledSummary> summarySets = new ConcurrentHashMap<>();
@Override
public MetricsContext getContext(String name) {
// no hierarchy yet
return this;
}
@Override
public Counter getCounter(String name) {
return counters.computeIfAbsent(name, PrometheusCounter::new);
}
/**
* Gauges may go up and down; in ZooKeeper they are a way to export
* internal values through a callback.
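* <p>
* A minimal usage sketch; the metric name and the value supplier below are
* hypothetical:
* </p>
* <pre>{@code
* MetricsContext context = metricsProvider.getRootContext();
* context.registerGauge("pending_requests", () -> pendingRequests.size());
* }</pre>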
*
* @param name the name of the gauge
* @param gauge the callback
*/
@Override
public void registerGauge(String name, Gauge gauge) {
Objects.requireNonNull(name);
gauges.compute(name, (id, prev) ->
new PrometheusGaugeWrapper(id, gauge, prev != null ? prev.inner : null));
}
@Override
public void unregisterGauge(String name) {
PrometheusGaugeWrapper existing = gauges.remove(name);
if (existing != null) {
existing.unregister();
}
}
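// A summary (or summary set) name is bound to a single DetailLevel: asking for the
// same name again with the other detail level fails with an IllegalArgumentException.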
@Override
public Summary getSummary(String name, DetailLevel detailLevel) {
if (detailLevel == DetailLevel.BASIC) {
return basicSummaries.computeIfAbsent(name, (n) -> {
if (summaries.containsKey(n)) {
throw new IllegalArgumentException("Already registered a non basic summary as " + n);
}
return new PrometheusSummary(name, detailLevel);
});
} else {
return summaries.computeIfAbsent(name, (n) -> {
if (basicSummaries.containsKey(n)) {
throw new IllegalArgumentException("Already registered a basic summary as " + n);
}
return new PrometheusSummary(name, detailLevel);
});
}
}
@Override
public SummarySet getSummarySet(String name, DetailLevel detailLevel) {
if (detailLevel == DetailLevel.BASIC) {
return basicSummarySets.computeIfAbsent(name, (n) -> {
if (summarySets.containsKey(n)) {
throw new IllegalArgumentException("Already registered a non basic summary set as " + n);
}
return new PrometheusLabelledSummary(name, detailLevel);
});
} else {
return summarySets.computeIfAbsent(name, (n) -> {
if (basicSummarySets.containsKey(n)) {
throw new IllegalArgumentException("Already registered a basic summary set as " + n);
}
return new PrometheusLabelledSummary(name, detailLevel);
});
}
}
}
private class PrometheusCounter implements Counter {
private final io.prometheus.client.Counter inner;
private final String name;
public PrometheusCounter(String name) {
this.name = name;
this.inner = io.prometheus.client.Counter
.build(name, name)
.register(collectorRegistry);
}
@Override
public void add(long delta) {
try {
inner.inc(delta);
} catch (IllegalArgumentException err) {
LOG.error("invalid delta {} for metric {}", delta, name, err);
}
}
@Override
public long get() {
// this method is used only for tests
// Prometheus returns a "double",
// but it is safe to cast it to a long:
// we never set non-integer values
return (long) inner.get();
}
}
private class PrometheusGaugeWrapper {
private final io.prometheus.client.Gauge inner;
private final Gauge gauge;
private final String name;
public PrometheusGaugeWrapper(String name, Gauge gauge, io.prometheus.client.Gauge prev) {
this.name = name;
this.gauge = gauge;
this.inner = prev != null ? prev
: io.prometheus.client.Gauge
.build(name, name)
.register(collectorRegistry);
}
/**
* Call the callback and update the Prometheus Gauge. This method is called
* when the server is polling for a value.
*/
private void sample() {
Number value = gauge.get();
this.inner.set(value != null ? value.doubleValue() : 0);
}
private void unregister() {
collectorRegistry.unregister(inner);
}
}
class PrometheusSummary implements Summary {
private final io.prometheus.client.Summary inner;
private final String name;
public PrometheusSummary(String name, MetricsContext.DetailLevel level) {
this.name = name;
if (level == MetricsContext.DetailLevel.ADVANCED) {
this.inner = io.prometheus.client.Summary
.build(name, name)
.quantile(0.5, 0.05) // Add 50th percentile (= median) with 5% tolerated error
.quantile(0.9, 0.01) // Add 90th percentile with 1% tolerated error
.quantile(0.99, 0.001) // Add 99th percentile with 0.1% tolerated error
.register(collectorRegistry);
} else {
this.inner = io.prometheus.client.Summary
.build(name, name)
.quantile(0.5, 0.05) // Add 50th percentile (= median) with 5% tolerated error
.register(collectorRegistry);
}
}
@Override
public void add(long delta) {
reportMetrics(() -> observe(delta));
}
void observe(final long delta) {
try {
inner.observe(delta);
} catch (final IllegalArgumentException err) {
LOG.error("invalid delta {} for metric {}", delta, name, err);
}
}
}
class PrometheusLabelledSummary implements SummarySet {
private final io.prometheus.client.Summary inner;
private final String name;
public PrometheusLabelledSummary(String name, MetricsContext.DetailLevel level) {
this.name = name;
if (level == MetricsContext.DetailLevel.ADVANCED) {
this.inner = io.prometheus.client.Summary
.build(name, name)
.labelNames(LABELS)
.quantile(0.5, 0.05) // Add 50th percentile (= median) with 5% tolerated error
.quantile(0.9, 0.01) // Add 90th percentile with 1% tolerated error
.quantile(0.99, 0.001) // Add 99th percentile with 0.1% tolerated error
.register(collectorRegistry);
} else {
this.inner = io.prometheus.client.Summary
.build(name, name)
.labelNames(LABELS)
.quantile(0.5, 0.05) // Add 50th percentile (= median) with 5% tolerated error
.register(collectorRegistry);
}
}
@Override
public void add(String key, long value) {
reportMetrics(() -> observe(key, value));
}
void observe(final String key, final long value) {
try {
inner.labels(key).observe(value);
} catch (final IllegalArgumentException err) {
LOG.error("invalid value {} for metric {} with key {}", value, name, key, err);
}
}
}
class MetricsServletImpl extends MetricsServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// little trick: update the Gauges before serving data
// from Prometheus CollectorRegistry
sampleGauges();
// serve data using Prometheus built in client.
super.doGet(req, resp);
}
}
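/**
* Builds the optional worker pool used by {@link #reportMetrics(Runnable)}.
* The queue is bounded by {@code maxQueueSize}, so a burst of summary updates is
* rejected (and logged at a limited rate) instead of growing the heap without bound.
*/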
private Optional<ExecutorService> createExecutor() {
if (numWorkerThreads < 1) {
LOG.info("Executor service was not created as numWorkerThreads {} is less than 1", numWorkerThreads);
return Optional.empty();
}
final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(maxQueueSize);
final ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads,
numWorkerThreads,
0L,
TimeUnit.MILLISECONDS,
queue, new PrometheusWorkerThreadFactory());
LOG.info("Executor service was created with numWorkerThreads {} and maxQueueSize {}",
numWorkerThreads,
maxQueueSize);
return Optional.of(executor);
}
private void shutdownExecutor() {
if (executorOptional.isPresent()) {
LOG.info("Shutdown executor service with timeout {}", workerShutdownTimeoutMs);
final ExecutorService executor = executorOptional.get();
executor.shutdown();
try {
if (!executor.awaitTermination(workerShutdownTimeoutMs, TimeUnit.MILLISECONDS)) {
LOG.error("Not all the Prometheus worker threads terminated properly after {} timeout",
workerShutdownTimeoutMs);
executor.shutdownNow();
}
} catch (final Exception e) {
LOG.error("Error occurred while terminating Prometheus worker threads", e);
executor.shutdownNow();
}
}
}
private static class PrometheusWorkerThreadFactory implements ThreadFactory {
private static final AtomicInteger workerCounter = new AtomicInteger(1);
@Override
public Thread newThread(final Runnable runnable) {
final String threadName = "PrometheusMetricsProviderWorker-" + workerCounter.getAndIncrement();
final Thread thread = new Thread(runnable, threadName);
thread.setDaemon(true);
return thread;
}
}
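/**
* Runs the reporting task on the worker pool when one was created; otherwise the
* task executes synchronously on the calling thread. A RejectedExecutionException
* means the bounded queue is full, so the sample is dropped and the overflow is
* logged at a limited rate.
*/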
private void reportMetrics(final Runnable task) {
if (executorOptional.isPresent()) {
try {
executorOptional.get().submit(task);
} catch (final RejectedExecutionException e) {
rateLogger.rateLimitLog("Prometheus metrics reporting task queue size exceeded the max",
String.valueOf(maxQueueSize));
}
} else {
task.run();
}
}
}