blob: b9c0ed0388427915a8705de2b38b5d4a8a86768b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import java.util.concurrent.TimeUnit;
/**
 * Enumerates the status-history metrics that are captured for a Process Group.
 *
 * <p>Each constant owns one {@link MetricDescriptor} that bundles the metric's field name,
 * display label, description, value {@link Formatter} and the function that extracts the
 * metric value from a {@link ProcessGroupStatus}.</p>
 */
public enum ProcessGroupStatusDescriptor {

    BYTES_READ(
        "bytesRead",
        "Bytes Read (5 mins)",
        "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes",
        Formatter.DATA_SIZE,
        ProcessGroupStatus::getBytesRead),

    BYTES_WRITTEN(
        "bytesWritten",
        "Bytes Written (5 mins)",
        "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes",
        Formatter.DATA_SIZE,
        ProcessGroupStatus::getBytesWritten),

    // Derived metric: the sum of the bytes-read and bytes-written values of the same status.
    BYTES_TRANSFERRED(
        "bytesTransferred",
        "Bytes Transferred (5 mins)",
        "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
        Formatter.DATA_SIZE,
        s -> s.getBytesRead() + s.getBytesWritten()),

    INPUT_BYTES(
        "inputBytes",
        "Bytes In (5 mins)",
        "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
        Formatter.DATA_SIZE,
        ProcessGroupStatus::getInputContentSize),

    // The count getters are converted with longValue() before being handed to the descriptor.
    INPUT_COUNT(
        "inputCount",
        "FlowFiles In (5 mins)",
        "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
        Formatter.COUNT,
        s -> s.getInputCount().longValue()),

    OUTPUT_BYTES(
        "outputBytes",
        "Bytes Out (5 mins)",
        "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
        Formatter.DATA_SIZE,
        ProcessGroupStatus::getOutputContentSize),

    OUTPUT_COUNT(
        "outputCount",
        "FlowFiles Out (5 mins)",
        "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
        Formatter.COUNT,
        s -> s.getOutputCount().longValue()),

    QUEUED_BYTES(
        "queuedBytes",
        "Queued Bytes",
        "The cumulative size of all FlowFiles queued in all Connections of this Process Group",
        Formatter.DATA_SIZE,
        ProcessGroupStatus::getQueuedContentSize),

    QUEUED_COUNT(
        "queuedCount",
        "Queued Count",
        "The number of FlowFiles queued in all Connections of this Process Group",
        Formatter.COUNT,
        s -> s.getQueuedCount().longValue()),

    // Derived metric: processing nanoseconds summed over the whole group tree, reported in milliseconds.
    TASK_MILLIS(
        "taskMillis",
        "Total Task Duration (5 mins)",
        "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
        Formatter.DURATION,
        ProcessGroupStatusDescriptor::calculateTaskMillis);

    /** Descriptor backing this constant; assigned once at construction and never replaced. */
    private final MetricDescriptor<ProcessGroupStatus> descriptor;

    /**
     * Builds the {@link StandardMetricDescriptor} for this constant.
     *
     * @param field         the field name reported by {@link #getField()}
     * @param label         the label passed to the descriptor
     * @param description   the description passed to the descriptor
     * @param formatter     the formatter passed to the descriptor
     * @param valueFunction the function that derives the metric value from a {@link ProcessGroupStatus}
     */
    ProcessGroupStatusDescriptor(final String field, final String label, final String description,
                                 final MetricDescriptor.Formatter formatter, final ValueMapper<ProcessGroupStatus> valueFunction) {
        // The ordinal is supplied as a method reference rather than a value, so the descriptor
        // resolves it only when it actually asks for it.
        this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
    }

    /**
     * @return the field name of the underlying descriptor
     */
    public String getField() {
        return descriptor.getField();
    }

    /**
     * @return the metric descriptor associated with this constant
     */
    public MetricDescriptor<ProcessGroupStatus> getDescriptor() {
        return descriptor;
    }

    /**
     * Computes the total processing time of the given group, including all nested groups,
     * in milliseconds.
     *
     * @param status the status of the Process Group to inspect
     * @return the summed processing time in milliseconds
     */
    private static long calculateTaskMillis(final ProcessGroupStatus status) {
        // Sum in nanoseconds first and convert once, so that truncation to whole milliseconds
        // happens a single time on the total instead of once per processor.
        return TimeUnit.NANOSECONDS.toMillis(calculateTaskNanos(status));
    }

    /**
     * Recursively sums the processing nanoseconds of every processor in the given group
     * and in all of its descendant groups.
     *
     * @param status the status of the Process Group to inspect
     * @return the summed processing time in nanoseconds
     */
    private static long calculateTaskNanos(final ProcessGroupStatus status) {
        long nanos = 0L;

        // Processors that belong directly to this group.
        for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
            nanos += procStatus.getProcessingNanos();
        }

        // Child groups contribute the totals of their own subtrees.
        for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
            nanos += calculateTaskNanos(childStatus);
        }

        return nanos;
    }
}