| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.controller; |
| |
| import org.apache.commons.lang3.builder.EqualsBuilder; |
| import org.apache.commons.lang3.builder.HashCodeBuilder; |
| import org.apache.nifi.annotation.behavior.EventDriven; |
| import org.apache.nifi.annotation.behavior.InputRequirement; |
| import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; |
| import org.apache.nifi.annotation.behavior.SideEffectFree; |
| import org.apache.nifi.annotation.behavior.SupportsBatching; |
| import org.apache.nifi.annotation.behavior.TriggerSerially; |
| import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; |
| import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.annotation.lifecycle.OnUnscheduled; |
| import org.apache.nifi.authorization.Resource; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.authorization.resource.ResourceFactory; |
| import org.apache.nifi.authorization.resource.ResourceType; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.connectable.Connectable; |
| import org.apache.nifi.connectable.ConnectableType; |
| import org.apache.nifi.connectable.Connection; |
| import org.apache.nifi.connectable.Position; |
| import org.apache.nifi.controller.service.ControllerServiceNode; |
| import org.apache.nifi.controller.service.ControllerServiceProvider; |
| import org.apache.nifi.groups.ProcessGroup; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.logging.LogLevel; |
| import org.apache.nifi.logging.LogRepositoryFactory; |
| import org.apache.nifi.nar.NarCloseable; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSessionFactory; |
| import org.apache.nifi.processor.Processor; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.SimpleProcessLogger; |
| import org.apache.nifi.scheduling.SchedulingStrategy; |
| import org.apache.nifi.util.FormatUtils; |
| import org.apache.nifi.util.NiFiProperties; |
| import org.apache.nifi.util.ReflectionUtils; |
| import org.quartz.CronExpression; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.util.Assert; |
| |
| import java.lang.reflect.InvocationTargetException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import static java.util.Objects.requireNonNull; |
| |
| /** |
| * ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists |
| * within a controlled flow. This node keeps track of the processor, its |
| * scheduling information and its relationships to other processors and whatever |
| * scheduled futures exist for it. Must be thread safe. |
| * |
| */ |
| public class StandardProcessorNode extends ProcessorNode implements Connectable { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(StandardProcessorNode.class); |
| |
| public static final String BULLETIN_OBSERVER_ID = "bulletin-observer"; |
| |
| public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; |
| public static final String DEFAULT_YIELD_PERIOD = "1 sec"; |
| public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec"; |
| private final AtomicReference<ProcessGroup> processGroup; |
| private final Processor processor; |
| private final AtomicReference<String> identifier; |
| private final Map<Connection, Connectable> destinations; |
| private final Map<Relationship, Set<Connection>> connections; |
| private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate; |
| private final AtomicReference<List<Connection>> incomingConnectionsRef; |
| private final AtomicBoolean isolated; |
| private final AtomicBoolean lossTolerant; |
| private final AtomicReference<String> comments; |
| private final AtomicReference<Position> position; |
| private final AtomicReference<String> schedulingPeriod; // stored as string so it's presented to user as they entered it |
| private final AtomicReference<String> yieldPeriod; |
| private final AtomicReference<String> penalizationPeriod; |
| private final AtomicReference<Map<String, String>> style; |
| private final AtomicInteger concurrentTaskCount; |
| private final AtomicLong yieldExpiration; |
| private final AtomicLong schedulingNanos; |
| private final boolean triggerWhenEmpty; |
| private final boolean sideEffectFree; |
| private final boolean triggeredSerially; |
| private final boolean triggerWhenAnyDestinationAvailable; |
| private final boolean eventDrivenSupported; |
| private final boolean batchSupported; |
| private final Requirement inputRequirement; |
| private final ProcessScheduler processScheduler; |
| private long runNanos = 0L; |
| |
| private SchedulingStrategy schedulingStrategy; // guarded by read/write lock |
| // ??????? NOT any more |
| |
| public StandardProcessorNode(final Processor processor, final String uuid, |
| final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, |
| final ControllerServiceProvider controllerServiceProvider) { |
| |
| this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, |
| processor.getClass().getSimpleName(), processor.getClass().getCanonicalName()); |
| } |
| |
| public StandardProcessorNode(final Processor processor, final String uuid, |
| final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, |
| final ControllerServiceProvider controllerServiceProvider, |
| final String componentType, final String componentCanonicalClass) { |
| |
| super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass); |
| |
| this.processor = processor; |
| identifier = new AtomicReference<>(uuid); |
| destinations = new HashMap<>(); |
| connections = new HashMap<>(); |
| incomingConnectionsRef = new AtomicReference<>(new ArrayList<>()); |
| lossTolerant = new AtomicBoolean(false); |
| final Set<Relationship> emptySetOfRelationships = new HashSet<>(); |
| undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships); |
| comments = new AtomicReference<>(""); |
| schedulingPeriod = new AtomicReference<>("0 sec"); |
| schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS); |
| yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD); |
| yieldExpiration = new AtomicLong(0L); |
| concurrentTaskCount = new AtomicInteger(1); |
| position = new AtomicReference<>(new Position(0D, 0D)); |
| style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<String, String>())); |
| this.processGroup = new AtomicReference<>(); |
| processScheduler = scheduler; |
| isolated = new AtomicBoolean(false); |
| penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); |
| |
| final Class<?> procClass = processor.getClass(); |
| triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class); |
| sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class); |
| batchSupported = procClass.isAnnotationPresent(SupportsBatching.class); |
| triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class); |
| triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class); |
| eventDrivenSupported = procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty; |
| |
| final boolean inputRequirementPresent = procClass.isAnnotationPresent(InputRequirement.class); |
| if (inputRequirementPresent) { |
| inputRequirement = procClass.getAnnotation(InputRequirement.class).value(); |
| } else { |
| inputRequirement = Requirement.INPUT_ALLOWED; |
| } |
| |
| schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; |
| } |
| |
| /** |
| * @return comments about this specific processor instance |
| */ |
| @Override |
| public String getComments() { |
| return comments.get(); |
| } |
| |
| @Override |
| public Authorizable getParentAuthorizable() { |
| return getProcessGroup(); |
| } |
| |
| @Override |
| public Resource getResource() { |
| return ResourceFactory.getComponentResource(ResourceType.Processor, getIdentifier(), getName()); |
| } |
| |
| /** |
| * Provides and opportunity to retain information about this particular |
| * processor instance |
| * |
| * @param comments |
| * new comments |
| */ |
| @Override |
| public void setComments(final String comments) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| this.comments.set(comments); |
| } |
| |
| @Override |
| public Position getPosition() { |
| return position.get(); |
| } |
| |
| @Override |
| public void setPosition(final Position position) { |
| this.position.set(position); |
| } |
| |
| @Override |
| public Map<String, String> getStyle() { |
| return style.get(); |
| } |
| |
| @Override |
| public void setStyle(final Map<String, String> style) { |
| if (style != null) { |
| this.style.set(Collections.unmodifiableMap(new HashMap<>(style))); |
| } |
| } |
| |
| @Override |
| public String getIdentifier() { |
| return identifier.get(); |
| } |
| |
| /** |
| * @return if true flow file content generated by this processor is |
| * considered loss tolerant |
| */ |
| @Override |
| public boolean isLossTolerant() { |
| return lossTolerant.get(); |
| } |
| |
| @Override |
| public boolean isIsolated() { |
| return isolated.get(); |
| } |
| |
| /** |
| * @return true if the processor has the {@link TriggerWhenEmpty} |
| * annotation, false otherwise. |
| */ |
| @Override |
| public boolean isTriggerWhenEmpty() { |
| return triggerWhenEmpty; |
| } |
| |
| /** |
| * @return true if the processor has the {@link SideEffectFree} annotation, |
| * false otherwise. |
| */ |
| @Override |
| public boolean isSideEffectFree() { |
| return sideEffectFree; |
| } |
| |
| @Override |
| public boolean isHighThroughputSupported() { |
| return batchSupported; |
| } |
| |
| /** |
| * @return true if the processor has the |
| * {@link TriggerWhenAnyDestinationAvailable} annotation, false |
| * otherwise. |
| */ |
| @Override |
| public boolean isTriggerWhenAnyDestinationAvailable() { |
| return triggerWhenAnyDestinationAvailable; |
| } |
| |
| /** |
| * Indicates whether flow file content made by this processor must be |
| * persisted |
| * |
| * @param lossTolerant |
| * tolerant |
| */ |
| @Override |
| public void setLossTolerant(final boolean lossTolerant) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| this.lossTolerant.set(lossTolerant); |
| } |
| |
| /** |
| * Indicates whether the processor runs on only the primary node. |
| * |
| * @param isolated |
| * isolated |
| */ |
| public void setIsolated(final boolean isolated) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| this.isolated.set(isolated); |
| } |
| |
| @Override |
| public boolean isAutoTerminated(final Relationship relationship) { |
| if (relationship.isAutoTerminated() && getConnections(relationship).isEmpty()) { |
| return true; |
| } |
| final Set<Relationship> terminatable = undefinedRelationshipsToTerminate.get(); |
| return terminatable == null ? false : terminatable.contains(relationship); |
| } |
| |
| @Override |
| public void setAutoTerminatedRelationships(final Set<Relationship> terminate) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| |
| for (final Relationship rel : terminate) { |
| if (!getConnections(rel).isEmpty()) { |
| throw new IllegalStateException("Cannot mark relationship '" + rel.getName() |
| + "' as auto-terminated because Connection already exists with this relationship"); |
| } |
| } |
| undefinedRelationshipsToTerminate.set(new HashSet<>(terminate)); |
| } |
| |
| /** |
| * @return an unmodifiable Set that contains all of the |
| * ProcessorRelationship objects that are configured to be |
| * auto-terminated |
| */ |
| @Override |
| public Set<Relationship> getAutoTerminatedRelationships() { |
| Set<Relationship> relationships = undefinedRelationshipsToTerminate.get(); |
| if (relationships == null) { |
| relationships = new HashSet<>(); |
| } |
| return Collections.unmodifiableSet(relationships); |
| } |
| |
| /** |
| * @return the value of the processor's {@link CapabilityDescription} |
| * annotation, if one exists, else <code>null</code>. |
| */ |
| public String getProcessorDescription() { |
| final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); |
| String description = null; |
| if (capDesc != null) { |
| description = capDesc.value(); |
| } |
| return description; |
| } |
| |
| @Override |
| public void setName(final String name) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| super.setName(name); |
| } |
| |
| /** |
| * @param timeUnit |
| * determines the unit of time to represent the scheduling |
| * period. If null will be reported in units of |
| * {@link #DEFAULT_SCHEDULING_TIME_UNIT} |
| * @return the schedule period that should elapse before subsequent cycles |
| * of this processor's tasks |
| */ |
| @Override |
| public long getSchedulingPeriod(final TimeUnit timeUnit) { |
| return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); |
| } |
| |
| @Override |
| public boolean isEventDrivenSupported() { |
| return this.eventDrivenSupported; |
| } |
| |
| /** |
| * Updates the Scheduling Strategy used for this Processor |
| * |
| * @param schedulingStrategy |
| * strategy |
| * |
| * @throws IllegalArgumentException |
| * if the SchedulingStrategy is not not applicable for this |
| * Processor |
| */ |
| @Override |
| public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { |
| if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !eventDrivenSupported) { |
| // not valid. Just ignore it. We don't throw an Exception because if |
| // a developer changes a Processor so that |
| // it no longer supports EventDriven mode, we don't want the app to |
| // fail to startup if it was already in Event-Driven |
| // Mode. Instead, we will simply leave it in Timer-Driven mode |
| return; |
| } |
| |
| this.schedulingStrategy = schedulingStrategy; |
| setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY); |
| } |
| |
| /** |
| * @return the currently configured scheduling strategy |
| */ |
| @Override |
| public SchedulingStrategy getSchedulingStrategy() { |
| return this.schedulingStrategy; |
| } |
| |
| @Override |
| public String getSchedulingPeriod() { |
| return schedulingPeriod.get(); |
| } |
| |
| @Override |
| public void setScheduldingPeriod(final String schedulingPeriod) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| |
| switch (schedulingStrategy) { |
| case CRON_DRIVEN: { |
| try { |
| new CronExpression(schedulingPeriod); |
| } catch (final Exception e) { |
| throw new IllegalArgumentException( |
| "Scheduling Period is not a valid cron expression: " + schedulingPeriod); |
| } |
| } |
| break; |
| case PRIMARY_NODE_ONLY: |
| case TIMER_DRIVEN: { |
| final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), |
| TimeUnit.NANOSECONDS); |
| if (schedulingNanos < 0) { |
| throw new IllegalArgumentException("Scheduling Period must be positive"); |
| } |
| this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); |
| } |
| break; |
| case EVENT_DRIVEN: |
| default: |
| return; |
| } |
| |
| this.schedulingPeriod.set(schedulingPeriod); |
| } |
| |
| @Override |
| public long getRunDuration(final TimeUnit timeUnit) { |
| return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS); |
| } |
| |
| @Override |
| public void setRunDuration(final long duration, final TimeUnit timeUnit) { |
| if (duration < 0) { |
| throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to " |
| + timeUnit.toSeconds(duration) + " seconds"); |
| } |
| |
| this.runNanos = timeUnit.toNanos(duration); |
| } |
| |
| @Override |
| public long getYieldPeriod(final TimeUnit timeUnit) { |
| return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); |
| } |
| |
| @Override |
| public String getYieldPeriod() { |
| return yieldPeriod.get(); |
| } |
| |
| @Override |
| public void setYieldPeriod(final String yieldPeriod) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); |
| if (yieldMillis < 0) { |
| throw new IllegalArgumentException("Yield duration must be positive"); |
| } |
| this.yieldPeriod.set(yieldPeriod); |
| } |
| |
| /** |
| * Causes the processor not to be scheduled for some period of time. This |
| * duration can be obtained and set via the |
| * {@link #getYieldPeriod(TimeUnit)} and |
| * {@link #setYieldPeriod(long, TimeUnit)} methods. |
| */ |
| @Override |
| public void yield() { |
| final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); |
| yield(yieldMillis, TimeUnit.MILLISECONDS); |
| |
| final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 1000) + " seconds" |
| : yieldMillis + " milliseconds"; |
| LoggerFactory.getLogger(processor.getClass()).debug( |
| "{} has chosen to yield its resources; will not be scheduled to run again for {}", processor, |
| yieldDuration); |
| } |
| |
| @Override |
| public void yield(final long period, final TimeUnit timeUnit) { |
| final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit); |
| yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); |
| |
| processScheduler.yield(this); |
| } |
| |
| /** |
| * @return the number of milliseconds since Epoch at which time this |
| * processor is to once again be scheduled. |
| */ |
| @Override |
| public long getYieldExpiration() { |
| return yieldExpiration.get(); |
| } |
| |
| @Override |
| public long getPenalizationPeriod(final TimeUnit timeUnit) { |
| return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); |
| } |
| |
| @Override |
| public String getPenalizationPeriod() { |
| return penalizationPeriod.get(); |
| } |
| |
| @Override |
| public void setPenalizationPeriod(final String penalizationPeriod) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), |
| TimeUnit.MILLISECONDS); |
| if (penalizationMillis < 0) { |
| throw new IllegalArgumentException("Penalization duration must be positive"); |
| } |
| this.penalizationPeriod.set(penalizationPeriod); |
| } |
| |
| /** |
| * Determines the number of concurrent tasks that may be running for this |
| * processor. |
| * |
| * @param taskCount |
| * a number of concurrent tasks this processor may have running |
| * @throws IllegalArgumentException |
| * if the given value is less than 1 |
| */ |
| @Override |
| public void setMaxConcurrentTasks(final int taskCount) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) { |
| throw new IllegalArgumentException(); |
| } |
| if (!triggeredSerially) { |
| concurrentTaskCount.set(taskCount); |
| } |
| } |
| |
| @Override |
| public boolean isTriggeredSerially() { |
| return triggeredSerially; |
| } |
| |
| /** |
| * @return the number of tasks that may execute concurrently for this |
| * processor |
| */ |
| @Override |
| public int getMaxConcurrentTasks() { |
| return concurrentTaskCount.get(); |
| } |
| |
| @Override |
| public LogLevel getBulletinLevel() { |
| return LogRepositoryFactory.getRepository(getIdentifier()).getObservationLevel(BULLETIN_OBSERVER_ID); |
| } |
| |
| @Override |
| public void setBulletinLevel(final LogLevel level) { |
| LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID, level); |
| } |
| |
| @Override |
| public Set<Connection> getConnections() { |
| final Set<Connection> allConnections = new HashSet<>(); |
| for (final Set<Connection> connectionSet : connections.values()) { |
| allConnections.addAll(connectionSet); |
| } |
| |
| return allConnections; |
| } |
| |
| @Override |
| public List<Connection> getIncomingConnections() { |
| return incomingConnectionsRef.get(); |
| } |
| |
| @Override |
| public Set<Connection> getConnections(final Relationship relationship) { |
| final Set<Connection> applicableConnections = connections.get(relationship); |
| return (applicableConnections == null) ? Collections.<Connection> emptySet() |
| : Collections.unmodifiableSet(applicableConnections); |
| } |
| |
| @Override |
| public void addConnection(final Connection connection) { |
| Objects.requireNonNull(connection, "connection cannot be null"); |
| |
| if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) { |
| throw new IllegalStateException( |
| "Cannot a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination"); |
| } |
| |
| List<Connection> updatedIncoming = null; |
| if (connection.getDestination().equals(this)) { |
| // don't add the connection twice. This may occur if we have a |
| // self-loop because we will be told |
| // to add the connection once because we are the source and again |
| // because we are the destination. |
| final List<Connection> incomingConnections = incomingConnectionsRef.get(); |
| updatedIncoming = new ArrayList<>(incomingConnections); |
| if (!updatedIncoming.contains(connection)) { |
| updatedIncoming.add(connection); |
| } |
| } |
| |
| if (connection.getSource().equals(this)) { |
| // don't add the connection twice. This may occur if we have a |
| // self-loop because we will be told |
| // to add the connection once because we are the source and again |
| // because we are the destination. |
| if (!destinations.containsKey(connection)) { |
| for (final Relationship relationship : connection.getRelationships()) { |
| final Relationship rel = getRelationship(relationship.getName()); |
| Set<Connection> set = connections.get(rel); |
| if (set == null) { |
| set = new HashSet<>(); |
| connections.put(rel, set); |
| } |
| |
| set.add(connection); |
| |
| destinations.put(connection, connection.getDestination()); |
| } |
| |
| final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); |
| if (autoTerminated != null) { |
| autoTerminated.removeAll(connection.getRelationships()); |
| this.undefinedRelationshipsToTerminate.set(autoTerminated); |
| } |
| } |
| } |
| |
| if (updatedIncoming != null) { |
| incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); |
| } |
| } |
| |
| @Override |
| public boolean hasIncomingConnection() { |
| return !incomingConnectionsRef.get().isEmpty(); |
| } |
| |
| @Override |
| public void updateConnection(final Connection connection) throws IllegalStateException { |
| if (requireNonNull(connection).getSource().equals(this)) { |
| // update any relationships |
| // |
| // first check if any relations were removed. |
| final List<Relationship> existingRelationships = new ArrayList<>(); |
| for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) { |
| if (entry.getValue().contains(connection)) { |
| existingRelationships.add(entry.getKey()); |
| } |
| } |
| |
| for (final Relationship rel : connection.getRelationships()) { |
| if (!existingRelationships.contains(rel)) { |
| // relationship was removed. Check if this is legal. |
| final Set<Connection> connectionsForRelationship = getConnections(rel); |
| if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() |
| && !isAutoTerminated(rel) && getRelationships().contains(rel)) { |
| // if we are running and we do not terminate undefined |
| // relationships and this is the only |
| // connection that defines the given relationship, and |
| // that relationship is required, |
| // then it is not legal to remove this relationship from |
| // this connection. |
| throw new IllegalStateException("Cannot remove relationship " + rel.getName() |
| + " from Connection because doing so would invalidate Processor " + this |
| + ", which is currently running"); |
| } |
| } |
| } |
| |
| // remove the connection from any list that currently contains |
| for (final Set<Connection> list : connections.values()) { |
| list.remove(connection); |
| } |
| |
| // add the connection in for all relationships listed. |
| for (final Relationship rel : connection.getRelationships()) { |
| Set<Connection> set = connections.get(rel); |
| if (set == null) { |
| set = new HashSet<>(); |
| connections.put(rel, set); |
| } |
| set.add(connection); |
| } |
| |
| // update to the new destination |
| destinations.put(connection, connection.getDestination()); |
| |
| final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); |
| if (autoTerminated != null) { |
| autoTerminated.removeAll(connection.getRelationships()); |
| this.undefinedRelationshipsToTerminate.set(autoTerminated); |
| } |
| } |
| |
| if (connection.getDestination().equals(this)) { |
| // update our incoming connections -- we can just remove & re-add |
| // the connection to update the list. |
| final List<Connection> incomingConnections = incomingConnectionsRef.get(); |
| final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); |
| updatedIncoming.remove(connection); |
| updatedIncoming.add(connection); |
| incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); |
| } |
| } |
| |
| @Override |
| public void removeConnection(final Connection connection) { |
| boolean connectionRemoved = false; |
| |
| if (requireNonNull(connection).getSource().equals(this)) { |
| for (final Relationship relationship : connection.getRelationships()) { |
| final Set<Connection> connectionsForRelationship = getConnections(relationship); |
| if ((connectionsForRelationship == null || connectionsForRelationship.size() <= 1) && isRunning()) { |
| throw new IllegalStateException( |
| "This connection cannot be removed because its source is running and removing it will invalidate this processor"); |
| } |
| } |
| |
| for (final Set<Connection> connectionList : this.connections.values()) { |
| connectionList.remove(connection); |
| } |
| |
| connectionRemoved = (destinations.remove(connection) != null); |
| } |
| |
| if (connection.getDestination().equals(this)) { |
| final List<Connection> incomingConnections = incomingConnectionsRef.get(); |
| if (incomingConnections.contains(connection)) { |
| final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); |
| updatedIncoming.remove(connection); |
| incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); |
| return; |
| } |
| } |
| |
| if (!connectionRemoved) { |
| throw new IllegalArgumentException( |
| "Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source"); |
| } |
| } |
| |
| /** |
| * @param relationshipName |
| * name |
| * @return the relationship for this nodes processor for the given name or |
| * creates a new relationship for the given name |
| */ |
| @Override |
| public Relationship getRelationship(final String relationshipName) { |
| final Relationship specRel = new Relationship.Builder().name(relationshipName).build(); |
| Relationship returnRel = specRel; |
| |
| final Set<Relationship> relationships; |
| try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { |
| relationships = processor.getRelationships(); |
| } |
| |
| for (final Relationship rel : relationships) { |
| if (rel.equals(specRel)) { |
| returnRel = rel; |
| break; |
| } |
| } |
| return returnRel; |
| } |
| |
| @Override |
| public Processor getProcessor() { |
| return this.processor; |
| } |
| |
| /** |
| * @return the Set of destination processors for all relationships excluding |
| * any destinations that are this processor itself (self-loops) |
| */ |
| public Set<Connectable> getDestinations() { |
| final Set<Connectable> nonSelfDestinations = new HashSet<>(); |
| for (final Connectable connectable : destinations.values()) { |
| if (connectable != this) { |
| nonSelfDestinations.add(connectable); |
| } |
| } |
| return nonSelfDestinations; |
| } |
| |
| public Set<Connectable> getDestinations(final Relationship relationship) { |
| final Set<Connectable> destinationSet = new HashSet<>(); |
| final Set<Connection> relationshipConnections = connections.get(relationship); |
| if (relationshipConnections != null) { |
| for (final Connection connection : relationshipConnections) { |
| destinationSet.add(destinations.get(connection)); |
| } |
| } |
| return destinationSet; |
| } |
| |
| public Set<Relationship> getUndefinedRelationships() { |
| final Set<Relationship> undefined = new HashSet<>(); |
| final Set<Relationship> relationships; |
| try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { |
| relationships = processor.getRelationships(); |
| } |
| |
| if (relationships == null) { |
| return undefined; |
| } |
| for (final Relationship relation : relationships) { |
| final Set<Connection> connectionSet = this.connections.get(relation); |
| if (connectionSet == null || connectionSet.isEmpty()) { |
| undefined.add(relation); |
| } |
| } |
| return undefined; |
| } |
| |
| /** |
| * Determines if the given node is a destination for this node |
| * |
| * @param node |
| * node |
| * @return true if is a direct destination node; false otherwise |
| */ |
| boolean isRelated(final ProcessorNode node) { |
| return this.destinations.containsValue(node); |
| } |
| |
| @Override |
| public boolean isRunning() { |
| return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0; |
| } |
| |
| @Override |
| public int getActiveThreadCount() { |
| return processScheduler.getActiveThreadCount(this); |
| } |
| |
| List<Connection> getIncomingNonLoopConnections() { |
| final List<Connection> connections = getIncomingConnections(); |
| final List<Connection> nonLoopConnections = new ArrayList<>(connections.size()); |
| for (final Connection connection : connections) { |
| if (!connection.getSource().equals(this)) { |
| nonLoopConnections.add(connection); |
| } |
| } |
| |
| return nonLoopConnections; |
| } |
| |
| @Override |
| public boolean isValid() { |
| try { |
| final ValidationContext validationContext = this.getValidationContextFactory() |
| .newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); |
| |
| final Collection<ValidationResult> validationResults; |
| try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { |
| validationResults = getProcessor().validate(validationContext); |
| } |
| |
| for (final ValidationResult result : validationResults) { |
| if (!result.isValid()) { |
| return false; |
| } |
| } |
| |
| for (final Relationship undef : getUndefinedRelationships()) { |
| if (!isAutoTerminated(undef)) { |
| return false; |
| } |
| } |
| |
| switch (getInputRequirement()) { |
| case INPUT_ALLOWED: |
| break; |
| case INPUT_FORBIDDEN: { |
| if (!getIncomingNonLoopConnections().isEmpty()) { |
| return false; |
| } |
| break; |
| } |
| case INPUT_REQUIRED: { |
| if (getIncomingNonLoopConnections().isEmpty()) { |
| return false; |
| } |
| break; |
| } |
| } |
| } catch (final Throwable t) { |
| LOG.warn("Failed during validation", t); |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public Collection<ValidationResult> getValidationErrors() { |
| final List<ValidationResult> results = new ArrayList<>(); |
| try { |
| final ValidationContext validationContext = this.getValidationContextFactory() |
| .newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier()); |
| |
| final Collection<ValidationResult> validationResults; |
| try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { |
| validationResults = getProcessor().validate(validationContext); |
| } |
| |
| for (final ValidationResult result : validationResults) { |
| if (!result.isValid()) { |
| results.add(result); |
| } |
| } |
| |
| for (final Relationship relationship : getUndefinedRelationships()) { |
| if (!isAutoTerminated(relationship)) { |
| final ValidationResult error = new ValidationResult.Builder() |
| .explanation("Relationship '" + relationship.getName() |
| + "' is not connected to any component and is not auto-terminated") |
| .subject("Relationship " + relationship.getName()).valid(false).build(); |
| results.add(error); |
| } |
| } |
| |
| switch (getInputRequirement()) { |
| case INPUT_ALLOWED: |
| break; |
| case INPUT_FORBIDDEN: { |
| final int incomingConnCount = getIncomingNonLoopConnections().size(); |
| if (incomingConnCount != 0) { |
| results.add(new ValidationResult.Builder().explanation( |
| "Processor does not allow upstream connections but currently has " + incomingConnCount) |
| .subject("Upstream Connections").valid(false).build()); |
| } |
| break; |
| } |
| case INPUT_REQUIRED: { |
| if (getIncomingNonLoopConnections().isEmpty()) { |
| results.add(new ValidationResult.Builder() |
| .explanation("Processor requires an upstream connection but currently has none") |
| .subject("Upstream Connections").valid(false).build()); |
| } |
| break; |
| } |
| } |
| } catch (final Throwable t) { |
| results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()) |
| .valid(false).build()); |
| } |
| return results; |
| } |
| |
| @Override |
| public Requirement getInputRequirement() { |
| return inputRequirement; |
| } |
| |
| /** |
| * Establishes node equality (based on the processor's identifier) |
| * |
| * @param other |
| * node |
| * @return true if equal |
| */ |
| @Override |
| public boolean equals(final Object other) { |
| if (!(other instanceof ProcessorNode)) { |
| return false; |
| } |
| final ProcessorNode on = (ProcessorNode) other; |
| return new EqualsBuilder().append(identifier.get(), on.getIdentifier()).isEquals(); |
| } |
| |
| @Override |
| public int hashCode() { |
| return new HashCodeBuilder(7, 67).append(identifier).toHashCode(); |
| } |
| |
| @Override |
| public Collection<Relationship> getRelationships() { |
| try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { |
| return getProcessor().getRelationships(); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { |
| return getProcessor().toString(); |
| } |
| } |
| |
| @Override |
| public ProcessGroup getProcessGroup() { |
| return processGroup.get(); |
| } |
| |
| @Override |
| public void setProcessGroup(final ProcessGroup group) { |
| this.processGroup.set(group); |
| } |
| |
| @Override |
| public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { |
| try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { |
| processor.onTrigger(context, sessionFactory); |
| } |
| } |
| |
| @Override |
| public ConnectableType getConnectableType() { |
| return ConnectableType.PROCESSOR; |
| } |
| |
| @Override |
| public void setAnnotationData(final String data) { |
| Assert.state(!isRunning(), "Cannot set AnnotationData while processor is running"); |
| super.setAnnotationData(data); |
| } |
| |
| @Override |
| public Collection<ValidationResult> validate(final ValidationContext validationContext) { |
| return getValidationErrors(); |
| } |
| |
| @Override |
| public void verifyCanDelete() throws IllegalStateException { |
| verifyCanDelete(false); |
| } |
| |
| @Override |
| public void verifyCanDelete(final boolean ignoreConnections) { |
| if (isRunning()) { |
| throw new IllegalStateException(this.getIdentifier() + " is running"); |
| } |
| |
| if (!ignoreConnections) { |
| for (final Set<Connection> connectionSet : connections.values()) { |
| for (final Connection connection : connectionSet) { |
| connection.verifyCanDelete(); |
| } |
| } |
| |
| for (final Connection connection : incomingConnectionsRef.get()) { |
| if (connection.getSource().equals(this)) { |
| connection.verifyCanDelete(); |
| } else { |
| throw new IllegalStateException(this.getIdentifier() + " is the destination of another component"); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void verifyCanStart() { |
| this.verifyCanStart(null); |
| } |
| |
| @Override |
| public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) { |
| final ScheduledState currentState = getPhysicalScheduledState(); |
| if (currentState != ScheduledState.STOPPED && currentState != ScheduledState.DISABLED) { |
| throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not stopped. Current state is " + currentState.name()); |
| } |
| |
| verifyNoActiveThreads(); |
| |
| if (ignoredReferences != null) { |
| final Set<String> ids = new HashSet<>(); |
| for (final ControllerServiceNode node : ignoredReferences) { |
| ids.add(node.getIdentifier()); |
| } |
| |
| final Collection<ValidationResult> validationResults = getValidationErrors(ids); |
| for (final ValidationResult result : validationResults) { |
| if (!result.isValid()) { |
| throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not valid: " + result); |
| } |
| } |
| } else { |
| if (!isValid()) { |
| throw new IllegalStateException(this.getIdentifier() + " is not in a valid state"); |
| } |
| } |
| } |
| |
| @Override |
| public void verifyCanStop() { |
| if (getScheduledState() != ScheduledState.RUNNING) { |
| throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run"); |
| } |
| } |
| |
| @Override |
| public void verifyCanUpdate() { |
| if (isRunning()) { |
| throw new IllegalStateException(this.getIdentifier() + " is not stopped"); |
| } |
| } |
| |
| @Override |
| public void verifyCanEnable() { |
| if (getScheduledState() != ScheduledState.DISABLED) { |
| throw new IllegalStateException(this.getIdentifier() + " is not disabled"); |
| } |
| |
| verifyNoActiveThreads(); |
| } |
| |
| @Override |
| public void verifyCanDisable() { |
| if (getScheduledState() != ScheduledState.STOPPED) { |
| throw new IllegalStateException(this.getIdentifier() + " is not stopped"); |
| } |
| verifyNoActiveThreads(); |
| } |
| |
| @Override |
| public void verifyCanClearState() throws IllegalStateException { |
| verifyCanUpdate(); |
| } |
| |
| private void verifyNoActiveThreads() throws IllegalStateException { |
| final int threadCount = processScheduler.getActiveThreadCount(this); |
| if (threadCount > 0) { |
| throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active"); |
| } |
| } |
| |
| @Override |
| public void verifyModifiable() throws IllegalStateException { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| } |
| |
| /** |
| * Will idempotently start the processor using the following sequence: <i> |
| * <ul> |
| * <li>Validate Processor's state (e.g., PropertyDescriptors, |
| * ControllerServices etc.)</li> |
| * <li>Transition (atomically) Processor's scheduled state form STOPPED to |
| * STARTING. If the above state transition succeeds, then execute the start |
| * task (asynchronously) which will be re-tried until @OnScheduled is |
| * executed successfully and "schedulingAgentCallback' is invoked, or until |
| * STOP operation is initiated on this processor. If state transition fails |
| * it means processor is already being started and WARN message will be |
| * logged explaining it.</li> |
| * </ul> |
| * </i> |
| * <p> |
| * Any exception thrown while invoking operations annotated with @OnSchedule |
| * will be caught and logged after which @OnUnscheduled operation will be |
| * invoked (quietly) and the start sequence will be repeated (re-try) after |
| * delay provided by 'administrativeYieldMillis'. |
| * </p> |
| * <p> |
| * Upon successful completion of start sequence (@OnScheduled -> |
| * 'schedulingAgentCallback') the attempt will be made to transition |
| * processor's scheduling state to RUNNING at which point processor is |
| * considered to be fully started and functioning. If upon successful |
| * invocation of @OnScheduled operation the processor can not be |
| * transitioned to RUNNING state (e.g., STOP operation was invoked on the |
| * processor while it's @OnScheduled operation was executing), the |
| * processor's @OnUnscheduled operation will be invoked and its scheduling |
| * state will be set to STOPPED at which point the processor is considered |
| * to be fully stopped. |
| * </p> |
| */ |
| @Override |
| public <T extends ProcessContext & ControllerServiceLookup> void start(final ScheduledExecutorService taskScheduler, |
| final long administrativeYieldMillis, final T processContext, final SchedulingAgentCallback schedulingAgentCallback) { |
| if (!this.isValid()) { |
| throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors()); |
| } |
| final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); |
| if (this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING)) { // will ensure that the Processor represented by this node can only be started once |
| final Runnable startProcRunnable = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| try (final NarCloseable nc = NarCloseable.withNarLoader()) { |
| ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext); |
| return null; |
| } |
| } |
| }); |
| |
| if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { |
| schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle |
| } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state |
| try (final NarCloseable nc = NarCloseable.withNarLoader()) { |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); |
| } |
| scheduledState.set(ScheduledState.STOPPED); |
| } |
| } catch (final Exception e) { |
| final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; |
| procLog.error( "{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}", |
| new Object[] { StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis + " milliseconds" }, cause); |
| LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause); |
| |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); |
| if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated |
| taskScheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS); |
| } else { |
| scheduledState.set(ScheduledState.STOPPED); |
| } |
| } |
| } |
| }; |
| taskScheduler.execute(startProcRunnable); |
| } else { |
| final String procName = this.processor.getClass().getSimpleName(); |
| LOG.warn("Can not start '" + procName |
| + "' since it's already in the process of being started or it is DISABLED - " |
| + scheduledState.get()); |
| procLog.warn("Can not start '" + procName |
| + "' since it's already in the process of being started or it is DISABLED - " |
| + scheduledState.get()); |
| } |
| } |
| |
| /** |
| * Will idempotently stop the processor using the following sequence: <i> |
| * <ul> |
| * <li>Transition (atomically) Processor's scheduled state form RUNNING to |
| * STOPPING. If the above state transition succeeds, then execute the stop |
| * task (asynchronously) where 'activeThreadMonitorCallback' provided by the |
| * {@link ProcessScheduler} will be called to check if this processor still |
| * has active threads. If it does, the task will be re-scheduled with delay |
| * of 100 milliseconds until there are no more active threads, at which |
| * point processor's @OnUnscheduled and @OnStopped operation will be invoked |
| * and its scheduled state is set to STOPPED which completes processor stop |
| * sequence.</li> |
| * </ul> |
| * </i> |
| * <p> |
| * If for some reason processor's scheduled state can not be transitioned to |
| * STOPPING (e.g., the processor didn't finish @OnScheduled operation when |
| * stop was called), the attempt will be made to transition processor's |
| * scheduled state from STARTING to STOPPING which will allow |
| * {@link #start(ScheduledExecutorService, long, ProcessContext, Runnable)} |
| * method to initiate processor's shutdown upon exiting @OnScheduled |
| * operation, otherwise the processor's scheduled state will remain |
| * unchanged ensuring that multiple calls to this method are idempotent. |
| * </p> |
| */ |
| @Override |
| public <T extends ProcessContext & ControllerServiceLookup> void stop(final ScheduledExecutorService scheduler, |
| final T processContext, final Callable<Boolean> activeThreadMonitorCallback) { |
| LOG.info("Stopping processor: " + this.processor.getClass()); |
| if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once |
| // will continue to monitor active threads, invoking OnStopped once |
| // there are none |
| scheduler.execute(new Runnable() { |
| boolean unscheduled = false; |
| @Override |
| public void run() { |
| if (!this.unscheduled){ |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); |
| this.unscheduled = true; |
| } |
| try { |
| if (activeThreadMonitorCallback.call()) { |
| try (final NarCloseable nc = NarCloseable.withNarLoader()) { |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); |
| } |
| scheduledState.set(ScheduledState.STOPPED); |
| } else { |
| scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); |
| } |
| } catch (final Exception e) { |
| LOG.warn("Failed while shutting down processor " + processor, e); |
| } |
| } |
| }); |
| } else { |
| /* |
| * We do compareAndSet() instead of set() to ensure that Processor |
| * stoppage is handled consistently including a condition where |
| * Processor never got a chance to transition to RUNNING state |
| * before stop() was called. If that happens the stop processor |
| * routine will be initiated in start() method, otherwise the IF |
| * part will handle the stop processor routine. |
| */ |
| this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING); |
| } |
| } |
| |
| /** |
| * Will invoke lifecycle operation (OnScheduled or OnUnscheduled) |
| * asynchronously to ensure that it could be interrupted if stop action was |
| * initiated on the processor that may be infinitely blocking in such |
| * operation. While this approach paves the way for further enhancements |
| * related to managing processor'slife-cycle operation at the moment the |
| * interrupt will not happen automatically. This is primarily to preserve |
| * the existing behavior of the NiFi where stop operation can only be |
| * invoked once the processor is started. Unfortunately that could mean that |
| * the processor may be blocking indefinitely in lifecycle operation |
| * (OnScheduled or OnUnscheduled). To deal with that a new NiFi property has |
| * been introduced <i>nifi.processor.scheduling.timeout</i> which allows one |
| * to set the time (in milliseconds) of how long to wait before canceling |
| * such lifecycle operation (OnScheduled or OnUnscheduled) allowing |
| * processor's stop sequence to proceed. The default value for this property |
| * is {@link Long#MAX_VALUE}. |
| * <p> |
| * NOTE: Canceling the task does not guarantee that the task will actually |
| * completes (successfully or otherwise), since cancellation of the task |
| * will issue a simple Thread.interrupt(). However code inside of lifecycle |
| * operation (OnScheduled or OnUnscheduled) is written purely and will |
| * ignore thread interrupts you may end up with runaway thread which may |
| * eventually require NiFi reboot. In any event, the above explanation will |
| * be logged (WARN) informing a user so further actions could be taken. |
| * </p> |
| */ |
| private <T> void invokeTaskAsCancelableFuture(final SchedulingAgentCallback callback, final Callable<T> task) { |
| final String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); |
| final long onScheduleTimeout = timeoutString == null ? 60000 |
| : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); |
| final Future<?> taskFuture = callback.invokeMonitoringTask(task); |
| try { |
| taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS); |
| } catch (final InterruptedException e) { |
| LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName() |
| + "' lifecycle OnScheduled operation to finish."); |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e); |
| } catch (final TimeoutException e) { |
| taskFuture.cancel(true); |
| LOG.warn("Timed out while waiting for OnScheduled of '" |
| + this.processor.getClass().getSimpleName() |
| + "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " |
| + "guarantee that the task will be canceled since the code inside current OnScheduled operation may " |
| + "have been written to ignore interrupts which may result in runaway thread which could lead to more issues " |
| + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" |
| + this.processor + "' that needs to be documented, reported and eventually fixed."); |
| throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e); |
| } catch (final ExecutionException e){ |
| throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e); |
| } finally { |
| callback.postMonitor(); |
| } |
| } |
| |
| @Override |
| protected String getProcessGroupIdentifier() { |
| final ProcessGroup group = getProcessGroup(); |
| return group == null ? null : group.getIdentifier(); |
| } |
| } |