blob: b31ea3fbc2deba15c5725661d137e481caed7576 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus;
import io.prometheus.client.CollectorRegistry;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import org.eclipse.jetty.server.Server;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
import static org.apache.nifi.prometheus.util.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
@Tags({ "reporting", "prometheus", "metrics", "time series data" })
@CapabilityDescription("Reports metrics in Prometheus format by creating a /metrics HTTP(S) endpoint which can be used for external monitoring of the application."
        + " The reporting task reports a set of metrics regarding the JVM (optional) and the NiFi instance. Note that if the underlying Jetty server (i.e. the "
        + "Prometheus endpoint) cannot be started (for example if two PrometheusReportingTask instances are started on the same port), this may cause a delay in "
        + "shutting down NiFi while it waits for the server resources to be cleaned up.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "60 sec")
public class PrometheusReportingTask extends AbstractReportingTask {

    /** Optional SSL context service; when set, the endpoint is created with the SSL-aware server constructor. */
    public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
            .name("prometheus-reporting-task-ssl-context")
            .displayName("SSL Context Service")
            // A separating space was missing between the two concatenated fragments ("willaccept").
            .description("The SSL Context Service to use in order to secure the server. If specified, the server will "
                    + "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
            .required(false)
            .identifiesControllerService(RestrictedSSLContextService.class)
            .build();

    /** Granularity of the reported NiFi metrics: root process group only, all process groups, or all components. */
    public static final PropertyDescriptor METRICS_STRATEGY = new PropertyDescriptor.Builder()
            .name("prometheus-reporting-task-metrics-strategy")
            .displayName("Metrics Reporting Strategy")
            .description("The granularity on which to report metrics. Options include only the root process group, all process groups, or all components")
            .allowableValues(METRICS_STRATEGY_ROOT, METRICS_STRATEGY_PG, METRICS_STRATEGY_COMPONENTS)
            .defaultValue(METRICS_STRATEGY_COMPONENTS.getValue())
            .required(true)
            .build();

    /** Whether a JVM metrics collector is registered in addition to the NiFi metrics collector. */
    public static final PropertyDescriptor SEND_JVM_METRICS = new PropertyDescriptor.Builder()
            .name("prometheus-reporting-task-metrics-send-jvm")
            .displayName("Send JVM metrics")
            .description("Send JVM metrics in addition to the NiFi metrics")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .build();

    /** Component type label attached to the root-level data points. */
    private static final String ROOT_PROCESS_GROUP = "RootProcessGroup";

    private static final List<PropertyDescriptor> properties;

    static {
        List<PropertyDescriptor> props = new ArrayList<>();
        props.add(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT);
        props.add(PrometheusMetricsUtil.INSTANCE_ID);
        props.add(METRICS_STRATEGY);
        props.add(SEND_JVM_METRICS);
        props.add(SSL_CONTEXT);
        props.add(PrometheusMetricsUtil.CLIENT_AUTH);
        properties = Collections.unmodifiableList(props);
    }

    /** Server backing the metrics endpoint; assigned in {@link #onScheduled(ConfigurationContext)}. */
    private PrometheusServer prometheusServer;

    /**
     * @return the unmodifiable list of properties supported by this reporting task
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Creates the {@link PrometheusServer} for the configured port (with or without SSL) and registers the
     * metrics collectors on it: the NiFi collector always, the JVM collector only when
     * {@link #SEND_JVM_METRICS} is true.
     *
     * @param context the configuration context providing the property values
     * @throws ProcessException if anything fails while creating or configuring the server, so that the
     *                          task is not considered scheduled
     */
    @OnScheduled
    public void onScheduled(final ConfigurationContext context) {
        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
        final String metricsEndpointPort = context.getProperty(PrometheusMetricsUtil.METRICS_ENDPOINT_PORT).evaluateAttributeExpressions().getValue();

        try {
            // Parsed inside the try block so a malformed port is reported through the same ProcessException.
            final int port = Integer.parseInt(metricsEndpointPort);
            this.prometheusServer = createServer(context, port, sslContextService);

            final List<Function<ReportingContext, CollectorRegistry>> metricsCollectors = new ArrayList<>();
            metricsCollectors.add(PrometheusReportingTask::collectNiFiMetrics);
            if (context.getProperty(SEND_JVM_METRICS).asBoolean()) {
                metricsCollectors.add(PrometheusReportingTask::collectJvmMetrics);
            }
            this.prometheusServer.setMetricsCollectors(metricsCollectors);
            getLogger().info("Started Jetty server");
        } catch (Exception e) {
            // Don't allow this to finish successfully, onTrigger should not be called if the Jetty server wasn't started
            throw new ProcessException("Failed to start Jetty server", e);
        }
    }

    /**
     * Stops the underlying Jetty server, if one was created.
     * The capitalized method name is kept as-is for backward compatibility with existing callers.
     *
     * @throws Exception if the Jetty server fails to stop
     */
    @OnStopped
    public void OnStopped() throws Exception {
        stopServer();
    }

    /**
     * Stops the underlying Jetty server, if one was created, when the framework signals shutdown.
     *
     * @throws Exception if the Jetty server fails to stop
     */
    @OnShutdown
    public void onShutDown() throws Exception {
        stopServer();
    }

    /**
     * Hands the current reporting context to the server; the registered collectors receive a
     * {@link ReportingContext} as their argument.
     *
     * @param context the reporting context of this trigger
     */
    @Override
    public void onTrigger(final ReportingContext context) {
        if (prometheusServer != null) {
            prometheusServer.setReportingContext(context);
        }
    }

    /**
     * Builds the server for the given port: a plain server when no SSL context service is configured,
     * otherwise an SSL-enabled one whose client-auth flags are derived from the CLIENT_AUTH property.
     */
    private PrometheusServer createServer(final ConfigurationContext context, final int port,
                                          final SSLContextService sslContextService) throws Exception {
        if (sslContextService == null) {
            return new PrometheusServer(new InetSocketAddress(port), getLogger());
        }

        final String clientAuthValue = context.getProperty(PrometheusMetricsUtil.CLIENT_AUTH).getValue();
        // "need" takes precedence; "want" is only set when "need" did not match.
        final boolean need = PrometheusMetricsUtil.CLIENT_NEED.getValue().equals(clientAuthValue);
        final boolean want = !need && PrometheusMetricsUtil.CLIENT_WANT.getValue().equals(clientAuthValue);
        return new PrometheusServer(port, sslContextService, getLogger(), need, want);
    }

    /**
     * Collects the NiFi metrics for the controller status and adds the total byte counters
     * (read/written/sent/received) for the root process group.
     */
    private static CollectorRegistry collectNiFiMetrics(final ReportingContext reportingContext) {
        final EventAccess eventAccess = reportingContext.getEventAccess();
        final ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus();
        final String instanceId = resolveInstanceId(reportingContext);
        final String metricsStrategy = reportingContext.getProperty(METRICS_STRATEGY).getValue();

        final NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry();
        final CollectorRegistry collectorRegistry =
                PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootGroupStatus, instanceId, "", ROOT_PROCESS_GROUP, metricsStrategy);

        // Add the total byte counts (read/written) to the NiFi metrics registry
        final String rootPGId = StringUtils.isEmpty(rootGroupStatus.getId()) ? "" : rootGroupStatus.getId();
        final String rootPGName = StringUtils.isEmpty(rootGroupStatus.getName()) ? "" : rootGroupStatus.getName();
        nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesRead(), "TOTAL_BYTES_READ",
                instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
        nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesWritten(), "TOTAL_BYTES_WRITTEN",
                instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
        nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesSent(), "TOTAL_BYTES_SENT",
                instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
        nifiMetricsRegistry.setDataPoint(eventAccess.getTotalBytesReceived(), "TOTAL_BYTES_RECEIVED",
                instanceId, ROOT_PROCESS_GROUP, rootPGName, rootPGId, "");
        return collectorRegistry;
    }

    /**
     * Collects the JVM metrics. The instance id is normalized the same way as for the NiFi
     * collector, so a null value is never handed to the metrics registry.
     */
    private static CollectorRegistry collectJvmMetrics(final ReportingContext reportingContext) {
        final String instanceId = resolveInstanceId(reportingContext);
        final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
        return PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), instanceId);
    }

    /** Evaluates the INSTANCE_ID property, substituting an empty string when it evaluates to null. */
    private static String resolveInstanceId(final ReportingContext reportingContext) {
        final String instanceId = reportingContext.getProperty(PrometheusMetricsUtil.INSTANCE_ID).evaluateAttributeExpressions().getValue();
        return instanceId == null ? "" : instanceId;
    }

    /** Stops the Jetty server held by the Prometheus server wrapper; a no-op when either is absent. */
    private void stopServer() throws Exception {
        if (prometheusServer == null) {
            return;
        }
        final Server server = prometheusServer.getServer();
        if (server != null) {
            server.stop();
        }
    }
}