blob: 33222c77d4433d8f41f64fdae94b0970e6d37003 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting.prometheus.api;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.prometheus.client.SimpleCollector;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.RestrictedSSLContextService;
public class PrometheusMetricsUtil {
public static final AllowableValue METRICS_STRATEGY_ROOT = new AllowableValue("Root Process Group", "Root Process Group",
"Send rollup metrics for the entire root process group");
public static final AllowableValue METRICS_STRATEGY_PG = new AllowableValue("All Process Groups", "All Process Groups",
"Send metrics for each process group");
public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new AllowableValue("All Components", "All Components",
"Send metrics for each component in the system, to include processors, connections, controller services, etc.");
private static final CollectorRegistry NIFI_REGISTRY = new CollectorRegistry();
private static final CollectorRegistry JVM_REGISTRY = new CollectorRegistry();
// Common properties/values
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "No Authentication",
"ReportingTask will not authenticate clients. Anyone can communicate with this ReportingTask anonymously");
public static final AllowableValue CLIENT_WANT = new AllowableValue("Want Authentication", "Want Authentication",
"ReportingTask will try to verify the client but if unable to verify will allow the client to communicate anonymously");
public static final AllowableValue CLIENT_NEED = new AllowableValue("Need Authentication", "Need Authentication",
"ReportingTask will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore"
+ "specified in the SSL Context Service");
public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-metrics-endpoint-port")
.displayName("Prometheus Metrics Endpoint Port")
.description("The Port where prometheus metrics can be accessed")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("9092")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-instance-id")
.displayName("Instance ID")
.description("Id of this NiFi instance to be included in the metrics sent to Prometheus")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("${hostname(true)}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-ssl-context")
.displayName("SSL Context Service")
.description("The SSL Context Service to use in order to secure the server. If specified, the server will"
+ "accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("prometheus-reporting-task-client-auth")
.displayName("Client Authentication")
.description("Specifies whether or not the Reporting Task should authenticate clients. This value is ignored if the <SSL Context Service> "
+ "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
.required(true)
.allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
.defaultValue(CLIENT_NONE.getValue())
.build();
// Processor / Process Group metrics
private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
.name("nifi_amount_flowfiles_sent")
.help("Total number of FlowFiles sent by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build()
.name("nifi_amount_flowfiles_transferred")
.help("Total number of FlowFiles transferred by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build()
.name("nifi_amount_flowfiles_received")
.help("Total number of FlowFiles received by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_REMOVED = Gauge.build()
.name("nifi_amount_flowfiles_removed")
.help("Total number of FlowFiles removed by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
.name("nifi_amount_bytes_sent")
.help("Total number of bytes sent by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_READ = Gauge.build()
.name("nifi_amount_bytes_read")
.help("Total number of bytes read by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
.name("nifi_amount_bytes_written")
.help("Total number of bytes written by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build()
.name("nifi_amount_bytes_received")
.help("Total number of bytes received by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build()
.name("nifi_amount_bytes_transferred")
.help("Total number of Bytes transferred by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build()
.name("nifi_amount_threads_active")
.help("Total number of threads active for the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_THREADS_TOTAL_TERMINATED = Gauge.build()
.name("nifi_amount_threads_terminated")
.help("Total number of threads terminated for the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
.name("nifi_size_content_output_total")
.help("Total size of content output by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build()
.name("nifi_size_content_input_total")
.help("Total size of content input by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build()
.name("nifi_size_content_queued_total")
.help("Total size of content queued in the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build()
.name("nifi_amount_items_output")
.help("Total number of items output by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build()
.name("nifi_amount_items_input")
.help("Total number of items input by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build()
.name("nifi_amount_items_queued")
.help("Total number of items queued by the component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
// Processor metrics
private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
.name("nifi_processor_counters")
.help("Counters exposed by NiFi Processors")
.labelNames("processor_name", "counter_name", "processor_id", "instance")
.register(NIFI_REGISTRY);
// Connection metrics
private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build()
.name("nifi_backpressure_bytes_threshold")
.help("The number of bytes that can be queued before backpressure is applied")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build()
.name("nifi_backpressure_object_threshold")
.help("The number of flow files that can be queued before backpressure is applied")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build()
.name("nifi_backpressure_enabled")
.help("Whether backpressure has been applied for this component. Values are 0 or 1")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
// Port metrics
private static final Gauge IS_TRANSMITTING = Gauge.build()
.name("nifi_transmitting")
.help("Whether this component is transmitting data. Values are 0 or 1")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id", "run_status")
.register(NIFI_REGISTRY);
// Remote Process Group (RPG) metrics
private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build()
.name("nifi_active_remote_port_count")
.help("The number of active remote ports associated with this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build()
.name("nifi_inactive_remote_port_count")
.help("The number of inactive remote ports associated with this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build()
.name("nifi_average_lineage_duration")
.help("The average lineage duration (in milliseconds) for all flow file processed by this component")
.labelNames("instance", "component_type", "component_name", "component_id", "parent_id",
"source_id", "source_name", "destination_id", "destination_name")
.register(NIFI_REGISTRY);
///////////////////////////////////////////////////////////////
// JVM Metrics
///////////////////////////////////////////////////////////////
private static final Gauge JVM_HEAP_USED = Gauge.build()
.name("nifi_jvm_heap_used")
.help("NiFi JVM heap used")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_HEAP_USAGE = Gauge.build()
.name("nifi_jvm_heap_usage")
.help("NiFi JVM heap usage")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_HEAP_NON_USAGE = Gauge.build()
.name("nifi_jvm_heap_non_usage")
.help("NiFi JVM heap non usage")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_THREAD_COUNT = Gauge.build()
.name("nifi_jvm_thread_count")
.help("NiFi JVM thread count")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_DAEMON_THREAD_COUNT = Gauge.build()
.name("nifi_jvm_daemon_thread_count")
.help("NiFi JVM daemon thread count")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_UPTIME = Gauge.build()
.name("nifi_jvm_uptime")
.help("NiFi JVM uptime")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_FILE_DESCRIPTOR_USAGE = Gauge.build()
.name("nifi_jvm_file_descriptor_usage")
.help("NiFi JVM file descriptor usage")
.labelNames("instance")
.register(JVM_REGISTRY);
private static final Gauge JVM_GC_RUNS = Gauge.build()
.name("nifi_jvm_gc_runs")
.help("NiFi JVM GC number of runs")
.labelNames("instance", "gc_name")
.register(JVM_REGISTRY);
private static final Gauge JVM_GC_TIME = Gauge.build()
.name("nifi_jvm_gc_time")
.help("NiFi JVM GC time in milliseconds")
.labelNames("instance", "gc_name")
.register(JVM_REGISTRY);
public static CollectorRegistry createNifiMetrics(ProcessGroupStatus status, String instanceId, String parentPGId, String componentType, String metricsStrategy) {
final String componentId = status.getId();
final String componentName = status.getName();
// Clear all collectors to deal with removed/renamed components -- for root PG only
if("RootProcessGroup".equals(componentType)) {
try {
for (final Field field : PrometheusMetricsUtil.class.getDeclaredFields()) {
if (Modifier.isStatic(field.getModifiers()) && (field.get(null) instanceof SimpleCollector)) {
SimpleCollector<?> sc = (SimpleCollector<?>) (field.get(null));
sc.clear();
}
}
} catch (IllegalAccessException e) {
// ignore
}
}
AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesSent());
AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesTransferred());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getFlowFilesReceived());
AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesRead());
AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesWritten());
AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesReceived());
AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType, componentName, componentId, parentPGId).set(status.getBytesTransferred());
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getOutputContentSize());
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getInputContentSize());
SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getQueuedContentSize());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName, componentId, parentPGId, "", "", "", "")
.set(status.getInputCount());
AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName, componentId, parentPGId,"", "", "", "")
.set(status.getQueuedCount());
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType, componentName, componentId, parentPGId)
.set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount());
AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, componentType, componentName, componentId, parentPGId)
.set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount());
// Report metrics for child process groups if specified
if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) || METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
status.getProcessGroupStatus().forEach((childGroupStatus) -> createNifiMetrics(childGroupStatus, instanceId, componentId, "ProcessGroup", metricsStrategy));
}
if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
// Report metrics for all components
for (ProcessorStatus processorStatus : status.getProcessorStatus()) {
Map<String, Long> counters = processorStatus.getCounters();
if (counters != null) {
counters.entrySet().stream().forEach(entry -> PROCESSOR_COUNTERS
.labels(processorStatus.getName(), entry.getKey(), processorStatus.getId(), instanceId).set(entry.getValue()));
}
final String procComponentType = "Processor";
final String procComponentId = processorStatus.getId();
final String procComponentName = processorStatus.getName();
final String parentId = processorStatus.getGroupId();
AMOUNT_FLOWFILES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesSent());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesReceived());
AMOUNT_FLOWFILES_REMOVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getFlowFilesRemoved());
AMOUNT_BYTES_SENT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesRead());
AMOUNT_BYTES_WRITTEN.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesWritten());
AMOUNT_BYTES_RECEIVED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId).set(processorStatus.getBytesReceived());
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getOutputBytes());
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getInputBytes());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getInputCount());
AVERAGE_LINEAGE_DURATION.labels(instanceId, procComponentType, procComponentName, procComponentId, parentPGId, "", "", "", "")
.set(processorStatus.getAverageLineageDuration());
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
.set(status.getActiveThreadCount() == null ? 0 : status.getActiveThreadCount());
AMOUNT_THREADS_TOTAL_TERMINATED.labels(instanceId, procComponentType, procComponentName, procComponentId, parentId)
.set(status.getTerminatedThreadCount() == null ? 0 : status.getTerminatedThreadCount());
}
for (ConnectionStatus connectionStatus : status.getConnectionStatus()) {
final String connComponentId = connectionStatus.getId();
final String connComponentName = connectionStatus.getName();
final String sourceId = connectionStatus.getSourceId();
final String sourceName = connectionStatus.getSourceName();
final String destinationId = connectionStatus.getDestinationId();
final String destinationName = connectionStatus.getDestinationName();
final String parentId = connectionStatus.getGroupId();
final String connComponentType = "Connection";
SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getOutputBytes());
SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getInputBytes());
SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getQueuedBytes());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getInputCount());
AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getQueuedCount());
BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getBackPressureBytesThreshold());
BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(connectionStatus.getBackPressureObjectThreshold());
boolean isBackpressureEnabled = (connectionStatus.getBackPressureObjectThreshold() > 0 && connectionStatus.getBackPressureObjectThreshold() <= connectionStatus.getQueuedCount())
|| (connectionStatus.getBackPressureBytesThreshold() > 0 && connectionStatus.getBackPressureBytesThreshold() <= connectionStatus.getMaxQueuedBytes());
IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType, connComponentName, connComponentId, parentId, sourceId, sourceName, destinationId, destinationName)
.set(isBackpressureEnabled ? 1 : 0);
}
for (PortStatus portStatus : status.getInputPortStatus()) {
final String portComponentId = portStatus.getId();
final String portComponentName = portStatus.getName();
final String parentId = portStatus.getGroupId();
final String portComponentType = "InputPort";
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
.set(portStatus.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
.set(portStatus.getInputCount());
final Boolean isTransmitting = portStatus.isTransmitting();
IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
.set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
for (PortStatus portStatus : status.getOutputPortStatus()) {
final String portComponentId = portStatus.getId();
final String portComponentName = portStatus.getName();
final String parentId = portStatus.getGroupId();
final String portComponentType = "OutputPort";
AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesSent());
AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getFlowFilesReceived());
AMOUNT_BYTES_SENT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
AMOUNT_BYTES_READ.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getBytesReceived());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
.set(portStatus.getOutputCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, "", "", "", "")
.set(portStatus.getInputCount());
final Boolean isTransmitting = portStatus.isTransmitting();
IS_TRANSMITTING.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
.set(isTransmitting == null ? 0 : (isTransmitting ? 1 : 0));
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, portComponentType, portComponentName, portComponentId, parentId).set(portStatus.getActiveThreadCount());
}
for (RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) {
final String rpgComponentId = remoteProcessGroupStatus.getId();
final String rpgComponentName = remoteProcessGroupStatus.getName();
final String parentId = remoteProcessGroupStatus.getGroupId();
final String rpgComponentType = "RemoteProcessGroup";
AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getSentContentSize());
AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getReceivedContentSize());
AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
.set(remoteProcessGroupStatus.getSentCount());
AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
.set(remoteProcessGroupStatus.getReceivedCount());
ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
.set(remoteProcessGroupStatus.getActiveRemotePortCount());
INACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
.set(remoteProcessGroupStatus.getInactiveRemotePortCount());
AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, "", "", "", "")
.set(remoteProcessGroupStatus.getAverageLineageDuration());
IS_TRANSMITTING.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId, remoteProcessGroupStatus.getTransmissionStatus().name())
.set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus()) ? 1 : 0);
AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, rpgComponentType, rpgComponentName, rpgComponentId, parentId).set(remoteProcessGroupStatus.getActiveThreadCount());
}
}
return NIFI_REGISTRY;
}
public static CollectorRegistry createJvmMetrics(JvmMetrics jvmMetrics, String instanceId) {
JVM_HEAP_USED.labels(instanceId).set(jvmMetrics.heapUsed(DataUnit.B));
JVM_HEAP_USAGE.labels(instanceId).set(jvmMetrics.heapUsage());
JVM_HEAP_NON_USAGE.labels(instanceId).set(jvmMetrics.nonHeapUsage());
JVM_THREAD_COUNT.labels(instanceId).set(jvmMetrics.threadCount());
JVM_DAEMON_THREAD_COUNT.labels(instanceId).set(jvmMetrics.daemonThreadCount());
JVM_UPTIME.labels(instanceId).set(jvmMetrics.uptime());
JVM_FILE_DESCRIPTOR_USAGE.labels(instanceId).set(jvmMetrics.fileDescriptorUsage());
jvmMetrics.garbageCollectors()
.forEach((name, stat) -> {
JVM_GC_RUNS.labels(instanceId, name).set(stat.getRuns());
JVM_GC_TIME.labels(instanceId, name).set(stat.getTime(TimeUnit.MILLISECONDS));
});
return JVM_REGISTRY;
}
}