| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.reporting; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.components.validation.ValidationStatus; |
| import org.apache.nifi.connectable.Connectable; |
| import org.apache.nifi.connectable.ConnectableType; |
| import org.apache.nifi.connectable.Connection; |
| import org.apache.nifi.connectable.Funnel; |
| import org.apache.nifi.connectable.Port; |
| import org.apache.nifi.controller.ProcessScheduler; |
| import org.apache.nifi.controller.ProcessorNode; |
| import org.apache.nifi.controller.ScheduledState; |
| import org.apache.nifi.controller.flow.FlowManager; |
| import org.apache.nifi.controller.queue.QueueSize; |
| import org.apache.nifi.controller.repository.FlowFileEvent; |
| import org.apache.nifi.controller.repository.FlowFileEventRepository; |
| import org.apache.nifi.controller.repository.RepositoryStatusReport; |
| import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent; |
| import org.apache.nifi.controller.status.ConnectionStatus; |
| import org.apache.nifi.controller.status.PortStatus; |
| import org.apache.nifi.controller.status.ProcessGroupStatus; |
| import org.apache.nifi.controller.status.ProcessorStatus; |
| import org.apache.nifi.controller.status.RemoteProcessGroupStatus; |
| import org.apache.nifi.controller.status.RunStatus; |
| import org.apache.nifi.controller.status.TransmissionStatus; |
| import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions; |
| import org.apache.nifi.controller.status.analytics.StatusAnalytics; |
| import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; |
| import org.apache.nifi.groups.ProcessGroup; |
| import org.apache.nifi.groups.RemoteProcessGroup; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.provenance.ProvenanceEventRecord; |
| import org.apache.nifi.registry.flow.VersionControlInformation; |
| import org.apache.nifi.registry.flow.VersionedFlowState; |
| import org.apache.nifi.registry.flow.VersionedFlowStatus; |
| import org.apache.nifi.remote.PublicPort; |
| import org.apache.nifi.remote.RemoteGroupPort; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Predicate; |
| |
| public abstract class AbstractEventAccess implements EventAccess { |
private static final Logger logger = LoggerFactory.getLogger(AbstractEventAccess.class);

// Source of the process groups (and the root group id) whose status is reported.
private final FlowManager flowManager;
// Source of the per-component FlowFile events and of the aggregate event.
private final FlowFileEventRepository flowFileEventRepository;
// Used to look up the number of active threads of ports and funnels.
private final ProcessScheduler processScheduler;
// Optional: when null, connection statuses are created without predictions.
private final StatusAnalyticsEngine statusAnalyticsEngine;

/**
 * Creates an event access backed by the given collaborators.
 *
 * @param processScheduler scheduler queried for active thread counts
 * @param analyticsEngine engine providing connection predictions; may be {@code null}
 * @param flowManager manager used to resolve process groups
 * @param flowFileEventRepository repository providing FlowFile event reports
 */
public AbstractEventAccess(final ProcessScheduler processScheduler, final StatusAnalyticsEngine analyticsEngine, final FlowManager flowManager,
                           final FlowFileEventRepository flowFileEventRepository) {
    this.flowManager = flowManager;
    this.flowFileEventRepository = flowFileEventRepository;
    this.processScheduler = processScheduler;
    this.statusAnalyticsEngine = analyticsEngine;
}
| |
| /** |
| * Returns the status of all components in the specified group. This request |
| * is not in the context of a user so the results will be unfiltered. |
| * |
| * @param groupId group id |
| * @return the component status |
| */ |
| @Override |
| public ProcessGroupStatus getGroupStatus(final String groupId) { |
| final RepositoryStatusReport repoStatusReport = generateRepositoryStatusReport(); |
| return getGroupStatus(groupId, repoStatusReport); |
| } |
| |
| /** |
| * Returns the status for the components in the specified group with the |
| * specified report. This request is not in the context of a user so the |
| * results will be unfiltered. |
| * |
| * @param groupId group id |
| * @param statusReport report |
| * @return the component status |
| */ |
| public ProcessGroupStatus getGroupStatus(final String groupId, final RepositoryStatusReport statusReport) { |
| final ProcessGroup group = flowManager.getGroup(groupId); |
| |
| // this was invoked with no user context so the results will be unfiltered... necessary for aggregating status history |
| return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1); |
| } |
| |
| protected RepositoryStatusReport generateRepositoryStatusReport() { |
| return flowFileEventRepository.reportTransferEvents(System.currentTimeMillis()); |
| } |
| |
| |
| /** |
| * Returns the status for the components in the specified group with the |
| * specified report. The results will be filtered by executing the specified |
| * predicate. |
| * |
| * @param group group id |
| * @param statusReport report |
| * @param isAuthorized is authorized check |
| * @param recursiveStatusDepth the number of levels deep we should recurse and still include the the processors' statuses, the groups' statuses, etc. in the returned ProcessGroupStatus |
| * @param currentDepth the current number of levels deep that we have recursed |
| * @return the component status |
| */ |
| ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized, |
| final int recursiveStatusDepth, final int currentDepth) { |
| if (group == null) { |
| return null; |
| } |
| |
| final ProcessGroupStatus status = new ProcessGroupStatus(); |
| status.setId(group.getIdentifier()); |
| status.setName(isAuthorized.test(group) ? group.getName() : group.getIdentifier()); |
| int activeGroupThreads = 0; |
| int terminatedGroupThreads = 0; |
| long bytesRead = 0L; |
| long bytesWritten = 0L; |
| int queuedCount = 0; |
| long queuedContentSize = 0L; |
| int flowFilesIn = 0; |
| long bytesIn = 0L; |
| int flowFilesOut = 0; |
| long bytesOut = 0L; |
| int flowFilesReceived = 0; |
| long bytesReceived = 0L; |
| int flowFilesSent = 0; |
| long bytesSent = 0L; |
| int flowFilesTransferred = 0; |
| long bytesTransferred = 0; |
| |
| final boolean populateChildStatuses = currentDepth <= recursiveStatusDepth; |
| |
| // set status for processors |
| final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>(); |
| status.setProcessorStatus(processorStatusCollection); |
| for (final ProcessorNode procNode : group.getProcessors()) { |
| final ProcessorStatus procStat = getProcessorStatus(statusReport, procNode, isAuthorized); |
| if (populateChildStatuses) { |
| processorStatusCollection.add(procStat); |
| } |
| activeGroupThreads += procStat.getActiveThreadCount(); |
| terminatedGroupThreads += procStat.getTerminatedThreadCount(); |
| bytesRead += procStat.getBytesRead(); |
| bytesWritten += procStat.getBytesWritten(); |
| |
| flowFilesReceived += procStat.getFlowFilesReceived(); |
| bytesReceived += procStat.getBytesReceived(); |
| flowFilesSent += procStat.getFlowFilesSent(); |
| bytesSent += procStat.getBytesSent(); |
| } |
| |
| // set status for local child groups |
| final Collection<ProcessGroupStatus> localChildGroupStatusCollection = new ArrayList<>(); |
| status.setProcessGroupStatus(localChildGroupStatusCollection); |
| for (final ProcessGroup childGroup : group.getProcessGroups()) { |
| final ProcessGroupStatus childGroupStatus; |
| if (populateChildStatuses) { |
| childGroupStatus = getGroupStatus(childGroup, statusReport, isAuthorized, recursiveStatusDepth, currentDepth + 1); |
| localChildGroupStatusCollection.add(childGroupStatus); |
| } else { |
| // In this case, we don't want to include any of the recursive components' individual statuses. As a result, we can |
| // avoid performing any sort of authorizations. Because we only care about the numbers that come back, we can just indicate |
| // that the user is not authorized. This allows us to avoid the expense of both performing the authorization and calculating |
| // things that we would otherwise need to calculate if the user were in fact authorized. |
| childGroupStatus = getGroupStatus(childGroup, statusReport, authorizable -> false, recursiveStatusDepth, currentDepth + 1); |
| } |
| |
| activeGroupThreads += childGroupStatus.getActiveThreadCount(); |
| terminatedGroupThreads += childGroupStatus.getTerminatedThreadCount(); |
| bytesRead += childGroupStatus.getBytesRead(); |
| bytesWritten += childGroupStatus.getBytesWritten(); |
| queuedCount += childGroupStatus.getQueuedCount(); |
| queuedContentSize += childGroupStatus.getQueuedContentSize(); |
| |
| flowFilesReceived += childGroupStatus.getFlowFilesReceived(); |
| bytesReceived += childGroupStatus.getBytesReceived(); |
| flowFilesSent += childGroupStatus.getFlowFilesSent(); |
| bytesSent += childGroupStatus.getBytesSent(); |
| |
| flowFilesTransferred += childGroupStatus.getFlowFilesTransferred(); |
| bytesTransferred += childGroupStatus.getBytesTransferred(); |
| } |
| |
| // set status for remote child groups |
| final Collection<RemoteProcessGroupStatus> remoteProcessGroupStatusCollection = new ArrayList<>(); |
| status.setRemoteProcessGroupStatus(remoteProcessGroupStatusCollection); |
| for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { |
| final RemoteProcessGroupStatus remoteStatus = createRemoteGroupStatus(remoteGroup, statusReport, isAuthorized); |
| if (remoteStatus != null) { |
| if (populateChildStatuses) { |
| remoteProcessGroupStatusCollection.add(remoteStatus); |
| } |
| |
| flowFilesReceived += remoteStatus.getReceivedCount(); |
| bytesReceived += remoteStatus.getReceivedContentSize(); |
| flowFilesSent += remoteStatus.getSentCount(); |
| bytesSent += remoteStatus.getSentContentSize(); |
| } |
| } |
| |
| // connection status |
| final Collection<ConnectionStatus> connectionStatusCollection = new ArrayList<>(); |
| status.setConnectionStatus(connectionStatusCollection); |
| |
| long now = System.currentTimeMillis(); |
| |
| // get the connection and remote port status |
| for (final Connection conn : group.getConnections()) { |
| final boolean isConnectionAuthorized = isAuthorized.test(conn); |
| final boolean isSourceAuthorized = isAuthorized.test(conn.getSource()); |
| final boolean isDestinationAuthorized = isAuthorized.test(conn.getDestination()); |
| |
| final ConnectionStatus connStatus = new ConnectionStatus(); |
| connStatus.setId(conn.getIdentifier()); |
| connStatus.setGroupId(conn.getProcessGroup().getIdentifier()); |
| connStatus.setSourceId(conn.getSource().getIdentifier()); |
| connStatus.setSourceName(isSourceAuthorized ? conn.getSource().getName() : conn.getSource().getIdentifier()); |
| connStatus.setDestinationId(conn.getDestination().getIdentifier()); |
| connStatus.setDestinationName(isDestinationAuthorized ? conn.getDestination().getName() : conn.getDestination().getIdentifier()); |
| connStatus.setBackPressureDataSizeThreshold(conn.getFlowFileQueue().getBackPressureDataSizeThreshold()); |
| connStatus.setBackPressureObjectThreshold(conn.getFlowFileQueue().getBackPressureObjectThreshold()); |
| connStatus.setTotalQueuedDuration(conn.getFlowFileQueue().getTotalQueuedDuration(now)); |
| long minLastQueueDate = conn.getFlowFileQueue().getMinLastQueueDate(); |
| connStatus.setMaxQueuedDuration(minLastQueueDate == 0 ? 0 : now - minLastQueueDate); |
| |
| final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier()); |
| if (connectionStatusReport != null) { |
| connStatus.setInputBytes(connectionStatusReport.getContentSizeIn()); |
| connStatus.setInputCount(connectionStatusReport.getFlowFilesIn()); |
| connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut()); |
| connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut()); |
| |
| flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut(); |
| bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut(); |
| } |
| |
| if (statusAnalyticsEngine != null) { |
| StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics(conn.getIdentifier()); |
| if (statusAnalytics != null) { |
| Map<String,Long> predictionValues = statusAnalytics.getPredictions(); |
| ConnectionStatusPredictions predictions = new ConnectionStatusPredictions(); |
| connStatus.setPredictions(predictions); |
| predictions.setPredictedTimeToBytesBackpressureMillis(predictionValues.get("timeToBytesBackpressureMillis")); |
| predictions.setPredictedTimeToCountBackpressureMillis(predictionValues.get("timeToCountBackpressureMillis")); |
| predictions.setNextPredictedQueuedBytes(predictionValues.get("nextIntervalBytes")); |
| predictions.setNextPredictedQueuedCount(predictionValues.get("nextIntervalCount").intValue()); |
| predictions.setPredictedPercentCount(predictionValues.get("nextIntervalPercentageUseCount").intValue()); |
| predictions.setPredictedPercentBytes(predictionValues.get("nextIntervalPercentageUseBytes").intValue()); |
| predictions.setPredictionIntervalMillis(predictionValues.get("intervalTimeMillis")); |
| } |
| } else { |
| connStatus.setPredictions(null); |
| } |
| |
| if (isConnectionAuthorized) { |
| if (StringUtils.isNotBlank(conn.getName())) { |
| connStatus.setName(conn.getName()); |
| } else if (conn.getRelationships() != null && !conn.getRelationships().isEmpty()) { |
| final Collection<String> relationships = new ArrayList<>(conn.getRelationships().size()); |
| for (final Relationship relationship : conn.getRelationships()) { |
| relationships.add(relationship.getName()); |
| } |
| connStatus.setName(StringUtils.join(relationships, ", ")); |
| } |
| } else { |
| connStatus.setName(conn.getIdentifier()); |
| } |
| |
| final QueueSize queueSize = conn.getFlowFileQueue().size(); |
| final int connectionQueuedCount = queueSize.getObjectCount(); |
| final long connectionQueuedBytes = queueSize.getByteCount(); |
| if (connectionQueuedCount > 0) { |
| connStatus.setQueuedBytes(connectionQueuedBytes); |
| connStatus.setQueuedCount(connectionQueuedCount); |
| } |
| |
| if (populateChildStatuses) { |
| connectionStatusCollection.add(connStatus); |
| } |
| |
| queuedCount += connectionQueuedCount; |
| queuedContentSize += connectionQueuedBytes; |
| |
| final Connectable source = conn.getSource(); |
| if (ConnectableType.REMOTE_OUTPUT_PORT.equals(source.getConnectableType())) { |
| final RemoteGroupPort remoteOutputPort = (RemoteGroupPort) source; |
| activeGroupThreads += processScheduler.getActiveThreadCount(remoteOutputPort); |
| } |
| |
| final Connectable destination = conn.getDestination(); |
| if (ConnectableType.REMOTE_INPUT_PORT.equals(destination.getConnectableType())) { |
| final RemoteGroupPort remoteInputPort = (RemoteGroupPort) destination; |
| activeGroupThreads += processScheduler.getActiveThreadCount(remoteInputPort); |
| } |
| } |
| |
| // status for input ports |
| final Collection<PortStatus> inputPortStatusCollection = new ArrayList<>(); |
| status.setInputPortStatus(inputPortStatusCollection); |
| |
| final Set<Port> inputPorts = group.getInputPorts(); |
| for (final Port port : inputPorts) { |
| final boolean isInputPortAuthorized = isAuthorized.test(port); |
| |
| final PortStatus portStatus = new PortStatus(); |
| portStatus.setId(port.getIdentifier()); |
| portStatus.setGroupId(port.getProcessGroup().getIdentifier()); |
| portStatus.setName(isInputPortAuthorized ? port.getName() : port.getIdentifier()); |
| portStatus.setActiveThreadCount(processScheduler.getActiveThreadCount(port)); |
| |
| // determine the run status |
| if (ScheduledState.RUNNING.equals(port.getScheduledState())) { |
| portStatus.setRunStatus(RunStatus.Running); |
| } else if (ScheduledState.DISABLED.equals(port.getScheduledState())) { |
| portStatus.setRunStatus(RunStatus.Disabled); |
| } else if (!port.isValid()) { |
| portStatus.setRunStatus(RunStatus.Invalid); |
| } else { |
| portStatus.setRunStatus(RunStatus.Stopped); |
| } |
| |
| // special handling for public ports |
| if (port instanceof PublicPort) { |
| portStatus.setTransmitting(((PublicPort) port).isTransmitting()); |
| } |
| |
| final FlowFileEvent entry = statusReport.getReportEntries().get(port.getIdentifier()); |
| if (entry == null) { |
| portStatus.setInputBytes(0L); |
| portStatus.setInputCount(0); |
| portStatus.setOutputBytes(0L); |
| portStatus.setOutputCount(0); |
| } else { |
| final int processedCount = entry.getFlowFilesOut(); |
| final long numProcessedBytes = entry.getContentSizeOut(); |
| portStatus.setOutputBytes(numProcessedBytes); |
| portStatus.setOutputCount(processedCount); |
| |
| final int inputCount = entry.getFlowFilesIn(); |
| final long inputBytes = entry.getContentSizeIn(); |
| portStatus.setInputBytes(inputBytes); |
| portStatus.setInputCount(inputCount); |
| |
| flowFilesIn += port instanceof PublicPort ? entry.getFlowFilesReceived() : inputCount; |
| bytesIn += port instanceof PublicPort ? entry.getBytesReceived() : inputBytes; |
| |
| bytesWritten += entry.getBytesWritten(); |
| |
| flowFilesReceived += entry.getFlowFilesReceived(); |
| bytesReceived += entry.getBytesReceived(); |
| } |
| |
| if (populateChildStatuses) { |
| inputPortStatusCollection.add(portStatus); |
| } |
| |
| activeGroupThreads += portStatus.getActiveThreadCount(); |
| } |
| |
| // status for output ports |
| final Collection<PortStatus> outputPortStatusCollection = new ArrayList<>(); |
| status.setOutputPortStatus(outputPortStatusCollection); |
| |
| final Set<Port> outputPorts = group.getOutputPorts(); |
| for (final Port port : outputPorts) { |
| final boolean isOutputPortAuthorized = isAuthorized.test(port); |
| |
| final PortStatus portStatus = new PortStatus(); |
| portStatus.setId(port.getIdentifier()); |
| portStatus.setGroupId(port.getProcessGroup().getIdentifier()); |
| portStatus.setName(isOutputPortAuthorized ? port.getName() : port.getIdentifier()); |
| portStatus.setActiveThreadCount(processScheduler.getActiveThreadCount(port)); |
| |
| // determine the run status |
| if (ScheduledState.RUNNING.equals(port.getScheduledState())) { |
| portStatus.setRunStatus(RunStatus.Running); |
| } else if (ScheduledState.DISABLED.equals(port.getScheduledState())) { |
| portStatus.setRunStatus(RunStatus.Disabled); |
| } else if (!port.isValid()) { |
| portStatus.setRunStatus(RunStatus.Invalid); |
| } else { |
| portStatus.setRunStatus(RunStatus.Stopped); |
| } |
| |
| // special handling for public ports |
| if (port instanceof PublicPort) { |
| portStatus.setTransmitting(((PublicPort) port).isTransmitting()); |
| } |
| |
| final FlowFileEvent entry = statusReport.getReportEntries().get(port.getIdentifier()); |
| if (entry == null) { |
| portStatus.setInputBytes(0L); |
| portStatus.setInputCount(0); |
| portStatus.setOutputBytes(0L); |
| portStatus.setOutputCount(0); |
| } else { |
| final int processedCount = entry.getFlowFilesOut(); |
| final long numProcessedBytes = entry.getContentSizeOut(); |
| portStatus.setOutputBytes(numProcessedBytes); |
| portStatus.setOutputCount(processedCount); |
| |
| final int inputCount = entry.getFlowFilesIn(); |
| final long inputBytes = entry.getContentSizeIn(); |
| portStatus.setInputBytes(inputBytes); |
| portStatus.setInputCount(inputCount); |
| |
| bytesRead += entry.getBytesRead(); |
| |
| flowFilesOut += port instanceof PublicPort ? entry.getFlowFilesSent() : entry.getFlowFilesOut(); |
| bytesOut += port instanceof PublicPort ? entry.getBytesSent() : entry.getContentSizeOut(); |
| |
| flowFilesSent = entry.getFlowFilesSent(); |
| bytesSent += entry.getBytesSent(); |
| } |
| |
| if (populateChildStatuses) { |
| outputPortStatusCollection.add(portStatus); |
| } |
| |
| activeGroupThreads += portStatus.getActiveThreadCount(); |
| } |
| |
| for (final Funnel funnel : group.getFunnels()) { |
| activeGroupThreads += processScheduler.getActiveThreadCount(funnel); |
| } |
| |
| status.setActiveThreadCount(activeGroupThreads); |
| status.setTerminatedThreadCount(terminatedGroupThreads); |
| status.setBytesRead(bytesRead); |
| status.setBytesWritten(bytesWritten); |
| status.setQueuedCount(queuedCount); |
| status.setQueuedContentSize(queuedContentSize); |
| status.setInputContentSize(bytesIn); |
| status.setInputCount(flowFilesIn); |
| status.setOutputContentSize(bytesOut); |
| status.setOutputCount(flowFilesOut); |
| status.setFlowFilesReceived(flowFilesReceived); |
| status.setBytesReceived(bytesReceived); |
| status.setFlowFilesSent(flowFilesSent); |
| status.setBytesSent(bytesSent); |
| status.setFlowFilesTransferred(flowFilesTransferred); |
| status.setBytesTransferred(bytesTransferred); |
| |
| final VersionControlInformation vci = group.getVersionControlInformation(); |
| if (vci != null) { |
| try { |
| final VersionedFlowStatus flowStatus = vci.getStatus(); |
| if (flowStatus != null && flowStatus.getState() != null) { |
| status.setVersionedFlowState(flowStatus.getState()); |
| } |
| } catch (final Exception e) { |
| logger.warn("Failed to determine Version Control State for {}. Will consider state to be SYNC_FAILURE", group, e); |
| status.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE); |
| } |
| } |
| |
| return status; |
| } |
| |
| private RemoteProcessGroupStatus createRemoteGroupStatus(final RemoteProcessGroup remoteGroup, final RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized) { |
| final boolean isRemoteProcessGroupAuthorized = isAuthorized.test(remoteGroup); |
| |
| int receivedCount = 0; |
| long receivedContentSize = 0L; |
| int sentCount = 0; |
| long sentContentSize = 0L; |
| int activeThreadCount = 0; |
| int activePortCount = 0; |
| int inactivePortCount = 0; |
| |
| final RemoteProcessGroupStatus status = new RemoteProcessGroupStatus(); |
| status.setGroupId(remoteGroup.getProcessGroup().getIdentifier()); |
| status.setName(isRemoteProcessGroupAuthorized ? remoteGroup.getName() : remoteGroup.getIdentifier()); |
| status.setTargetUri(isRemoteProcessGroupAuthorized ? remoteGroup.getTargetUri() : null); |
| |
| long lineageMillis = 0L; |
| int flowFilesRemoved = 0; |
| int flowFilesTransferred = 0; |
| for (final Port port : remoteGroup.getInputPorts()) { |
| // determine if this input port is connected |
| final boolean isConnected = port.hasIncomingConnection(); |
| |
| // we only want to consider remote ports that we are connected to |
| if (isConnected) { |
| if (port.isRunning()) { |
| activePortCount++; |
| } else { |
| inactivePortCount++; |
| } |
| |
| activeThreadCount += processScheduler.getActiveThreadCount(port); |
| |
| final FlowFileEvent portEvent = statusReport.getReportEntry(port.getIdentifier()); |
| if (portEvent != null) { |
| lineageMillis += portEvent.getAggregateLineageMillis(); |
| flowFilesRemoved += portEvent.getFlowFilesRemoved(); |
| flowFilesTransferred += portEvent.getFlowFilesOut(); |
| sentCount += portEvent.getFlowFilesSent(); |
| sentContentSize += portEvent.getBytesSent(); |
| } |
| } |
| } |
| |
| for (final Port port : remoteGroup.getOutputPorts()) { |
| // determine if this output port is connected |
| final boolean isConnected = !port.getConnections().isEmpty(); |
| |
| // we only want to consider remote ports that we are connected from |
| if (isConnected) { |
| if (port.isRunning()) { |
| activePortCount++; |
| } else { |
| inactivePortCount++; |
| } |
| |
| activeThreadCount += processScheduler.getActiveThreadCount(port); |
| |
| final FlowFileEvent portEvent = statusReport.getReportEntry(port.getIdentifier()); |
| if (portEvent != null) { |
| receivedCount += portEvent.getFlowFilesReceived(); |
| receivedContentSize += portEvent.getBytesReceived(); |
| } |
| } |
| } |
| |
| status.setId(remoteGroup.getIdentifier()); |
| status.setTransmissionStatus(remoteGroup.isTransmitting() ? TransmissionStatus.Transmitting : TransmissionStatus.NotTransmitting); |
| status.setActiveThreadCount(activeThreadCount); |
| status.setReceivedContentSize(receivedContentSize); |
| status.setReceivedCount(receivedCount); |
| status.setSentContentSize(sentContentSize); |
| status.setSentCount(sentCount); |
| status.setActiveRemotePortCount(activePortCount); |
| status.setInactiveRemotePortCount(inactivePortCount); |
| |
| final int flowFilesOutOrRemoved = flowFilesTransferred + flowFilesRemoved; |
| status.setAverageLineageDuration(flowFilesOutOrRemoved == 0 ? 0 : lineageMillis / flowFilesOutOrRemoved, TimeUnit.MILLISECONDS); |
| |
| return status; |
| } |
| |
| private ProcessorStatus getProcessorStatus(final RepositoryStatusReport report, final ProcessorNode procNode, final Predicate<Authorizable> isAuthorized) { |
| final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier()); |
| return getProcessorStatus(entry, procNode, isAuthorized); |
| } |
| |
| protected ProcessorStatus getProcessorStatus(final FlowFileEvent flowFileEvent, final ProcessorNode procNode, final Predicate<Authorizable> isAuthorized) { |
| final boolean isProcessorAuthorized = isAuthorized.test(procNode); |
| |
| final ProcessorStatus status = new ProcessorStatus(); |
| status.setId(procNode.getIdentifier()); |
| status.setGroupId(procNode.getProcessGroup().getIdentifier()); |
| status.setName(isProcessorAuthorized ? procNode.getName() : procNode.getIdentifier()); |
| status.setType(isProcessorAuthorized ? procNode.getComponentType() : "Processor"); |
| |
| if (flowFileEvent != null && flowFileEvent != EmptyFlowFileEvent.INSTANCE) { |
| final int processedCount = flowFileEvent.getFlowFilesOut(); |
| final long numProcessedBytes = flowFileEvent.getContentSizeOut(); |
| status.setOutputBytes(numProcessedBytes); |
| status.setOutputCount(processedCount); |
| |
| final int inputCount = flowFileEvent.getFlowFilesIn(); |
| final long inputBytes = flowFileEvent.getContentSizeIn(); |
| status.setInputBytes(inputBytes); |
| status.setInputCount(inputCount); |
| |
| final long readBytes = flowFileEvent.getBytesRead(); |
| status.setBytesRead(readBytes); |
| |
| final long writtenBytes = flowFileEvent.getBytesWritten(); |
| status.setBytesWritten(writtenBytes); |
| |
| status.setProcessingNanos(flowFileEvent.getProcessingNanoseconds()); |
| status.setInvocations(flowFileEvent.getInvocations()); |
| |
| status.setAverageLineageDuration(flowFileEvent.getAverageLineageMillis()); |
| |
| status.setFlowFilesReceived(flowFileEvent.getFlowFilesReceived()); |
| status.setBytesReceived(flowFileEvent.getBytesReceived()); |
| status.setFlowFilesSent(flowFileEvent.getFlowFilesSent()); |
| status.setBytesSent(flowFileEvent.getBytesSent()); |
| status.setFlowFilesRemoved(flowFileEvent.getFlowFilesRemoved()); |
| |
| if (isProcessorAuthorized) { |
| status.setCounters(flowFileEvent.getCounters()); |
| } |
| } |
| |
| // Determine the run status and get any validation error... only validating while STOPPED |
| // is a trade-off we are willing to make, even though processor validity could change due to |
| // environmental conditions (property configured with a file path and the file being externally |
| // removed). This saves on validation costs that would be unnecessary most of the time. |
| if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) { |
| status.setRunStatus(RunStatus.Disabled); |
| } else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) { |
| status.setRunStatus(RunStatus.Running); |
| } else if (procNode.getValidationStatus() == ValidationStatus.VALIDATING) { |
| status.setRunStatus(RunStatus.Validating); |
| } else if (procNode.getValidationStatus() == ValidationStatus.INVALID && procNode.getActiveThreadCount() == 0) { |
| status.setRunStatus(RunStatus.Invalid); |
| } else { |
| status.setRunStatus(RunStatus.Stopped); |
| } |
| |
| status.setExecutionNode(procNode.getExecutionNode()); |
| status.setTerminatedThreadCount(procNode.getTerminatedThreadCount()); |
| status.setActiveThreadCount(procNode.getActiveThreadCount()); |
| |
| return status; |
| } |
| |
| /** |
| * Returns the status of all components in the controller. This request is |
| * not in the context of a user so the results will be unfiltered. |
| * |
| * @return the component status |
| */ |
| @Override |
| public ProcessGroupStatus getControllerStatus() { |
| return getGroupStatus(flowManager.getRootGroupId()); |
| } |
| |
| @Override |
| public List<ProvenanceEventRecord> getProvenanceEvents(final long firstEventId, final int maxRecords) throws IOException { |
| return new ArrayList<>(getProvenanceRepository().getEvents(firstEventId, maxRecords)); |
| } |
| |
| /** |
| * Returns the total number of bytes read by this instance (at the root process group level, i.e. all events) since the instance started |
| * |
| * @return the total number of bytes read by this instance |
| */ |
| @Override |
| public long getTotalBytesRead() { |
| return flowFileEventRepository.reportAggregateEvent().getBytesRead(); |
| } |
| |
| /** |
| * Returns the total number of bytes written by this instance (at the root process group level, i.e. all events) since the instance started |
| * |
| * @return the total number of bytes written by this instance |
| */ |
| @Override |
| public long getTotalBytesWritten() { |
| return flowFileEventRepository.reportAggregateEvent().getBytesWritten(); |
| } |
| |
| /** |
| * Returns the total number of bytes sent by this instance (at the root process group level) since the instance started |
| * |
| * @return the total number of bytes sent by this instance |
| */ |
| @Override |
| public long getTotalBytesSent() { |
| return flowFileEventRepository.reportAggregateEvent().getBytesSent(); |
| } |
| |
| /** |
| * Returns the total number of bytes received by this instance (at the root process group level) since the instance started |
| * |
| * @return the total number of bytes received by this instance |
| */ |
| @Override |
| public long getTotalBytesReceived() { |
| return flowFileEventRepository.reportAggregateEvent().getBytesReceived(); |
| } |
| } |