blob: 19d047fd29cbce0431f2094b495f483041581bb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.emitter.prometheus;
import com.google.common.collect.ImmutableMap;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.exporter.PushGateway;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;
/**
*
*/
public class PrometheusEmitter implements Emitter
{
private static final Logger log = new Logger(PrometheusEmitter.class);
private final Metrics metrics;
private final PrometheusEmitterConfig config;
private final PrometheusEmitterConfig.Strategy strategy;
private static final Pattern PATTERN = Pattern.compile("[^a-zA-Z0-9_][^a-zA-Z0-9_]*");
private HTTPServer server;
private PushGateway pushGateway;
private String identifier;
static PrometheusEmitter of(PrometheusEmitterConfig config)
{
return new PrometheusEmitter(config);
}
public PrometheusEmitter(PrometheusEmitterConfig config)
{
this.config = config;
this.strategy = config.getStrategy();
metrics = new Metrics(config.getNamespace(), config.getDimensionMapPath());
}
@Override
public void start()
{
if (strategy.equals(PrometheusEmitterConfig.Strategy.exporter)) {
if (server == null) {
try {
server = new HTTPServer(config.getPort());
}
catch (IOException e) {
log.error(e, "Unable to start prometheus HTTPServer");
}
} else {
log.error("HTTPServer is already started");
}
} else if (strategy.equals(PrometheusEmitterConfig.Strategy.pushgateway)) {
pushGateway = new PushGateway(config.getPushGatewayAddress());
}
}
@Override
public void emit(Event event)
{
if (event instanceof ServiceMetricEvent) {
emitMetric((ServiceMetricEvent) event);
}
}
private void emitMetric(ServiceMetricEvent metricEvent)
{
String name = metricEvent.getMetric();
String service = metricEvent.getService();
Map<String, Object> userDims = metricEvent.getUserDims();
identifier = (userDims.get("task") == null ? metricEvent.getHost() : (String) userDims.get("task"));
Number value = metricEvent.getValue();
DimensionsAndCollector metric = metrics.getByName(name, service);
if (metric != null) {
String[] labelValues = new String[metric.getDimensions().length];
String[] labelNames = metric.getDimensions();
for (int i = 0; i < labelValues.length; i++) {
String labelName = labelNames[i];
//labelName is controlled by the user. Instead of potential NPE on invalid labelName we use "unknown" as the dimension value
Object userDim = userDims.get(labelName);
labelValues[i] = userDim != null ? PATTERN.matcher(userDim.toString()).replaceAll("_") : "unknown";
}
if (metric.getCollector() instanceof Counter) {
((Counter) metric.getCollector()).labels(labelValues).inc(value.doubleValue());
} else if (metric.getCollector() instanceof Gauge) {
((Gauge) metric.getCollector()).labels(labelValues).set(value.doubleValue());
} else if (metric.getCollector() instanceof Histogram) {
((Histogram) metric.getCollector()).labels(labelValues).observe(value.doubleValue() / metric.getConversionFactor());
} else {
log.error("Unrecognized metric type [%s]", metric.getCollector().getClass());
}
} else {
log.debug("Unmapped metric [%s]", name);
}
}
private void pushMetric()
{
Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
try {
for (DimensionsAndCollector collector : map.values()) {
if (config.getNamespace() != null) {
pushGateway.push(collector.getCollector(), config.getNamespace(), ImmutableMap.of(config.getNamespace(), identifier));
}
}
}
catch (IOException e) {
log.error(e, "Unable to push prometheus metrics to pushGateway");
}
}
@Override
public void flush()
{
if (pushGateway != null) {
pushMetric();
}
}
@Override
public void close()
{
if (strategy.equals(PrometheusEmitterConfig.Strategy.exporter)) {
if (server != null) {
server.stop();
}
} else {
flush();
}
}
public HTTPServer getServer()
{
return server;
}
public PushGateway getPushGateway()
{
return pushGateway;
}
public void setPushGateway(PushGateway pushGateway)
{
this.pushGateway = pushGateway;
}
}