blob: ce9ae96b2ba1cd7453a40121cd230f531d2fbaf6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Objects.requireNonNull;
/**
* ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists
* within a controlled flow. This node keeps track of the processor, its
* scheduling information and its relationships to other processors and whatever
* scheduled futures exist for it. Must be thread safe.
*
*/
public class StandardProcessorNode extends ProcessorNode implements Connectable {
// Logger for this class; isValid() uses it to report unexpected validation failures.
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessorNode.class);
// Observer id under which this node's bulletin level is read from / written to the log repository.
public static final String BULLETIN_OBSERVER_ID = "bulletin-observer";
// Unit used by the yield/penalization getters when the caller passes a null TimeUnit.
public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
// Initial values of the yield and penalization periods.
public static final String DEFAULT_YIELD_PERIOD = "1 sec";
public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
// Process group this node currently belongs to; empty until setProcessGroup is called.
private final AtomicReference<ProcessGroup> processGroup;
// The wrapped processor instance.
private final Processor processor;
private final AtomicReference<String> identifier;
// Outgoing connection -> its destination, for connections whose source is this node.
private final Map<Connection, Connectable> destinations;
// Outgoing connections grouped by the relationship they carry.
private final Map<Relationship, Set<Connection>> connections;
// Relationships explicitly marked as auto-terminated for this node.
private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate;
// Connections whose destination is this node; the list is replaced, not mutated, on change.
private final AtomicReference<List<Connection>> incomingConnectionsRef;
private final AtomicBoolean isolated;
private final AtomicBoolean lossTolerant;
private final AtomicReference<String> comments;
private final AtomicReference<Position> position;
private final AtomicReference<String> schedulingPeriod; // stored as string so it's presented to user as they entered it
private final AtomicReference<String> yieldPeriod;
private final AtomicReference<String> penalizationPeriod;
private final AtomicReference<Map<String, String>> style;
private final AtomicInteger concurrentTaskCount;
// Epoch milliseconds before which the node should not be scheduled again.
private final AtomicLong yieldExpiration;
// Scheduling period in nanoseconds (never below MINIMUM_SCHEDULING_NANOS).
private final AtomicLong schedulingNanos;
// The flags below are derived once, in the constructor, from annotations on the processor class.
private final boolean triggerWhenEmpty;
private final boolean sideEffectFree;
private final boolean triggeredSerially;
private final boolean triggerWhenAnyDestinationAvailable;
private final boolean eventDrivenSupported;
private final boolean batchSupported;
private final Requirement inputRequirement;
private final ProcessScheduler processScheduler;
// Run duration in nanoseconds; see setRunDuration.
private long runNanos = 0L;
// NOTE(review): an earlier comment said this field was guarded by a read/write lock and
// then "NOT any more". No lock is visible in this part of the class, and neither this
// field nor runNanos is volatile or atomic -- confirm how cross-thread visibility is ensured.
private SchedulingStrategy schedulingStrategy;
/**
 * Creates a node for the given processor, using the processor class's simple
 * name as the component type and its canonical name as the component class.
 *
 * @param processor the processor to wrap
 * @param uuid the identifier of this node
 * @param validationContextFactory factory passed through to the superclass
 * @param scheduler scheduler consulted for yielding and active thread counts
 * @param controllerServiceProvider provider passed through to the superclass
 */
public StandardProcessorNode(final Processor processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider) {
this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider,
processor.getClass().getSimpleName(), processor.getClass().getCanonicalName());
}
/**
 * Creates a node for the given processor with an explicit component type and
 * canonical class name. All configuration starts at its default value and the
 * behavioural flags are read from annotations on the processor's class.
 *
 * @param processor the processor to wrap
 * @param uuid the identifier of this node
 * @param validationContextFactory factory passed through to the superclass
 * @param scheduler scheduler consulted for yielding and active thread counts
 * @param controllerServiceProvider provider passed through to the superclass
 * @param componentType component type passed through to the superclass
 * @param componentCanonicalClass canonical class name passed through to the superclass
 */
public StandardProcessorNode(final Processor processor, final String uuid,
        final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
        final ControllerServiceProvider controllerServiceProvider,
        final String componentType, final String componentCanonicalClass) {
    super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass);

    // identity and collaborators
    this.processor = processor;
    this.identifier = new AtomicReference<>(uuid);
    this.processScheduler = scheduler;
    this.processGroup = new AtomicReference<>();

    // connection bookkeeping
    this.destinations = new HashMap<>();
    this.connections = new HashMap<>();
    final List<Connection> noIncomingConnections = new ArrayList<>();
    this.incomingConnectionsRef = new AtomicReference<>(noIncomingConnections);
    this.undefinedRelationshipsToTerminate = new AtomicReference<Set<Relationship>>(new HashSet<Relationship>());

    // user-editable configuration, at its defaults
    this.comments = new AtomicReference<>("");
    this.position = new AtomicReference<>(new Position(0D, 0D));
    this.style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<String, String>()));
    this.lossTolerant = new AtomicBoolean(false);
    this.isolated = new AtomicBoolean(false);
    this.concurrentTaskCount = new AtomicInteger(1);

    // scheduling configuration, at its defaults
    this.schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
    this.schedulingPeriod = new AtomicReference<>("0 sec");
    this.schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
    this.yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD);
    this.yieldExpiration = new AtomicLong(0L);
    this.penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);

    // behaviour declared through annotations on the processor class
    final Class<?> processorClass = processor.getClass();
    this.triggerWhenEmpty = processorClass.isAnnotationPresent(TriggerWhenEmpty.class);
    this.sideEffectFree = processorClass.isAnnotationPresent(SideEffectFree.class);
    this.batchSupported = processorClass.isAnnotationPresent(SupportsBatching.class);
    this.triggeredSerially = processorClass.isAnnotationPresent(TriggerSerially.class);
    this.triggerWhenAnyDestinationAvailable = processorClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
    // event-driven mode is only honoured when neither serial triggering nor trigger-when-empty is requested
    this.eventDrivenSupported = processorClass.isAnnotationPresent(EventDriven.class)
            && !(this.triggeredSerially || this.triggerWhenEmpty);

    final InputRequirement declaredRequirement = processorClass.getAnnotation(InputRequirement.class);
    this.inputRequirement = (declaredRequirement == null) ? Requirement.INPUT_ALLOWED : declaredRequirement.value();
}
/**
 * Returns the free-form comments stored for this processor instance.
 *
 * @return the current comments
 */
@Override
public String getComments() {
    final String currentComments = this.comments.get();
    return currentComments;
}
/**
 * Returns the process group of this node as its parent authorizable.
 *
 * @return the value of {@link #getProcessGroup()}
 */
@Override
public Authorizable getParentAuthorizable() {
    final Authorizable parent = getProcessGroup();
    return parent;
}
/**
 * Builds the component resource for this processor from its identifier and name.
 *
 * @return the resource produced by {@link ResourceFactory#getComponentResource}
 */
@Override
public Resource getResource() {
    final String componentId = getIdentifier();
    final String componentName = getName();
    return ResourceFactory.getComponentResource(ResourceType.Processor, componentId, componentName);
}
/**
 * Replaces the comments stored for this processor instance.
 *
 * @param comments
 *            new comments
 * @throws IllegalStateException
 *             if the processor is running
 */
@Override
public void setComments(final String comments) {
    final boolean running = isRunning();
    if (running) {
        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
    }

    this.comments.set(comments);
}
/**
 * @return the position currently stored for this node
 */
@Override
public Position getPosition() {
    final Position currentPosition = this.position.get();
    return currentPosition;
}
/**
 * Stores a new position for this node. Unlike most setters of this class,
 * no running-state check is performed.
 *
 * @param position
 *            the new position
 */
@Override
public void setPosition(final Position position) {
    final AtomicReference<Position> holder = this.position;
    holder.set(position);
}
/**
 * @return the style map currently stored for this node; the map is
 *         wrapped as unmodifiable when it is stored
 */
@Override
public Map<String, String> getStyle() {
    final Map<String, String> currentStyle = this.style.get();
    return currentStyle;
}
/**
 * Stores an unmodifiable copy of the given style map. A null argument is
 * ignored and leaves the current style untouched.
 *
 * @param style
 *            the new style entries, or null to keep the current ones
 */
@Override
public void setStyle(final Map<String, String> style) {
    if (style == null) {
        return;
    }

    // copy first so later changes to the caller's map are not visible here
    final Map<String, String> snapshot = new HashMap<>(style);
    this.style.set(Collections.unmodifiableMap(snapshot));
}
/**
 * @return the identifier this node was created with
 */
@Override
public String getIdentifier() {
    final String id = this.identifier.get();
    return id;
}
/**
 * @return true if flow file content generated by this processor is
 *         considered loss tolerant
 */
@Override
public boolean isLossTolerant() {
    final boolean tolerant = this.lossTolerant.get();
    return tolerant;
}
/**
 * @return the current value of the isolated flag (see {@link #setIsolated(boolean)})
 */
@Override
public boolean isIsolated() {
    final boolean currentlyIsolated = this.isolated.get();
    return currentlyIsolated;
}
/**
 * @return true if the processor class carries the {@link TriggerWhenEmpty}
 *         annotation, false otherwise
 */
@Override
public boolean isTriggerWhenEmpty() {
    return this.triggerWhenEmpty;
}
/**
 * @return true if the processor class carries the {@link SideEffectFree}
 *         annotation, false otherwise
 */
@Override
public boolean isSideEffectFree() {
    return this.sideEffectFree;
}
/**
 * @return true if the processor class carries the {@link SupportsBatching}
 *         annotation, false otherwise
 */
@Override
public boolean isHighThroughputSupported() {
    return this.batchSupported;
}
/**
 * @return true if the processor class carries the
 *         {@link TriggerWhenAnyDestinationAvailable} annotation, false
 *         otherwise
 */
@Override
public boolean isTriggerWhenAnyDestinationAvailable() {
    return this.triggerWhenAnyDestinationAvailable;
}
/**
 * Sets whether flow file content produced by this processor is considered
 * loss tolerant.
 *
 * @param lossTolerant
 *            the new value of the flag
 * @throws IllegalStateException
 *             if the processor is running
 */
@Override
public void setLossTolerant(final boolean lossTolerant) {
    final boolean running = isRunning();
    if (running) {
        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
    }

    this.lossTolerant.set(lossTolerant);
}
/**
 * Sets the isolated flag. {@link #setSchedulingStrategy} sets it to true
 * exactly when the strategy is PRIMARY_NODE_ONLY.
 *
 * @param isolated
 *            the new value of the flag
 * @throws IllegalStateException
 *             if the processor is running
 */
public void setIsolated(final boolean isolated) {
    final boolean running = isRunning();
    if (running) {
        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
    }

    this.isolated.set(isolated);
}
/**
 * Tells whether the given relationship is auto-terminated for this node:
 * either the relationship declares itself auto-terminated and no connection
 * uses it, or it is in the set of explicitly auto-terminated relationships.
 *
 * @param relationship
 *            the relationship to check
 * @return true if the relationship is auto-terminated
 */
@Override
public boolean isAutoTerminated(final Relationship relationship) {
    final boolean declaredAutoTerminated = relationship.isAutoTerminated();
    if (declaredAutoTerminated && getConnections(relationship).isEmpty()) {
        return true;
    }

    final Set<Relationship> explicitlyTerminated = undefinedRelationshipsToTerminate.get();
    return explicitlyTerminated != null && explicitlyTerminated.contains(relationship);
}
/**
 * Replaces the set of explicitly auto-terminated relationships with a copy
 * of the given set.
 *
 * @param terminate
 *            the relationships to auto-terminate
 * @throws IllegalStateException
 *             if the processor is running, or if any given relationship
 *             already has a connection
 */
@Override
public void setAutoTerminatedRelationships(final Set<Relationship> terminate) {
    if (isRunning()) {
        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
    }

    // a relationship that is already wired to a connection cannot also be auto-terminated
    for (final Relationship candidate : terminate) {
        final boolean connected = !getConnections(candidate).isEmpty();
        if (connected) {
            throw new IllegalStateException("Cannot mark relationship '" + candidate.getName()
                    + "' as auto-terminated because Connection already exists with this relationship");
        }
    }

    final Set<Relationship> snapshot = new HashSet<>(terminate);
    undefinedRelationshipsToTerminate.set(snapshot);
}
/**
 * @return an unmodifiable view of the relationships that are explicitly
 *         configured to be auto-terminated; empty if none are stored
 */
@Override
public Set<Relationship> getAutoTerminatedRelationships() {
    final Set<Relationship> stored = undefinedRelationshipsToTerminate.get();
    final Set<Relationship> source = (stored == null) ? new HashSet<Relationship>() : stored;
    return Collections.unmodifiableSet(source);
}
/**
 * @return the value of the {@link CapabilityDescription} annotation on the
 *         processor's class, or <code>null</code> if the class has none
 */
public String getProcessorDescription() {
    final Class<?> processorClass = processor.getClass();
    final CapabilityDescription capability = processorClass.getAnnotation(CapabilityDescription.class);
    return (capability == null) ? null : capability.value();
}
/**
 * Renames this node by delegating to the superclass.
 *
 * @param name
 *            the new name
 * @throws IllegalStateException
 *             if the processor is running
 */
@Override
public void setName(final String name) {
    final boolean running = isRunning();
    if (running) {
        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
    }

    super.setName(name);
}
/**
 * Returns the scheduling period converted to the requested unit.
 *
 * @param timeUnit
 *            determines the unit of time to represent the scheduling
 *            period. If null, the period is reported in units of
 *            {@link #DEFAULT_TIME_UNIT}, matching the yield and
 *            penalization getters
 * @return the schedule period that should elapse before subsequent cycles
 *         of this processor's tasks
 */
@Override
public long getSchedulingPeriod(final TimeUnit timeUnit) {
    // fall back to the default unit instead of failing with a NullPointerException
    final TimeUnit unit = (timeUnit == null) ? DEFAULT_TIME_UNIT : timeUnit;
    return unit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
}
/**
 * @return true if the processor class is annotated as event-driven and is
 *         neither triggered serially nor triggered when empty
 */
@Override
public boolean isEventDrivenSupported() {
    return eventDrivenSupported;
}
/**
 * Updates the Scheduling Strategy used for this Processor and sets the
 * isolated flag to match (isolated exactly when the strategy is
 * PRIMARY_NODE_ONLY).
 *
 * A request for EVENT_DRIVEN on a processor that does not support it is
 * silently ignored and the current strategy is kept.
 *
 * @param schedulingStrategy
 *            strategy
 *
 * @throws IllegalStateException
 *             if the processor is running; in that case neither the
 *             strategy nor the isolated flag is changed
 */
@Override
public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
    if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !eventDrivenSupported) {
        // not valid. Just ignore it. We don't throw an Exception because if
        // a developer changes a Processor so that it no longer supports
        // EventDriven mode, we don't want the app to fail to startup if it
        // was already in Event-Driven Mode. Instead, we simply keep the
        // current strategy.
        return;
    }

    // setIsolated rejects the change while running, so call it before touching
    // the strategy; otherwise a failed call would leave the strategy changed.
    setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY);
    this.schedulingStrategy = schedulingStrategy;
}
/**
 * @return the scheduling strategy currently configured for this processor
 */
@Override
public SchedulingStrategy getSchedulingStrategy() {
    final SchedulingStrategy current = schedulingStrategy;
    return current;
}
/**
 * @return the scheduling period exactly as it was last stored, in textual form
 */
@Override
public String getSchedulingPeriod() {
    final String period = this.schedulingPeriod.get();
    return period;
}
/**
 * Validates and stores the scheduling period according to the current
 * scheduling strategy:
 * <ul>
 * <li>CRON_DRIVEN: the value must parse as a cron expression;</li>
 * <li>TIMER_DRIVEN / PRIMARY_NODE_ONLY: the value must be a non-negative
 * time duration; the effective period is never below
 * MINIMUM_SCHEDULING_NANOS;</li>
 * <li>EVENT_DRIVEN (or any other strategy): the call is ignored.</li>
 * </ul>
 *
 * @param schedulingPeriod
 *            the period as entered by the user
 * @throws IllegalStateException
 *             if the processor is running
 * @throws IllegalArgumentException
 *             if the value is not valid for the current strategy
 */
@Override
public void setScheduldingPeriod(final String schedulingPeriod) {
    if (isRunning()) {
        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
    }

    switch (schedulingStrategy) {
        case CRON_DRIVEN: {
            try {
                // constructed only to validate the expression
                new CronExpression(schedulingPeriod);
            } catch (final Exception e) {
                // keep the parser's exception as the cause so the reason is not lost
                throw new IllegalArgumentException(
                        "Scheduling Period is not a valid cron expression: " + schedulingPeriod, e);
            }
            break;
        }
        case PRIMARY_NODE_ONLY:
        case TIMER_DRIVEN: {
            final long requestedNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod),
                    TimeUnit.NANOSECONDS);
            if (requestedNanos < 0) {
                throw new IllegalArgumentException("Scheduling Period must be positive");
            }
            this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, requestedNanos));
            break;
        }
        case EVENT_DRIVEN:
        default:
            // no scheduling period applies; leave the stored value untouched
            return;
    }

    this.schedulingPeriod.set(schedulingPeriod);
}
/**
 * @param timeUnit
 *            the unit to report the run duration in
 * @return the configured run duration converted to the given unit
 */
@Override
public long getRunDuration(final TimeUnit timeUnit) {
    final long nanos = this.runNanos;
    return timeUnit.convert(nanos, TimeUnit.NANOSECONDS);
}
/**
 * Stores the run duration, internally kept in nanoseconds.
 *
 * @param duration
 *            the duration, which must not be negative
 * @param timeUnit
 *            the unit of <code>duration</code>
 * @throws IllegalArgumentException
 *             if <code>duration</code> is negative
 */
@Override
public void setRunDuration(final long duration, final TimeUnit timeUnit) {
    final boolean negative = duration < 0;
    if (negative) {
        final long seconds = timeUnit.toSeconds(duration);
        throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to "
                + seconds + " seconds");
    }

    final long nanos = timeUnit.toNanos(duration);
    this.runNanos = nanos;
}
/**
 * @param timeUnit
 *            the unit to report in; if null, {@link #DEFAULT_TIME_UNIT} is used
 * @return the configured yield period converted to the requested unit
 */
@Override
public long getYieldPeriod(final TimeUnit timeUnit) {
    final String period = getYieldPeriod();
    final TimeUnit unit = (timeUnit == null) ? DEFAULT_TIME_UNIT : timeUnit;
    return FormatUtils.getTimeDuration(period, unit);
}
/**
 * @return the yield period exactly as it was last stored, in textual form
 */
@Override
public String getYieldPeriod() {
    final String period = this.yieldPeriod.get();
    return period;
}
/**
 * Validates and stores the yield period.
 *
 * @param yieldPeriod
 *            the period in textual form; must not be null
 * @throws IllegalStateException
 *             if the processor is running
 * @throws IllegalArgumentException
 *             if the period parses to a negative duration
 */
@Override
public void setYieldPeriod(final String yieldPeriod) {
    final boolean running = isRunning();
    if (running) {
        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
    }

    requireNonNull(yieldPeriod);
    final long parsedMillis = FormatUtils.getTimeDuration(yieldPeriod, TimeUnit.MILLISECONDS);
    if (parsedMillis < 0) {
        throw new IllegalArgumentException("Yield duration must be positive");
    }

    // keep the text as entered rather than the parsed value
    this.yieldPeriod.set(yieldPeriod);
}
/**
* Causes the processor not to be scheduled for some period of time. This
* duration can be obtained and set via the
* {@link #getYieldPeriod(TimeUnit)} and
* {@link #setYieldPeriod(long, TimeUnit)} methods.
*/
@Override
public void yield() {
final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
yield(yieldMillis, TimeUnit.MILLISECONDS);
final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 1000) + " seconds"
: yieldMillis + " milliseconds";
LoggerFactory.getLogger(processor.getClass()).debug(
"{} has chosen to yield its resources; will not be scheduled to run again for {}", processor,
yieldDuration);
}
/**
 * Postpones scheduling of this processor for at least the given period,
 * counted from now, and notifies the process scheduler. An already stored
 * expiration that lies further in the future is kept.
 *
 * @param period
 *            how long to yield
 * @param timeUnit
 *            the unit of <code>period</code>
 */
@Override
public void yield(final long period, final TimeUnit timeUnit) {
    final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit);
    final long candidateExpiration = System.currentTimeMillis() + yieldMillis;

    // Raise the expiration atomically. A plain get/max/set could let a
    // concurrent yield overwrite a later expiration with an earlier one.
    long currentExpiration = yieldExpiration.get();
    while (candidateExpiration > currentExpiration
            && !yieldExpiration.compareAndSet(currentExpiration, candidateExpiration)) {
        currentExpiration = yieldExpiration.get();
    }

    processScheduler.yield(this);
}
/**
 * @return the time, in milliseconds since Epoch, at which this processor
 *         may once again be scheduled
 */
@Override
public long getYieldExpiration() {
    final long expiration = this.yieldExpiration.get();
    return expiration;
}
/**
 * @param timeUnit
 *            the unit to report in; if null, {@link #DEFAULT_TIME_UNIT} is used
 * @return the configured penalization period converted to the requested unit
 */
@Override
public long getPenalizationPeriod(final TimeUnit timeUnit) {
    final String period = getPenalizationPeriod();
    final TimeUnit unit = (timeUnit == null) ? DEFAULT_TIME_UNIT : timeUnit;
    return FormatUtils.getTimeDuration(period, unit);
}
/**
 * @return the penalization period exactly as it was last stored, in textual form
 */
@Override
public String getPenalizationPeriod() {
    final String period = this.penalizationPeriod.get();
    return period;
}
/**
 * Validates and stores the penalization period.
 *
 * @param penalizationPeriod
 *            the period in textual form; must not be null
 * @throws IllegalStateException
 *             if the processor is running
 * @throws IllegalArgumentException
 *             if the period parses to a negative duration
 */
@Override
public void setPenalizationPeriod(final String penalizationPeriod) {
    final boolean running = isRunning();
    if (running) {
        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
    }

    requireNonNull(penalizationPeriod);
    final long parsedMillis = FormatUtils.getTimeDuration(penalizationPeriod, TimeUnit.MILLISECONDS);
    if (parsedMillis < 0) {
        throw new IllegalArgumentException("Penalization duration must be positive");
    }

    // keep the text as entered rather than the parsed value
    this.penalizationPeriod.set(penalizationPeriod);
}
/**
 * Determines the number of concurrent tasks that may be running for this
 * processor. For a processor that is triggered serially the value is
 * validated but not stored.
 *
 * @param taskCount
 *            a number of concurrent tasks this processor may have running
 * @throws IllegalStateException
 *             if the processor is running
 * @throws IllegalArgumentException
 *             if the given value is less than 1 and the scheduling
 *             strategy is not EVENT_DRIVEN
 */
@Override
public void setMaxConcurrentTasks(final int taskCount) {
    if (isRunning()) {
        throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
    }

    if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) {
        // say what was rejected and why, instead of an exception with no message
        throw new IllegalArgumentException("Cannot set Concurrent Tasks to " + taskCount
                + "; the value must be at least 1 unless the Processor is Event-Driven");
    }

    if (!triggeredSerially) {
        concurrentTaskCount.set(taskCount);
    }
}
/**
 * @return true if the processor class carries the {@link TriggerSerially}
 *         annotation, false otherwise
 */
@Override
public boolean isTriggeredSerially() {
    return this.triggeredSerially;
}
/**
 * @return the number of tasks that may execute concurrently for this
 *         processor
 */
@Override
public int getMaxConcurrentTasks() {
    final int maxTasks = this.concurrentTaskCount.get();
    return maxTasks;
}
/**
 * @return the observation level registered for {@link #BULLETIN_OBSERVER_ID}
 *         in this node's log repository
 */
@Override
public LogLevel getBulletinLevel() {
    final String componentId = getIdentifier();
    final LogLevel level = LogRepositoryFactory.getRepository(componentId).getObservationLevel(BULLETIN_OBSERVER_ID);
    return level;
}
/**
 * Sets the observation level registered for {@link #BULLETIN_OBSERVER_ID}
 * in this node's log repository.
 *
 * @param level
 *            the new level
 */
@Override
public void setBulletinLevel(final LogLevel level) {
    final String componentId = getIdentifier();
    LogRepositoryFactory.getRepository(componentId).setObservationLevel(BULLETIN_OBSERVER_ID, level);
}
/**
 * @return a new set containing every outgoing connection of this node,
 *         across all relationships
 */
@Override
public Set<Connection> getConnections() {
    final Set<Connection> merged = new HashSet<>();
    for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) {
        merged.addAll(entry.getValue());
    }
    return merged;
}
/**
 * @return the current list of connections whose destination is this node
 */
@Override
public List<Connection> getIncomingConnections() {
    final List<Connection> incoming = this.incomingConnectionsRef.get();
    return incoming;
}
/**
 * @param relationship
 *            the relationship of interest
 * @return an unmodifiable view of the outgoing connections registered for
 *         the relationship, or an empty set if there are none; never null
 */
@Override
public Set<Connection> getConnections(final Relationship relationship) {
    final Set<Connection> registered = connections.get(relationship);
    if (registered == null) {
        return Collections.<Connection> emptySet();
    }
    return Collections.unmodifiableSet(registered);
}
/**
 * Registers a connection that has this node as its source, its destination,
 * or both (a self-loop).
 *
 * As a destination, the connection is added to the incoming connections
 * unless it is already present. As a source, the connection is registered
 * under each of its relationships, and those relationships are removed from
 * the explicitly auto-terminated set.
 *
 * @param connection
 *            the connection to add; must not be null
 * @throws IllegalStateException
 *             if this node is neither the source nor the destination of
 *             the connection
 */
@Override
public void addConnection(final Connection connection) {
    Objects.requireNonNull(connection, "connection cannot be null");

    if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) {
        throw new IllegalStateException(
                "Cannot add a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination");
    }

    List<Connection> updatedIncoming = null;
    if (connection.getDestination().equals(this)) {
        // don't add the connection twice. This may occur if we have a
        // self-loop because we will be told to add the connection once
        // because we are the source and again because we are the destination.
        final List<Connection> incomingConnections = incomingConnectionsRef.get();
        updatedIncoming = new ArrayList<>(incomingConnections);
        if (!updatedIncoming.contains(connection)) {
            updatedIncoming.add(connection);
        }
    }

    if (connection.getSource().equals(this)) {
        // same self-loop guard as above, for the outgoing side
        if (!destinations.containsKey(connection)) {
            for (final Relationship relationship : connection.getRelationships()) {
                // prefer the processor's own Relationship instance for this name
                final Relationship rel = getRelationship(relationship.getName());
                Set<Connection> set = connections.get(rel);
                if (set == null) {
                    set = new HashSet<>();
                    connections.put(rel, set);
                }

                set.add(connection);
                destinations.put(connection, connection.getDestination());
            }

            final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get();
            if (autoTerminated != null) {
                // Work on a copy: the stored set is handed out (as an unmodifiable
                // view) by getAutoTerminatedRelationships, so it must not be
                // mutated in place.
                final Set<Relationship> remaining = new HashSet<>(autoTerminated);
                remaining.removeAll(connection.getRelationships());
                this.undefinedRelationshipsToTerminate.set(remaining);
            }
        }
    }

    if (updatedIncoming != null) {
        incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
    }
}
/**
 * @return true if at least one connection has this node as its destination
 */
@Override
public boolean hasIncomingConnection() {
    final List<Connection> incoming = incomingConnectionsRef.get();
    return !incoming.isEmpty();
}
/**
 * Re-registers a connection after its relationships or destination changed.
 *
 * As the source, this node first verifies that no relationship is being
 * dropped from the connection in a way that would invalidate a running
 * processor, then re-files the connection under its current relationships,
 * records its current destination and removes those relationships from the
 * explicitly auto-terminated set. As the destination, the connection is
 * moved to the end of the incoming connection list.
 *
 * @param connection
 *            the updated connection; must not be null
 * @throws IllegalStateException
 *             if the processor is running and the update removes a
 *             relationship for which this connection is the only one,
 *             the relationship is not auto-terminated, and the processor
 *             declares the relationship
 */
@Override
public void updateConnection(final Connection connection) throws IllegalStateException {
    if (requireNonNull(connection).getSource().equals(this)) {
        // relationships this connection is currently registered under
        final List<Relationship> existingRelationships = new ArrayList<>();
        for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) {
            if (entry.getValue().contains(connection)) {
                existingRelationships.add(entry.getKey());
            }
        }

        // A relationship was removed when it is registered here but no longer
        // listed by the connection. (Iterating the connection's new list
        // instead would find additions, never removals.)
        for (final Relationship rel : existingRelationships) {
            if (!connection.getRelationships().contains(rel)) {
                // relationship was removed. Check if this is legal.
                final Set<Connection> connectionsForRelationship = getConnections(rel);
                if (connectionsForRelationship.size() == 1 && this.isRunning()
                        && !isAutoTerminated(rel) && getRelationships().contains(rel)) {
                    // if we are running and we do not terminate undefined
                    // relationships and this is the only connection that
                    // defines the given relationship, and that relationship
                    // is required, then it is not legal to remove this
                    // relationship from this connection.
                    throw new IllegalStateException("Cannot remove relationship " + rel.getName()
                            + " from Connection because doing so would invalidate Processor " + this
                            + ", which is currently running");
                }
            }
        }

        // remove the connection from any set that currently contains it
        for (final Set<Connection> list : connections.values()) {
            list.remove(connection);
        }

        // add the connection in for all relationships listed.
        for (final Relationship rel : connection.getRelationships()) {
            Set<Connection> set = connections.get(rel);
            if (set == null) {
                set = new HashSet<>();
                connections.put(rel, set);
            }
            set.add(connection);
        }

        // update to the new destination
        destinations.put(connection, connection.getDestination());

        final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get();
        if (autoTerminated != null) {
            // Work on a copy: the stored set is handed out (as an unmodifiable
            // view) by getAutoTerminatedRelationships, so it must not be
            // mutated in place.
            final Set<Relationship> remaining = new HashSet<>(autoTerminated);
            remaining.removeAll(connection.getRelationships());
            this.undefinedRelationshipsToTerminate.set(remaining);
        }
    }

    if (connection.getDestination().equals(this)) {
        // update our incoming connections -- we can just remove & re-add
        // the connection to update the list.
        final List<Connection> incomingConnections = incomingConnectionsRef.get();
        final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections);
        updatedIncoming.remove(connection);
        updatedIncoming.add(connection);
        incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
    }
}
/**
 * Unregisters a connection from this node.
 *
 * As the source, the connection is removed from every relationship's set and
 * from the destination map; this is refused while the processor is running
 * if any relationship of the connection has at most one connection. As the
 * destination, the connection is removed from the incoming connections.
 *
 * @param connection
 *            the connection to remove; must not be null
 * @throws IllegalStateException
 *             if removal as source is refused because the processor is running
 * @throws IllegalArgumentException
 *             if the connection was neither removed as an incoming
 *             connection nor registered as an outgoing one
 */
@Override
public void removeConnection(final Connection connection) {
    boolean removedAsSource = false;

    if (requireNonNull(connection).getSource().equals(this)) {
        for (final Relationship rel : connection.getRelationships()) {
            final Set<Connection> sameRelationship = getConnections(rel);
            final boolean lastForRelationship = sameRelationship == null || sameRelationship.size() <= 1;
            if (lastForRelationship && isRunning()) {
                throw new IllegalStateException(
                        "This connection cannot be removed because its source is running and removing it will invalidate this processor");
            }
        }

        for (final Set<Connection> registered : this.connections.values()) {
            registered.remove(connection);
        }

        final Connectable previousDestination = destinations.remove(connection);
        removedAsSource = previousDestination != null;
    }

    if (connection.getDestination().equals(this)) {
        final List<Connection> currentIncoming = incomingConnectionsRef.get();
        if (currentIncoming.contains(connection)) {
            final List<Connection> remainingIncoming = new ArrayList<>(currentIncoming);
            remainingIncoming.remove(connection);
            incomingConnectionsRef.set(Collections.unmodifiableList(remainingIncoming));
            return;
        }
    }

    if (removedAsSource) {
        return;
    }
    throw new IllegalArgumentException(
            "Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source");
}
/**
 * Looks up a relationship by name among those reported by the processor.
 *
 * @param relationshipName
 *            name
 * @return the processor's own relationship equal to one built from the
 *         given name, or a newly built relationship with that name if the
 *         processor reports none
 */
@Override
public Relationship getRelationship(final String relationshipName) {
    final Relationship requested = new Relationship.Builder().name(relationshipName).build();

    final Set<Relationship> declared;
    try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
        declared = processor.getRelationships();
    }

    for (final Relationship candidate : declared) {
        if (candidate.equals(requested)) {
            return candidate;
        }
    }
    return requested;
}
/**
 * @return the processor wrapped by this node
 */
@Override
public Processor getProcessor() {
    return processor;
}
/**
 * @return a new set of the destinations of all outgoing connections,
 *         excluding this node itself (self-loops)
 */
public Set<Connectable> getDestinations() {
    final Set<Connectable> others = new HashSet<>();
    for (final Connectable destination : destinations.values()) {
        if (destination == this) {
            continue; // self-loop
        }
        others.add(destination);
    }
    return others;
}
/**
 * @param relationship
 *            the relationship of interest
 * @return a new set holding the recorded destination of every outgoing
 *         connection registered for the relationship; empty if there are
 *         no such connections
 */
public Set<Connectable> getDestinations(final Relationship relationship) {
    final Set<Connectable> result = new HashSet<>();

    final Set<Connection> outgoing = connections.get(relationship);
    if (outgoing == null) {
        return result;
    }

    for (final Connection outgoingConnection : outgoing) {
        final Connectable destination = destinations.get(outgoingConnection);
        result.add(destination);
    }
    return result;
}
/**
 * @return a new set of the relationships reported by the processor that
 *         have no outgoing connection registered; empty if the processor
 *         reports no relationships
 */
public Set<Relationship> getUndefinedRelationships() {
    final Set<Relationship> unconnected = new HashSet<>();

    final Set<Relationship> declared;
    try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
        declared = processor.getRelationships();
    }

    if (declared != null) {
        for (final Relationship candidate : declared) {
            final Set<Connection> outgoing = this.connections.get(candidate);
            final boolean connected = outgoing != null && !outgoing.isEmpty();
            if (!connected) {
                unconnected.add(candidate);
            }
        }
    }
    return unconnected;
}
/**
 * Determines if the given node is the recorded destination of any outgoing
 * connection of this node.
 *
 * @param node
 *            node
 * @return true if is a direct destination node; false otherwise
 */
boolean isRelated(final ProcessorNode node) {
    final boolean directDestination = destinations.containsValue(node);
    return directDestination;
}
/**
 * @return true if the scheduled state is RUNNING or the process scheduler
 *         reports at least one active thread for this node
 */
@Override
public boolean isRunning() {
    if (getScheduledState().equals(ScheduledState.RUNNING)) {
        return true;
    }
    // not scheduled to run, but threads may still be active
    return processScheduler.getActiveThreadCount(this) > 0;
}
/**
 * @return the number of active threads the process scheduler reports for this node
 */
@Override
public int getActiveThreadCount() {
    final int activeThreads = processScheduler.getActiveThreadCount(this);
    return activeThreads;
}
/**
 * @return a new list of the incoming connections whose source is not this
 *         node, i.e. excluding self-loops
 */
List<Connection> getIncomingNonLoopConnections() {
    final List<Connection> incoming = getIncomingConnections();
    final List<Connection> fromOthers = new ArrayList<>(incoming.size());
    for (final Connection candidate : incoming) {
        final boolean selfLoop = candidate.getSource().equals(this);
        if (!selfLoop) {
            fromOthers.add(candidate);
        }
    }
    return fromOthers;
}
/**
 * Checks whether this node is currently valid: the processor's own
 * validation reports no invalid result, every unconnected relationship is
 * auto-terminated, and the input requirement is satisfied by the incoming
 * non-loop connections. Any failure during validation is logged and treated
 * as invalid.
 *
 * @return true if all checks pass, false otherwise
 */
@Override
public boolean isValid() {
    try {
        final ValidationContext context = this.getValidationContextFactory()
                .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());

        final Collection<ValidationResult> processorResults;
        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
            processorResults = getProcessor().validate(context);
        }

        // 1. the processor's own property validation
        for (final ValidationResult processorResult : processorResults) {
            final boolean ok = processorResult.isValid();
            if (!ok) {
                return false;
            }
        }

        // 2. every relationship must be connected or auto-terminated
        for (final Relationship unconnected : getUndefinedRelationships()) {
            final boolean terminated = isAutoTerminated(unconnected);
            if (!terminated) {
                return false;
            }
        }

        // 3. upstream connections must match the declared input requirement
        switch (getInputRequirement()) {
            case INPUT_FORBIDDEN: {
                final boolean hasUpstream = !getIncomingNonLoopConnections().isEmpty();
                if (hasUpstream) {
                    return false;
                }
                break;
            }
            case INPUT_REQUIRED: {
                final boolean noUpstream = getIncomingNonLoopConnections().isEmpty();
                if (noUpstream) {
                    return false;
                }
                break;
            }
            case INPUT_ALLOWED:
                break;
        }

        return true;
    } catch (final Throwable t) {
        LOG.warn("Failed during validation", t);
        return false;
    }
}
/**
 * Collects every reason this node is currently invalid: invalid results
 * from the processor's own validation, relationships that are neither
 * connected nor auto-terminated, and violations of the input requirement.
 * If validation itself fails, a single invalid result describing the
 * failure is added instead of propagating the error.
 *
 * The validation context is built from the same inputs as in
 * {@link #isValid()} so that both methods agree.
 *
 * @return the invalid results; empty if the node is valid
 */
@Override
public Collection<ValidationResult> getValidationErrors() {
    final List<ValidationResult> results = new ArrayList<>();
    try {
        // use getProcessGroupIdentifier(), as isValid() does, rather than
        // dereferencing getProcessGroup(), which may not be set yet
        final ValidationContext validationContext = this.getValidationContextFactory()
                .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());

        final Collection<ValidationResult> validationResults;
        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
            validationResults = getProcessor().validate(validationContext);
        }

        for (final ValidationResult result : validationResults) {
            if (!result.isValid()) {
                results.add(result);
            }
        }

        for (final Relationship relationship : getUndefinedRelationships()) {
            if (!isAutoTerminated(relationship)) {
                final ValidationResult error = new ValidationResult.Builder()
                        .explanation("Relationship '" + relationship.getName()
                                + "' is not connected to any component and is not auto-terminated")
                        .subject("Relationship " + relationship.getName()).valid(false).build();
                results.add(error);
            }
        }

        switch (getInputRequirement()) {
            case INPUT_ALLOWED:
                break;
            case INPUT_FORBIDDEN: {
                final int incomingConnCount = getIncomingNonLoopConnections().size();
                if (incomingConnCount != 0) {
                    results.add(new ValidationResult.Builder().explanation(
                            "Processor does not allow upstream connections but currently has " + incomingConnCount)
                            .subject("Upstream Connections").valid(false).build());
                }
                break;
            }
            case INPUT_REQUIRED: {
                if (getIncomingNonLoopConnections().isEmpty()) {
                    results.add(new ValidationResult.Builder()
                            .explanation("Processor requires an upstream connection but currently has none")
                            .subject("Upstream Connections").valid(false).build());
                }
                break;
            }
        }
    } catch (final Throwable t) {
        // report the failure as a validation error rather than propagating it
        results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString())
                .valid(false).build());
    }
    return results;
}
/**
 * @return the value of this node's {@code inputRequirement} field
 */
@Override
public Requirement getInputRequirement() {
    return this.inputRequirement;
}
/**
 * Compares this node with another object. Two processor nodes are treated
 * as equal when their identifiers are equal.
 *
 * @param other
 *            the object to compare against
 * @return {@code true} if {@code other} is a {@link ProcessorNode} whose
 *         identifier equals this node's identifier, {@code false} otherwise
 */
@Override
public boolean equals(final Object other) {
    if (other instanceof ProcessorNode) {
        final ProcessorNode that = (ProcessorNode) other;
        final EqualsBuilder comparison = new EqualsBuilder();
        comparison.append(identifier.get(), that.getIdentifier());
        return comparison.isEquals();
    }
    return false;
}
/**
 * Computes the hash code from the identifier value, i.e. the same value
 * that {@link #equals(Object)} compares.
 * <p>
 * The value held by {@code identifier} is hashed, not the holder object:
 * hashing the holder would only agree with {@code equals} if the holder
 * defined a value-based {@code hashCode}, and otherwise two nodes that are
 * equal by identifier could end up with different hash codes.
 *
 * @return a hash code derived from this node's identifier value
 */
@Override
public int hashCode() {
    return new HashCodeBuilder(7, 67).append(identifier.get()).toHashCode();
}
/**
 * Asks the wrapped processor for its relationships; the call is made inside
 * a {@link NarCloseable} scope.
 *
 * @return whatever the processor's {@code getRelationships()} returns
 */
@Override
public Collection<Relationship> getRelationships() {
    try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
        final Collection<Relationship> declared = getProcessor().getRelationships();
        return declared;
    }
}
/**
 * Delegates to the wrapped processor's {@code toString()}, invoked inside a
 * {@link NarCloseable} scope.
 *
 * @return the processor's string representation
 */
@Override
public String toString() {
    try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
        final String description = getProcessor().toString();
        return description;
    }
}
/**
 * @return the process group currently held in the {@code processGroup}
 *         reference
 */
@Override
public ProcessGroup getProcessGroup() {
    final ProcessGroup current = this.processGroup.get();
    return current;
}
/**
 * Replaces the process group held in the {@code processGroup} reference.
 *
 * @param group
 *            the process group to store
 */
@Override
public void setProcessGroup(final ProcessGroup group) {
    processGroup.set(group);
}
/**
 * Forwards the trigger to the wrapped processor, invoking it inside a
 * {@link NarCloseable} scope.
 *
 * @param context
 *            the context handed to the processor
 * @param sessionFactory
 *            the session factory handed to the processor
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
    try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
        this.processor.onTrigger(context, sessionFactory);
    }
}
/**
 * @return always {@link ConnectableType#PROCESSOR}
 */
@Override
public ConnectableType getConnectableType() {
    final ConnectableType type = ConnectableType.PROCESSOR;
    return type;
}
/**
 * Stores new annotation data through the superclass, after asserting via
 * {@code Assert.state} that the processor is not running.
 *
 * @param data
 *            the annotation data to store
 */
@Override
public void setAnnotationData(final String data) {
    final boolean notRunning = !isRunning();
    Assert.state(notRunning, "Cannot set AnnotationData while processor is running");
    super.setAnnotationData(data);
}
/**
 * Returns the result of {@link #getValidationErrors()}.
 *
 * @param validationContext
 *            not consulted by this implementation
 * @return the current validation errors of this node
 */
@Override
public Collection<ValidationResult> validate(final ValidationContext validationContext) {
    final Collection<ValidationResult> errors = getValidationErrors();
    return errors;
}
/**
 * Verifies that this processor can be deleted, taking its connections into
 * account (equivalent to {@code verifyCanDelete(false)}).
 *
 * @throws IllegalStateException
 *             if the delegated check rejects the deletion
 */
@Override
public void verifyCanDelete() throws IllegalStateException {
    final boolean ignoreConnections = false;
    verifyCanDelete(ignoreConnections);
}
/**
 * Verifies that this processor can be deleted.
 * <p>
 * A running processor can never be deleted. Unless connections are ignored,
 * every connection in the {@code connections} map must itself be deletable,
 * and every incoming connection must originate from this processor (and be
 * deletable); an incoming connection from any other source blocks deletion.
 *
 * @param ignoreConnections
 *            when {@code true}, only the running check is performed
 * @throws IllegalStateException
 *             if the processor is running or is the destination of a
 *             connection from another component
 */
@Override
public void verifyCanDelete(final boolean ignoreConnections) {
    if (isRunning()) {
        throw new IllegalStateException(this.getIdentifier() + " is running");
    }
    if (ignoreConnections) {
        return;
    }

    for (final Set<Connection> grouped : connections.values()) {
        for (final Connection member : grouped) {
            member.verifyCanDelete();
        }
    }

    for (final Connection incoming : incomingConnectionsRef.get()) {
        final boolean selfLoop = incoming.getSource().equals(this);
        if (!selfLoop) {
            throw new IllegalStateException(this.getIdentifier() + " is the destination of another component");
        }
        incoming.verifyCanDelete();
    }
}
/**
 * Verifies that this processor can be started, without ignoring any
 * controller service references (delegates with a {@code null} set).
 */
@Override
public void verifyCanStart() {
    final Set<ControllerServiceNode> noIgnoredReferences = null;
    this.verifyCanStart(noIgnoredReferences);
}
/**
 * Verifies that this processor can be started.
 * <p>
 * The physical scheduled state must be STOPPED or DISABLED and there must be
 * no active threads. The processor must also be valid: when
 * {@code ignoredReferences} is {@code null} this is decided by
 * {@code isValid()}, otherwise by the validation errors computed with the
 * identifiers of the given controller services passed along.
 *
 * @param ignoredReferences
 *            controller services whose identifiers are handed to the
 *            validation call, or {@code null}
 * @throws IllegalStateException
 *             if any of the checks fails
 */
@Override
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
    final ScheduledState physicalState = getPhysicalScheduledState();
    final boolean startable = physicalState == ScheduledState.STOPPED || physicalState == ScheduledState.DISABLED;
    if (!startable) {
        throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not stopped. Current state is " + physicalState.name());
    }
    verifyNoActiveThreads();

    if (ignoredReferences == null) {
        if (!isValid()) {
            throw new IllegalStateException(this.getIdentifier() + " is not in a valid state");
        }
        return;
    }

    final Set<String> ignoredIds = new HashSet<>();
    for (final ControllerServiceNode ignored : ignoredReferences) {
        ignoredIds.add(ignored.getIdentifier());
    }

    final Collection<ValidationResult> outcomes = getValidationErrors(ignoredIds);
    for (final ValidationResult outcome : outcomes) {
        if (outcome.isValid()) {
            continue;
        }
        throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not valid: " + outcome);
    }
}
/**
 * Verifies that this processor can be stopped, which requires its scheduled
 * state to be RUNNING.
 *
 * @throws IllegalStateException
 *             if the scheduled state is anything other than RUNNING
 */
@Override
public void verifyCanStop() {
    final boolean scheduledToRun = getScheduledState() == ScheduledState.RUNNING;
    if (scheduledToRun) {
        return;
    }
    throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run");
}
/**
 * Verifies that this processor can be updated, which requires that it is
 * not running.
 *
 * @throws IllegalStateException
 *             if the processor is running
 */
@Override
public void verifyCanUpdate() {
    if (!isRunning()) {
        return;
    }
    throw new IllegalStateException(this.getIdentifier() + " is not stopped");
}
/**
 * Verifies that this processor can be enabled: its scheduled state must be
 * DISABLED and it must have no active threads.
 *
 * @throws IllegalStateException
 *             if the processor is not disabled or still has active threads
 */
@Override
public void verifyCanEnable() {
    final ScheduledState state = getScheduledState();
    if (state == ScheduledState.DISABLED) {
        verifyNoActiveThreads();
        return;
    }
    throw new IllegalStateException(this.getIdentifier() + " is not disabled");
}
/**
 * Verifies that this processor can be disabled: its scheduled state must be
 * STOPPED and it must have no active threads.
 *
 * @throws IllegalStateException
 *             if the processor is not stopped or still has active threads
 */
@Override
public void verifyCanDisable() {
    final ScheduledState state = getScheduledState();
    if (state == ScheduledState.STOPPED) {
        verifyNoActiveThreads();
        return;
    }
    throw new IllegalStateException(this.getIdentifier() + " is not stopped");
}
/**
 * Verifies that this processor's state can be cleared; the condition is the
 * same one enforced by {@link #verifyCanUpdate()}.
 *
 * @throws IllegalStateException
 *             if {@code verifyCanUpdate()} rejects the operation
 */
@Override
public void verifyCanClearState() throws IllegalStateException {
    this.verifyCanUpdate();
}
/**
 * Ensures the process scheduler reports no active threads for this node.
 *
 * @throws IllegalStateException
 *             if the reported active thread count is greater than zero
 */
private void verifyNoActiveThreads() throws IllegalStateException {
    final int activeThreads = processScheduler.getActiveThreadCount(this);
    if (activeThreads <= 0) {
        return;
    }
    throw new IllegalStateException(this.getIdentifier() + " has " + activeThreads + " threads still active");
}
/**
 * Verifies that the processor's configuration may be modified, which
 * requires that the processor is not running.
 *
 * @throws IllegalStateException
 *             if the processor is running
 */
@Override
public void verifyModifiable() throws IllegalStateException {
    if (!isRunning()) {
        return;
    }
    throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
/**
 * Will idempotently start the processor using the following sequence: <i>
 * <ul>
 * <li>Validate Processor's state (e.g., PropertyDescriptors,
 * ControllerServices etc.)</li>
 * <li>Transition (atomically) Processor's scheduled state from STOPPED to
 * STARTING. If the above state transition succeeds, then execute the start
 * task (asynchronously) which will be re-tried until @OnScheduled is
 * executed successfully and 'schedulingAgentCallback' is invoked, or until
 * STOP operation is initiated on this processor. If state transition fails
 * it means processor is already being started (or is not in STOPPED state)
 * and WARN message will be logged explaining it.</li>
 * </ul>
 * </i>
 * <p>
 * Any exception thrown while invoking operations annotated with @OnScheduled
 * will be caught and logged after which @OnUnscheduled operation will be
 * invoked (quietly) and the start sequence will be repeated (re-try) after
 * delay provided by 'administrativeYieldMillis'.
 * </p>
 * <p>
 * Upon successful completion of start sequence (@OnScheduled -&gt;
 * 'schedulingAgentCallback') the attempt will be made to transition
 * processor's scheduling state to RUNNING at which point processor is
 * considered to be fully started and functioning. If upon successful
 * invocation of @OnScheduled operation the processor can not be
 * transitioned to RUNNING state (e.g., STOP operation was invoked on the
 * processor while its @OnScheduled operation was executing), the
 * processor's @OnUnscheduled operation will be invoked and its scheduling
 * state will be set to STOPPED at which point the processor is considered
 * to be fully stopped.
 * </p>
 *
 * @param taskScheduler
 *            executor on which the start task runs and is re-scheduled
 * @param administrativeYieldMillis
 *            delay, in milliseconds, before a failed start is re-tried
 * @param processContext
 *            context passed to the processor's lifecycle methods
 * @param schedulingAgentCallback
 *            callback used to run the @OnScheduled task and to trigger the
 *            processor once it has reached the RUNNING state
 * @throws IllegalStateException
 *             if the processor is not valid
 */
@Override
public <T extends ProcessContext & ControllerServiceLookup> void start(final ScheduledExecutorService taskScheduler,
        final long administrativeYieldMillis, final T processContext, final SchedulingAgentCallback schedulingAgentCallback) {
    if (!this.isValid()) {
        throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors());
    }
    final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
    if (this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING)) { // will ensure that the Processor represented by this node can only be started once
        final Runnable startProcRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            try (final NarCloseable nc = NarCloseable.withNarLoader()) {
                                ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
                                return null;
                            }
                        }
                    });
                    if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
                        schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
                    } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
                        try (final NarCloseable nc = NarCloseable.withNarLoader()) {
                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
                        }
                        scheduledState.set(ScheduledState.STOPPED);
                    }
                } catch (final Exception e) {
                    final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
                    procLog.error( "{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}",
                            new Object[] { StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis + " milliseconds" }, cause);
                    LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause);
                    // invoke the cleanup callback inside a NarCloseable scope, exactly like the
                    // other lifecycle invocations in this method
                    try (final NarCloseable nc = NarCloseable.withNarLoader()) {
                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
                    }
                    if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated
                        taskScheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
                    } else {
                        scheduledState.set(ScheduledState.STOPPED);
                    }
                }
            }
        };
        taskScheduler.execute(startProcRunnable);
    } else {
        final String procName = this.processor.getClass().getSimpleName();
        LOG.warn("Can not start '" + procName
                + "' since it's already in the process of being started or it is DISABLED - "
                + scheduledState.get());
        procLog.warn("Can not start '" + procName
                + "' since it's already in the process of being started or it is DISABLED - "
                + scheduledState.get());
    }
}
/**
 * Will idempotently stop the processor using the following sequence: <i>
 * <ul>
 * <li>Transition (atomically) Processor's scheduled state from RUNNING to
 * STOPPING. If the above state transition succeeds, then execute the stop
 * task (asynchronously): the processor's @OnUnscheduled operation is invoked
 * once, and then 'activeThreadMonitorCallback' provided by the
 * {@link ProcessScheduler} is called. As long as the callback returns
 * {@code false} (presumably because the processor still has active threads)
 * the task is re-scheduled with a delay of 100 milliseconds. Once it returns
 * {@code true}, the processor's @OnStopped operation is invoked and its
 * scheduled state is set to STOPPED which completes processor stop
 * sequence.</li>
 * </ul>
 * </i>
 * <p>
 * If for some reason processor's scheduled state can not be transitioned to
 * STOPPING (e.g., the processor didn't finish @OnScheduled operation when
 * stop was called), the attempt will be made to transition processor's
 * scheduled state from STARTING to STOPPING which will allow
 * {@link #start(ScheduledExecutorService, long, ProcessContext, SchedulingAgentCallback)}
 * method to initiate processor's shutdown upon exiting @OnScheduled
 * operation, otherwise the processor's scheduled state will remain
 * unchanged ensuring that multiple calls to this method are idempotent.
 * </p>
 *
 * @param scheduler
 *            executor on which the stop task runs and is re-scheduled
 * @param processContext
 *            context passed to the processor's lifecycle methods
 * @param activeThreadMonitorCallback
 *            callback whose {@code true} result lets the stop sequence
 *            complete
 */
@Override
public <T extends ProcessContext & ControllerServiceLookup> void stop(final ScheduledExecutorService scheduler,
        final T processContext, final Callable<Boolean> activeThreadMonitorCallback) {
    LOG.info("Stopping processor: " + this.processor.getClass());
    if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once
        // will continue to monitor active threads, invoking OnStopped once
        // there are none
        scheduler.execute(new Runnable() {
            boolean unscheduled = false;
            @Override
            public void run() {
                if (!this.unscheduled){
                    // invoke the callback inside a NarCloseable scope, exactly like the
                    // @OnStopped invocation below
                    try (final NarCloseable nc = NarCloseable.withNarLoader()) {
                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
                    }
                    this.unscheduled = true;
                }
                try {
                    if (activeThreadMonitorCallback.call()) {
                        try (final NarCloseable nc = NarCloseable.withNarLoader()) {
                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
                        }
                        scheduledState.set(ScheduledState.STOPPED);
                    } else {
                        scheduler.schedule(this, 100, TimeUnit.MILLISECONDS);
                    }
                } catch (final Exception e) {
                    LOG.warn("Failed while shutting down processor " + processor, e);
                }
            }
        });
    } else {
        /*
         * We do compareAndSet() instead of set() to ensure that Processor
         * stoppage is handled consistently including a condition where
         * Processor never got a chance to transition to RUNNING state
         * before stop() was called. If that happens the stop processor
         * routine will be initiated in start() method, otherwise the IF
         * part will handle the stop processor routine.
         */
        this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING);
    }
}
/**
 * Runs a lifecycle task through the supplied callback and waits for it with
 * a timeout, so that a task which blocks indefinitely does not block the
 * caller forever.
 * <p>
 * The timeout is read from the NiFi property identified by
 * {@code NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT}; when that property is
 * not set, 60000 milliseconds is used. The task is submitted via
 * {@code callback.invokeMonitoringTask(task)} and the returned future is
 * awaited for at most that long. {@code callback.postMonitor()} is always
 * invoked afterwards, whatever the outcome.
 * </p>
 * <p>
 * NOTE: On timeout the task is cancelled with interruption, which does not
 * guarantee that the task actually stops: lifecycle code that ignores thread
 * interrupts may leave a runaway thread behind, which may eventually require
 * a NiFi restart. This is explained in the WARN message that is logged.
 * </p>
 *
 * @param callback
 *            used to submit the task and to signal that monitoring is over
 * @param task
 *            the lifecycle task to execute
 * @throws RuntimeException
 *             if waiting is interrupted (the interrupt flag is restored),
 *             the wait times out, or the task itself fails; the original
 *             exception is attached as the cause
 */
private <T> void invokeTaskAsCancelableFuture(final SchedulingAgentCallback callback, final Callable<T> task) {
    final String configuredTimeout = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
    final long timeoutMillis;
    if (configuredTimeout == null) {
        timeoutMillis = 60000;
    } else {
        timeoutMillis = FormatUtils.getTimeDuration(configuredTimeout.trim(), TimeUnit.MILLISECONDS);
    }

    final Future<?> pending = callback.invokeMonitoringTask(task);
    try {
        pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (final ExecutionException failed) {
        throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", failed);
    } catch (final TimeoutException timedOut) {
        pending.cancel(true);
        final String warning = "Timed out while waiting for OnScheduled of '"
                + this.processor.getClass().getSimpleName()
                + "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not "
                + "guarantee that the task will be canceled since the code inside current OnScheduled operation may "
                + "have been written to ignore interrupts which may result in runaway thread which could lead to more issues "
                + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '"
                + this.processor + "' that needs to be documented, reported and eventually fixed.";
        LOG.warn(warning);
        throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", timedOut);
    } catch (final InterruptedException interrupted) {
        final String warning = "Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName()
                + "' lifecycle OnScheduled operation to finish.";
        LOG.warn(warning);
        // restore the interrupt flag before surfacing the failure
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", interrupted);
    } finally {
        callback.postMonitor();
    }
}
/**
 * @return the identifier of the current process group, or {@code null} when
 *         no process group is set
 */
@Override
protected String getProcessGroupIdentifier() {
    final ProcessGroup owner = getProcessGroup();
    if (owner == null) {
        return null;
    }
    return owner.getIdentifier();
}
}