blob: 5ade471c290b63ff002d2b0192049583d6c635c8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.controller.tasks;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.lifecycle.TaskTerminationAwareStateManager;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.ActiveProcessSessionFactory;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.util.Connectables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
* Continually runs a <code>{@link Connectable}</code> component as long as the component has work to do.
* {@link #invoke()} ()} will return <code>{@link InvocationResult}</code> telling if the component should be yielded.
public class ConnectableTask {
private static final Logger logger = LoggerFactory.getLogger(ConnectableTask.class);
private final SchedulingAgent schedulingAgent;
private final Connectable connectable;
private final RepositoryContext repositoryContext;
private final LifecycleState scheduleState;
private final ProcessContext processContext;
private final FlowController flowController;
private final int numRelationships;
public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable,
final FlowController flowController, final RepositoryContextFactory contextFactory, final LifecycleState scheduleState,
final PropertyEncryptor encryptor) {
this.schedulingAgent = schedulingAgent;
this.connectable = connectable;
this.scheduleState = scheduleState;
this.numRelationships = connectable.getRelationships().size();
this.flowController = flowController;
final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
if (connectable instanceof ProcessorNode) {
processContext = new StandardProcessContext(
(ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated, flowController);
} else {
processContext = new ConnectableProcessContext(connectable, encryptor, stateManager);
repositoryContext = contextFactory.newProcessContext(connectable, new AtomicLong(0L));
public Connectable getConnectable() {
return connectable;
private boolean isRunOnCluster(final FlowController flowController) {
return !connectable.isIsolated() || !flowController.isConfiguredForClustering() || flowController.isPrimary();
private boolean isYielded() {
// after one yield period, the scheduling agent could call this again when
// yieldExpiration == currentTime, and we don't want that to still be considered 'yielded'
// so this uses ">" instead of ">="
return connectable.getYieldExpiration() > System.currentTimeMillis();
* Make sure processor has work to do. This means that it meets one of these criteria:
* <ol>
* <li>It is a Funnel and has incoming FlowFiles from other components, and and at least one outgoing connection.</li>
* <li>It is a 'source' component, meaning:<ul>
* <li>It is annotated with @TriggerWhenEmpty</li>
* <li>It has no incoming connections</li>
* <li>All incoming connections are self-loops</li>
* </ul></li>
* <li>It has data in incoming connections to process</li>
* </ol>
* @return true if there is work to do, otherwise false
private boolean isWorkToDo() {
boolean hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
if (connectable.getConnectableType() == ConnectableType.FUNNEL) {
// Handle Funnel as a special case because it will never be a 'source' component,
// and also its outgoing connections can not be terminated.
// Incoming FlowFiles from other components, and at least one outgoing connection are required.
return connectable.hasIncomingConnection()
&& hasNonLoopConnection
&& !connectable.getConnections().isEmpty()
&& Connectables.flowFilesQueued(connectable);
final boolean isSourceComponent = connectable.isTriggerWhenEmpty()
// No input connections
|| !connectable.hasIncomingConnection()
// Every incoming connection loops back to itself, no inputs from other components
|| !hasNonLoopConnection;
// If it is not a 'source' component, it requires a FlowFile to process.
return isSourceComponent || Connectables.flowFilesQueued(connectable);
private boolean isBackPressureEngaged() {
return connectable.getIncomingConnections().stream()
.filter(con -> con.getSource() == connectable)
public InvocationResult invoke() {
if (scheduleState.isTerminated()) {
logger.debug("Will not trigger {} because task is terminated", connectable);
return InvocationResult.DO_NOT_YIELD;
// make sure processor is not yielded
if (isYielded()) {
logger.debug("Will not trigger {} because component is yielded", connectable);
return InvocationResult.DO_NOT_YIELD;
// make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
if (!isRunOnCluster(flowController)) {
logger.debug("Will not trigger {} because this is not the primary node", connectable);
return InvocationResult.yield("This node is not the primary node");
// Make sure processor has work to do.
if (!isWorkToDo()) {
logger.debug("Yielding {} because it has no work to do", connectable);
return InvocationResult.yield("No work to do");
if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
if (!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
logger.debug("Yielding {} because Backpressure is Applied", connectable);
return InvocationResult.yield("Backpressure Applied");
logger.debug("Triggering {}", connectable);
final long batchNanos = connectable.getRunDuration(TimeUnit.NANOSECONDS);
final ProcessSessionFactory sessionFactory;
final StandardProcessSession rawSession;
final boolean batch;
if (connectable.isSessionBatchingSupported() && batchNanos > 0L) {
rawSession = new StandardProcessSession(repositoryContext, scheduleState::isTerminated);
sessionFactory = new BatchingSessionFactory(rawSession);
batch = true;
} else {
rawSession = null;
sessionFactory = new StandardProcessSessionFactory(repositoryContext, scheduleState::isTerminated);
batch = false;
final ActiveProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory(sessionFactory);
final long startNanos = System.nanoTime();
final long finishIfBackpressureEngaged = startNanos + (batchNanos / 25L);
final long finishNanos = startNanos + batchNanos;
int invocationCount = 0;
final String originalThreadName = Thread.currentThread().getName();
try {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), connectable.getRunnableComponent().getClass(), connectable.getIdentifier())) {
boolean shouldRun = connectable.getScheduledState() == ScheduledState.RUNNING || connectable.getScheduledState() == ScheduledState.RUN_ONCE;
while (shouldRun) {
connectable.onTrigger(processContext, activeSessionFactory);
if (!batch) {
return InvocationResult.DO_NOT_YIELD;
final long nanoTime = System.nanoTime();
if (nanoTime > finishNanos) {
return InvocationResult.DO_NOT_YIELD;
if (nanoTime > finishIfBackpressureEngaged && isBackPressureEngaged()) {
return InvocationResult.DO_NOT_YIELD;
if (connectable.getScheduledState() != ScheduledState.RUNNING) {
if (!isWorkToDo()) {
if (isYielded()) {
if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
shouldRun = repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
} catch (final TerminatedTaskException tte) {
final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());"Failed to process session due to task being terminated", new Object[] {tte});
} catch (final ProcessException pe) {
final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
procLog.error("Failed to process session due to {}", new Object[] {pe});
} catch (final Throwable t) {
// Use ComponentLog to log the event so that a bulletin will be created for this processor
final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
procLog.error("Failed to process session due to {}; Processor Administratively Yielded for {}",
new Object[] {t, schedulingAgent.getAdministrativeYieldDuration()}, t);
logger.warn("Administratively Yielding {} due to uncaught Exception: {}", connectable.getRunnableComponent(), t.toString(), t);
connectable.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
} finally {
try {
if (batch) {
final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
try {
rawSession.commitAsync(null, t -> {
procLog.error("Failed to commit session {} due to {}; rolling back", new Object[]{rawSession, t.toString()}, t);
} catch (final TerminatedTaskException tte) {
procLog.debug("Cannot commit Batch Process Session because the Task was forcefully terminated", tte);
final long processingNanos = System.nanoTime() - startNanos;
try {
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
repositoryContext.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable.getRunnableComponent(), e.toString());
logger.error("", e);
} finally {
return InvocationResult.DO_NOT_YIELD;