| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.controller; |
| |
| import org.apache.commons.lang3.builder.EqualsBuilder; |
| import org.apache.commons.lang3.builder.HashCodeBuilder; |
| import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; |
| import org.apache.nifi.annotation.behavior.Restricted; |
| import org.apache.nifi.annotation.behavior.SideEffectFree; |
| import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; |
| import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; |
| import org.apache.nifi.annotation.configuration.DefaultSchedule; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.DeprecationNotice; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.annotation.lifecycle.OnUnscheduled; |
| import org.apache.nifi.authorization.Resource; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.authorization.resource.ResourceFactory; |
| import org.apache.nifi.authorization.resource.ResourceType; |
| import org.apache.nifi.bundle.BundleCoordinate; |
| import org.apache.nifi.components.ConfigurableComponent; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.components.validation.ValidationState; |
| import org.apache.nifi.components.validation.ValidationStatus; |
| import org.apache.nifi.components.validation.ValidationTrigger; |
| import org.apache.nifi.connectable.Connectable; |
| import org.apache.nifi.connectable.ConnectableType; |
| import org.apache.nifi.connectable.Connection; |
| import org.apache.nifi.connectable.Position; |
| import org.apache.nifi.controller.exception.ProcessorInstantiationException; |
| import org.apache.nifi.controller.scheduling.LifecycleState; |
| import org.apache.nifi.controller.scheduling.SchedulingAgent; |
| import org.apache.nifi.controller.service.ControllerServiceNode; |
| import org.apache.nifi.controller.service.ControllerServiceProvider; |
| import org.apache.nifi.controller.tasks.ActiveTask; |
| import org.apache.nifi.groups.ProcessGroup; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.logging.LogLevel; |
| import org.apache.nifi.logging.LogRepositoryFactory; |
| import org.apache.nifi.nar.ExtensionManager; |
| import org.apache.nifi.nar.NarCloseable; |
| import org.apache.nifi.parameter.ParameterContext; |
| import org.apache.nifi.parameter.ParameterLookup; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSessionFactory; |
| import org.apache.nifi.processor.Processor; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.SimpleProcessLogger; |
| import org.apache.nifi.registry.ComponentVariableRegistry; |
| import org.apache.nifi.scheduling.ExecutionNode; |
| import org.apache.nifi.scheduling.SchedulingStrategy; |
| import org.apache.nifi.util.CharacterFilterUtils; |
| import org.apache.nifi.util.FormatUtils; |
| import org.apache.nifi.util.ReflectionUtils; |
| import org.apache.nifi.util.ThreadUtils; |
| import org.apache.nifi.util.file.classloader.ClassLoaderUtils; |
| import org.quartz.CronExpression; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.lang.management.ThreadInfo; |
| import java.lang.reflect.InvocationTargetException; |
| import java.net.URL; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static java.util.Objects.requireNonNull; |
| |
| /** |
| * ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists |
| * within a controlled flow. This node keeps track of the processor, its |
| * scheduling information and its relationships to other processors and whatever |
| * scheduled futures exist for it. Must be thread safe. |
| * |
| */ |
| public class StandardProcessorNode extends ProcessorNode implements Connectable { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(StandardProcessorNode.class); |
| |
| public static final String BULLETIN_OBSERVER_ID = "bulletin-observer"; |
| |
| public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; |
| public static final String DEFAULT_YIELD_PERIOD = "1 sec"; |
| public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec"; |
| private final AtomicReference<ProcessGroup> processGroup; |
| private final AtomicReference<ProcessorDetails> processorRef; |
| private final AtomicReference<String> identifier; |
| private final Map<Connection, Connectable> destinations; |
| private final Map<Relationship, Set<Connection>> connections; |
| private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate; |
| private final AtomicReference<List<Connection>> incomingConnections; |
| private final AtomicBoolean lossTolerant; |
| private final AtomicReference<String> comments; |
| private final AtomicReference<Position> position; |
| private final AtomicReference<String> schedulingPeriod; // stored as string so it's presented to user as they entered it |
| private final AtomicReference<String> yieldPeriod; |
| private final AtomicReference<String> penalizationPeriod; |
| private final AtomicReference<Map<String, String>> style; |
| private final AtomicInteger concurrentTaskCount; |
| private final AtomicLong yieldExpiration; |
| private final AtomicLong schedulingNanos; |
| private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); |
| private final ProcessScheduler processScheduler; |
| private long runNanos = 0L; |
| private volatile long yieldNanos; |
| private volatile ScheduledState desiredState = ScheduledState.STOPPED; |
| private volatile LogLevel bulletinLevel = LogLevel.WARN; |
| |
| private SchedulingStrategy schedulingStrategy; // guarded by synchronized keyword |
| private ExecutionNode executionNode; |
| private final Map<Thread, ActiveTask> activeThreads = new HashMap<>(48); |
| private final int hashCode; |
| private volatile boolean hasActiveThreads = false; |
| |
/**
 * Creates a node for the given processor, deriving the component type (simple class name) and
 * canonical class name from the processor's own class and flagging the extension as not missing.
 *
 * @param processor the processor, together with its logger and bundle information
 * @param uuid the identifier of this node
 * @param validationContextFactory passed through to the full constructor
 * @param scheduler the scheduler notified when this node yields
 * @param controllerServiceProvider passed through to the full constructor
 * @param variableRegistry passed through to the full constructor
 * @param reloadComponent passed through to the full constructor
 * @param extensionManager passed through to the full constructor
 * @param validationTrigger passed through to the full constructor
 */
public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid,
        final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
        final ControllerServiceProvider controllerServiceProvider, final ComponentVariableRegistry variableRegistry,
        final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) {

    this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, processor.getComponent().getClass().getSimpleName(),
        processor.getComponent().getClass().getCanonicalName(), variableRegistry, reloadComponent, extensionManager, validationTrigger, false);
}
| |
| public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid, |
| final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, |
| final ControllerServiceProvider controllerServiceProvider, |
| final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, |
| final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger, |
| final boolean isExtensionMissing) { |
| |
| super(uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, |
| extensionManager, validationTrigger, isExtensionMissing); |
| |
| final ProcessorDetails processorDetails = new ProcessorDetails(processor); |
| this.processorRef = new AtomicReference<>(processorDetails); |
| |
| identifier = new AtomicReference<>(uuid); |
| destinations = new HashMap<>(); |
| connections = new HashMap<>(); |
| incomingConnections = new AtomicReference<>(new ArrayList<>()); |
| lossTolerant = new AtomicBoolean(false); |
| final Set<Relationship> emptySetOfRelationships = new HashSet<>(); |
| undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships); |
| comments = new AtomicReference<>(""); |
| schedulingPeriod = new AtomicReference<>("0 sec"); |
| schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS); |
| yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD); |
| yieldExpiration = new AtomicLong(0L); |
| concurrentTaskCount = new AtomicInteger(1); |
| position = new AtomicReference<>(new Position(0D, 0D)); |
| style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<String, String>())); |
| this.processGroup = new AtomicReference<>(); |
| processScheduler = scheduler; |
| penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); |
| |
| schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; |
| executionNode = isExecutionNodeRestricted() ? ExecutionNode.PRIMARY : ExecutionNode.ALL; |
| this.hashCode = new HashCodeBuilder(7, 67).append(identifier).toHashCode(); |
| |
| try { |
| if (processorDetails.getProcClass().isAnnotationPresent(DefaultSchedule.class)) { |
| DefaultSchedule dsc = processorDetails.getProcClass().getAnnotation(DefaultSchedule.class); |
| try { |
| this.setSchedulingStrategy(dsc.strategy()); |
| } catch (Throwable ex) { |
| LOG.error(String.format("Error while setting scheduling strategy from DefaultSchedule annotation: %s", ex.getMessage()), ex); |
| } |
| try { |
| this.setScheduldingPeriod(dsc.period()); |
| } catch (Throwable ex) { |
| this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN); |
| LOG.error(String.format("Error while setting scheduling period from DefaultSchedule annotation: %s", ex.getMessage()), ex); |
| } |
| if (!processorDetails.isTriggeredSerially()) { |
| try { |
| setMaxConcurrentTasks(dsc.concurrentTasks()); |
| } catch (Throwable ex) { |
| LOG.error(String.format("Error while setting max concurrent tasks from DefaultSchedule annotation: %s", ex.getMessage()), ex); |
| } |
| } |
| } |
| } catch (Throwable ex) { |
| LOG.error(String.format("Error while setting default schedule from DefaultSchedule annotation: %s",ex.getMessage()),ex); |
| } |
| } |
| |
| @Override |
| public ConfigurableComponent getComponent() { |
| return processorRef.get().getProcessor(); |
| } |
| |
| @Override |
| public TerminationAwareLogger getLogger() { |
| return processorRef.get().getComponentLog(); |
| } |
| |
| @Override |
| public Object getRunnableComponent() { |
| return getProcessor(); |
| } |
| |
| @Override |
| public BundleCoordinate getBundleCoordinate() { |
| return processorRef.get().getBundleCoordinate(); |
| } |
| |
| /** |
| * @return comments about this specific processor instance |
| */ |
| @Override |
| public String getComments() { |
| return comments.get(); |
| } |
| |
| @Override |
| public Authorizable getParentAuthorizable() { |
| return getProcessGroup(); |
| } |
| |
| @Override |
| public Resource getResource() { |
| return ResourceFactory.getComponentResource(ResourceType.Processor, getIdentifier(), getName()); |
| } |
| |
| @Override |
| public boolean isRestricted() { |
| return getProcessor().getClass().isAnnotationPresent(Restricted.class); |
| } |
| |
| @Override |
| public Class<?> getComponentClass() { |
| return getProcessor().getClass(); |
| } |
| |
| @Override |
| public boolean isDeprecated() { |
| return getProcessor().getClass().isAnnotationPresent(DeprecationNotice.class); |
| } |
| |
| |
| /** |
| * Provides and opportunity to retain information about this particular |
| * processor instance |
| * |
| * @param comments |
| * new comments |
| */ |
| @Override |
| public synchronized void setComments(final String comments) { |
| this.comments.set(CharacterFilterUtils.filterInvalidXmlCharacters(comments)); |
| } |
| |
| @Override |
| public Position getPosition() { |
| return position.get(); |
| } |
| |
/**
 * Replaces the stored position of this node.
 *
 * @param position the new position
 */
@Override
public synchronized void setPosition(final Position position) {
    this.position.set(position);
}
| |
| @Override |
| public Map<String, String> getStyle() { |
| return style.get(); |
| } |
| |
| @Override |
| public synchronized void setStyle(final Map<String, String> style) { |
| if (style != null) { |
| this.style.set(Collections.unmodifiableMap(new HashMap<>(style))); |
| } |
| } |
| |
| @Override |
| public String getIdentifier() { |
| return identifier.get(); |
| } |
| |
| /** |
| * @return if true flow file content generated by this processor is |
| * considered loss tolerant |
| */ |
| @Override |
| public boolean isLossTolerant() { |
| return lossTolerant.get(); |
| } |
| |
| @Override |
| @SuppressWarnings("deprecation") |
| public boolean isIsolated() { |
| return schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY || executionNode == ExecutionNode.PRIMARY; |
| } |
| |
| /** |
| * @return true if the processor has the {@link TriggerWhenEmpty} |
| * annotation, false otherwise. |
| */ |
| @Override |
| public boolean isTriggerWhenEmpty() { |
| return processorRef.get().isTriggerWhenEmpty(); |
| } |
| |
| /** |
| * @return true if the processor has the {@link SideEffectFree} annotation, |
| * false otherwise. |
| */ |
| @Override |
| public boolean isSideEffectFree() { |
| return processorRef.get().isSideEffectFree(); |
| } |
| |
| @Override |
| public boolean isSessionBatchingSupported() { |
| return processorRef.get().isBatchSupported(); |
| } |
| |
| /** |
| * @return true if the processor has the |
| * {@link TriggerWhenAnyDestinationAvailable} annotation, false |
| * otherwise. |
| */ |
| @Override |
| public boolean isTriggerWhenAnyDestinationAvailable() { |
| return processorRef.get().isTriggerWhenAnyDestinationAvailable(); |
| } |
| |
| /** |
| * Indicates whether the processor's executionNode configuration is restricted to run only in primary node |
| */ |
| @Override |
| public boolean isExecutionNodeRestricted(){ |
| return processorRef.get().isExecutionNodeRestricted(); |
| } |
| |
| /** |
| * Indicates whether flow file content made by this processor must be |
| * persisted |
| * |
| * @param lossTolerant |
| * tolerant |
| */ |
| @Override |
| public synchronized void setLossTolerant(final boolean lossTolerant) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| this.lossTolerant.set(lossTolerant); |
| } |
| |
| @Override |
| public boolean isAutoTerminated(final Relationship relationship) { |
| if (relationship.isAutoTerminated() && getConnections(relationship).isEmpty()) { |
| return true; |
| } |
| final Set<Relationship> terminatable = undefinedRelationshipsToTerminate.get(); |
| return terminatable == null ? false : terminatable.contains(relationship); |
| } |
| |
| @Override |
| public void setAutoTerminatedRelationships(final Set<Relationship> terminate) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| |
| for (final Relationship rel : terminate) { |
| if (!getConnections(rel).isEmpty()) { |
| throw new IllegalStateException("Cannot mark relationship '" + rel.getName() |
| + "' as auto-terminated because Connection already exists with this relationship"); |
| } |
| } |
| |
| undefinedRelationshipsToTerminate.set(new HashSet<>(terminate)); |
| LOG.debug("Resetting Validation State of {} due to setting auto-terminated relationships", this); |
| resetValidationState(); |
| } |
| |
| /** |
| * @return an unmodifiable Set that contains all of the |
| * ProcessorRelationship objects that are configured to be |
| * auto-terminated |
| */ |
| @Override |
| public Set<Relationship> getAutoTerminatedRelationships() { |
| Set<Relationship> relationships = undefinedRelationshipsToTerminate.get(); |
| if (relationships == null) { |
| relationships = new HashSet<>(); |
| } |
| return Collections.unmodifiableSet(relationships); |
| } |
| |
| /** |
| * @return the value of the processor's {@link CapabilityDescription} |
| * annotation, if one exists, else <code>null</code>. |
| */ |
| public String getProcessorDescription() { |
| final Processor processor = processorRef.get().getProcessor(); |
| final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); |
| String description = null; |
| if (capDesc != null) { |
| description = capDesc.value(); |
| } |
| return description; |
| } |
| |
/**
 * Sets the name of this node by delegating to the superclass implementation while holding
 * this node's monitor.
 *
 * @param name the new name
 */
@Override
public synchronized void setName(final String name) {
    super.setName(name);
}
| |
| /** |
| * @param timeUnit determines the unit of time to represent the scheduling period. |
| * @return the schedule period that should elapse before subsequent cycles of this processor's tasks |
| */ |
| @Override |
| public long getSchedulingPeriod(final TimeUnit timeUnit) { |
| return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); |
| } |
| |
| @Override |
| public boolean isEventDrivenSupported() { |
| return processorRef.get().isEventDrivenSupported(); |
| } |
| |
| /** |
| * Updates the Scheduling Strategy used for this Processor |
| * |
| * @param schedulingStrategy |
| * strategy |
| * |
| * @throws IllegalArgumentException |
| * if the SchedulingStrategy is not not applicable for this |
| * Processor |
| */ |
| @Override |
| public synchronized void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { |
| if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !processorRef.get().isEventDrivenSupported()) { |
| // not valid. Just ignore it. We don't throw an Exception because if |
| // a developer changes a Processor so that |
| // it no longer supports EventDriven mode, we don't want the app to |
| // fail to startup if it was already in Event-Driven |
| // Mode. Instead, we will simply leave it in Timer-Driven mode |
| return; |
| } |
| |
| this.schedulingStrategy = schedulingStrategy; |
| } |
| |
| /** |
| * @return the currently configured scheduling strategy |
| */ |
| @Override |
| public SchedulingStrategy getSchedulingStrategy() { |
| return this.schedulingStrategy; |
| } |
| |
| @Override |
| public String getSchedulingPeriod() { |
| return schedulingPeriod.get(); |
| } |
| |
| @Override |
| @SuppressWarnings("deprecation") |
| public synchronized void setScheduldingPeriod(final String schedulingPeriod) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| |
| switch (schedulingStrategy) { |
| case CRON_DRIVEN: { |
| try { |
| new CronExpression(schedulingPeriod); |
| } catch (final Exception e) { |
| throw new IllegalArgumentException( |
| "Scheduling Period is not a valid cron expression: " + schedulingPeriod); |
| } |
| } |
| break; |
| case PRIMARY_NODE_ONLY: |
| case TIMER_DRIVEN: { |
| final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), |
| TimeUnit.NANOSECONDS); |
| if (schedulingNanos < 0) { |
| throw new IllegalArgumentException("Scheduling Period must be positive"); |
| } |
| this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); |
| } |
| break; |
| case EVENT_DRIVEN: |
| default: |
| return; |
| } |
| |
| this.schedulingPeriod.set(schedulingPeriod); |
| } |
| |
| @Override |
| public synchronized void setExecutionNode(final ExecutionNode executionNode) { |
| if (this.isExecutionNodeRestricted()) { |
| this.executionNode = ExecutionNode.PRIMARY; |
| } else { |
| this.executionNode = executionNode; |
| } |
| } |
| |
| @Override |
| public ExecutionNode getExecutionNode() { |
| return this.executionNode; |
| } |
| |
| @Override |
| public long getRunDuration(final TimeUnit timeUnit) { |
| return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS); |
| } |
| |
| @Override |
| public synchronized void setRunDuration(final long duration, final TimeUnit timeUnit) { |
| if (duration < 0) { |
| throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to " |
| + timeUnit.toSeconds(duration) + " seconds"); |
| } |
| |
| this.runNanos = timeUnit.toNanos(duration); |
| } |
| |
| @Override |
| public long getYieldPeriod(final TimeUnit timeUnit) { |
| final TimeUnit unit = (timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); |
| return unit.convert(yieldNanos, TimeUnit.NANOSECONDS); |
| } |
| |
| @Override |
| public String getYieldPeriod() { |
| return yieldPeriod.get(); |
| } |
| |
| @Override |
| public synchronized void setYieldPeriod(final String yieldPeriod) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| final long yieldNanos = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.NANOSECONDS); |
| if (yieldNanos < 0) { |
| throw new IllegalArgumentException("Yield duration must be positive"); |
| } |
| this.yieldPeriod.set(yieldPeriod); |
| this.yieldNanos = yieldNanos; |
| } |
| |
| /** |
| * Causes the processor not to be scheduled for some period of time. This |
| * duration can be obtained and set via the |
| * {@link #getYieldPeriod(TimeUnit)} and |
| * {@link #setYieldPeriod(String)}. |
| */ |
| @Override |
| public void yield() { |
| final Processor processor = processorRef.get().getProcessor(); |
| final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); |
| yield(yieldMillis, TimeUnit.MILLISECONDS); |
| |
| final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 1000) + " seconds" |
| : yieldMillis + " milliseconds"; |
| LoggerFactory.getLogger(processor.getClass()).debug( |
| "{} has chosen to yield its resources; will not be scheduled to run again for {}", processor, |
| yieldDuration); |
| } |
| |
| @Override |
| public void yield(final long period, final TimeUnit timeUnit) { |
| final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit); |
| yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); |
| |
| processScheduler.yield(this); |
| } |
| |
| /** |
| * @return the number of milliseconds since Epoch at which time this |
| * processor is to once again be scheduled. |
| */ |
| @Override |
| public long getYieldExpiration() { |
| return yieldExpiration.get(); |
| } |
| |
| @Override |
| public long getPenalizationPeriod(final TimeUnit timeUnit) { |
| return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); |
| } |
| |
| @Override |
| public String getPenalizationPeriod() { |
| return penalizationPeriod.get(); |
| } |
| |
| @Override |
| public synchronized void setPenalizationPeriod(final String penalizationPeriod) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| |
| final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), TimeUnit.MILLISECONDS); |
| if (penalizationMillis < 0) { |
| throw new IllegalArgumentException("Penalization duration must be positive"); |
| } |
| |
| this.penalizationPeriod.set(penalizationPeriod); |
| } |
| |
| /** |
| * Determines the number of concurrent tasks that may be running for this |
| * processor. |
| * |
| * @param taskCount |
| * a number of concurrent tasks this processor may have running |
| * @throws IllegalArgumentException |
| * if the given value is less than 1 |
| */ |
| @Override |
| public synchronized void setMaxConcurrentTasks(final int taskCount) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| |
| if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) { |
| throw new IllegalArgumentException("Cannot set Concurrent Tasks to " + taskCount + " for component " |
| + getIdentifier() + " because Scheduling Strategy is not Event Driven"); |
| } |
| |
| if (!isTriggeredSerially()) { |
| concurrentTaskCount.set(taskCount); |
| } |
| } |
| |
| @Override |
| public boolean isTriggeredSerially() { |
| return processorRef.get().isTriggeredSerially(); |
| } |
| |
| /** |
| * @return the number of tasks that may execute concurrently for this processor |
| */ |
| @Override |
| public int getMaxConcurrentTasks() { |
| return concurrentTaskCount.get(); |
| } |
| |
| @Override |
| public LogLevel getBulletinLevel() { |
| return bulletinLevel; |
| } |
| |
| @Override |
| public synchronized void setBulletinLevel(final LogLevel level) { |
| LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID, level); |
| this.bulletinLevel = level; |
| } |
| |
| @Override |
| public Set<Connection> getConnections() { |
| final Set<Connection> allConnections = new HashSet<>(); |
| for (final Set<Connection> connectionSet : connections.values()) { |
| allConnections.addAll(connectionSet); |
| } |
| |
| return allConnections; |
| } |
| |
| @Override |
| public List<Connection> getIncomingConnections() { |
| return incomingConnections.get(); |
| } |
| |
| @Override |
| public Set<Connection> getConnections(final Relationship relationship) { |
| final Set<Connection> applicableConnections = connections.get(relationship); |
| return (applicableConnections == null) ? Collections.<Connection> emptySet() |
| : Collections.unmodifiableSet(applicableConnections); |
| } |
| |
| @Override |
| public void addConnection(final Connection connection) { |
| Objects.requireNonNull(connection, "connection cannot be null"); |
| |
| if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) { |
| throw new IllegalStateException( |
| "Cannot a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination"); |
| } |
| |
| try { |
| List<Connection> updatedIncoming = null; |
| if (connection.getDestination().equals(this)) { |
| // don't add the connection twice. This may occur if we have a |
| // self-loop because we will be told |
| // to add the connection once because we are the source and again |
| // because we are the destination. |
| final List<Connection> incomingConnections = getIncomingConnections(); |
| updatedIncoming = new ArrayList<>(incomingConnections); |
| if (!updatedIncoming.contains(connection)) { |
| updatedIncoming.add(connection); |
| } |
| } |
| |
| if (connection.getSource().equals(this)) { |
| // don't add the connection twice. This may occur if we have a |
| // self-loop because we will be told |
| // to add the connection once because we are the source and again |
| // because we are the destination. |
| if (!destinations.containsKey(connection)) { |
| for (final Relationship relationship : connection.getRelationships()) { |
| final Relationship rel = getRelationship(relationship.getName()); |
| Set<Connection> set = connections.get(rel); |
| if (set == null) { |
| set = new HashSet<>(); |
| connections.put(rel, set); |
| } |
| |
| set.add(connection); |
| |
| destinations.put(connection, connection.getDestination()); |
| } |
| |
| final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); |
| if (autoTerminated != null) { |
| autoTerminated.removeAll(connection.getRelationships()); |
| this.undefinedRelationshipsToTerminate.set(autoTerminated); |
| } |
| } |
| } |
| |
| if (updatedIncoming != null) { |
| setIncomingConnections(Collections.unmodifiableList(updatedIncoming)); |
| } |
| } finally { |
| LOG.debug("Resetting Validation State of {} due to connection added", this); |
| resetValidationState(); |
| } |
| } |
| |
| @Override |
| public boolean hasIncomingConnection() { |
| return !getIncomingConnections().isEmpty(); |
| } |
| |
| @Override |
| public void updateConnection(final Connection connection) throws IllegalStateException { |
| try { |
| if (requireNonNull(connection).getSource().equals(this)) { |
| // update any relationships |
| // |
| // first check if any relations were removed. |
| final List<Relationship> existingRelationships = new ArrayList<>(); |
| for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) { |
| if (entry.getValue().contains(connection)) { |
| existingRelationships.add(entry.getKey()); |
| } |
| } |
| |
| for (final Relationship rel : connection.getRelationships()) { |
| if (!existingRelationships.contains(rel)) { |
| // relationship was removed. Check if this is legal. |
| final Set<Connection> connectionsForRelationship = getConnections(rel); |
| if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() |
| && !isAutoTerminated(rel) && getRelationships().contains(rel)) { |
| // if we are running and we do not terminate undefined |
| // relationships and this is the only |
| // connection that defines the given relationship, and |
| // that relationship is required, |
| // then it is not legal to remove this relationship from |
| // this connection. |
| throw new IllegalStateException("Cannot remove relationship " + rel.getName() |
| + " from Connection because doing so would invalidate Processor " + this |
| + ", which is currently running"); |
| } |
| } |
| } |
| |
| // remove the connection from any list that currently contains |
| for (final Set<Connection> list : connections.values()) { |
| list.remove(connection); |
| } |
| |
| // add the connection in for all relationships listed. |
| for (final Relationship rel : connection.getRelationships()) { |
| Set<Connection> set = connections.get(rel); |
| if (set == null) { |
| set = new HashSet<>(); |
| connections.put(rel, set); |
| } |
| set.add(connection); |
| } |
| |
| // update to the new destination |
| destinations.put(connection, connection.getDestination()); |
| |
| final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); |
| if (autoTerminated != null) { |
| autoTerminated.removeAll(connection.getRelationships()); |
| this.undefinedRelationshipsToTerminate.set(autoTerminated); |
| } |
| } |
| |
| if (connection.getDestination().equals(this)) { |
| // update our incoming connections -- we can just remove & re-add |
| // the connection to update the list. |
| final List<Connection> incomingConnections = getIncomingConnections(); |
| final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); |
| updatedIncoming.remove(connection); |
| updatedIncoming.add(connection); |
| setIncomingConnections(Collections.unmodifiableList(updatedIncoming)); |
| } |
| } finally { |
| // need to perform validation in case selected relationships were changed. |
| LOG.debug("Resetting Validation State of {} due to updating connection", this); |
| resetValidationState(); |
| } |
| } |
| |
| /** |
| * Detaches the given connection from this node. If this node is the connection's source, the connection is |
| * dropped from every relationship's connection set and from the destination map. If this node is the |
| * connection's destination and the connection is in the incoming list, it is removed from that list. |
| * |
| * @param connection the connection to remove; must not be null |
| * @throws IllegalStateException if this node is the source, is running, and the connection is the only one |
| * (or none is) registered for one of the connection's relationships |
| * @throws IllegalArgumentException if the connection was neither removed from the incoming list nor |
| * registered as an outgoing connection of this node |
| */ |
| @Override |
| public void removeConnection(final Connection connection) { |
| requireNonNull(connection); |
| |
| boolean removedAsSource = false; |
| if (connection.getSource().equals(this)) { |
| // Refuse the removal while running if it would leave a relationship without any connection. |
| for (final Relationship rel : connection.getRelationships()) { |
| final Set<Connection> registered = getConnections(rel); |
| final boolean lastForRelationship = registered == null || registered.size() <= 1; |
| if (lastForRelationship && isRunning()) { |
| throw new IllegalStateException( |
| "This connection cannot be removed because its source is running and removing it will invalidate this processor"); |
| } |
| } |
| |
| this.connections.values().forEach(perRelationship -> perRelationship.remove(connection)); |
| removedAsSource = destinations.remove(connection) != null; |
| } |
| |
| if (connection.getDestination().equals(this)) { |
| final List<Connection> currentIncoming = getIncomingConnections(); |
| if (currentIncoming.contains(connection)) { |
| final List<Connection> remaining = new ArrayList<>(currentIncoming); |
| remaining.remove(connection); |
| // setIncomingConnections resets the validation state itself, so we are done. |
| setIncomingConnections(Collections.unmodifiableList(remaining)); |
| return; |
| } |
| } |
| |
| if (!removedAsSource) { |
| throw new IllegalArgumentException( |
| "Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source"); |
| } |
| |
| LOG.debug("Resetting Validation State of {} due to connection removed", this); |
| resetValidationState(); |
| } |
| |
| /** |
| * Stores the given list as this node's incoming connections and resets the validation state. |
| * |
| * @param incoming the new list of incoming connections |
| */ |
| private void setIncomingConnections(final List<Connection> incoming) { |
| incomingConnections.set(incoming); |
| LOG.debug("Resetting Validation State of {} due to setting incoming connections", this); |
| resetValidationState(); |
| } |
| |
| /** |
| * Looks up a relationship of this node's processor by name. |
| * |
| * @param relationshipName |
| * name of the relationship to find |
| * @return the processor's own relationship that equals one built from the given name, or that newly built |
| * relationship when the processor declares no match |
| */ |
| @Override |
| public Relationship getRelationship(final String relationshipName) { |
| final Relationship requested = new Relationship.Builder().name(relationshipName).build(); |
| |
| final Processor processor = processorRef.get().getProcessor(); |
| final Set<Relationship> declared; |
| try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { |
| declared = processor.getRelationships(); |
| } |
| |
| // Prefer the processor's own instance over the freshly built one. |
| for (final Relationship candidate : declared) { |
| if (candidate.equals(requested)) { |
| return candidate; |
| } |
| } |
| return requested; |
| } |
| |
| /** |
| * @return the processor held by the current processor details reference |
| */ |
| @Override |
| public Processor getProcessor() { |
| final ProcessorDetails details = processorRef.get(); |
| return details.getProcessor(); |
| } |
| |
| /** |
| * Replaces the processor backing this node. |
| * |
| * @param processor the loggable component wrapping the new processor |
| * @throws IllegalStateException if this node is currently running |
| */ |
| @Override |
| public synchronized void setProcessor(final LoggableComponent<Processor> processor) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| |
| processorRef.set(new ProcessorDetails(processor)); |
| } |
| |
| /** |
| * Records a fingerprint of the given additional URLs and asks the reload component to reload this node. |
| * |
| * @param additionalUrls additional resource URLs to pass to the reload |
| * @throws IllegalStateException if this node is currently running |
| * @throws ProcessorInstantiationException declared by this method; presumably raised when the reload fails |
| */ |
| @Override |
| public synchronized void reload(final Set<URL> additionalUrls) throws ProcessorInstantiationException { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot reload Processor while the Processor is running"); |
| } |
| |
| setAdditionalResourcesFingerprint(ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls)); |
| getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls); |
| } |
| |
| /** |
| * Collects every destination registered for this node's outgoing connections, leaving out this node |
| * itself (self-loops). |
| * |
| * @return a new set of destinations other than this node |
| */ |
| public Set<Connectable> getDestinations() { |
| return destinations.values().stream() |
| .filter(destination -> destination != this) |
| .collect(Collectors.toCollection(HashSet::new)); |
| } |
| |
| /** |
| * Collects the destinations of all connections registered for the given relationship. |
| * |
| * @param relationship the relationship whose connections are inspected |
| * @return a new set of destinations; empty when no connection set is registered for the relationship |
| */ |
| public Set<Connectable> getDestinations(final Relationship relationship) { |
| final Set<Connectable> result = new HashSet<>(); |
| final Set<Connection> outgoing = connections.get(relationship); |
| if (outgoing == null) { |
| return result; |
| } |
| |
| for (final Connection outgoingConnection : outgoing) { |
| result.add(destinations.get(outgoingConnection)); |
| } |
| return result; |
| } |
| |
| /** |
| * Determines which of the processor's relationships currently have no connection registered. |
| * |
| * @return a new set of relationships without any connection; empty when the processor reports |
| * {@code null} relationships |
| */ |
| public Set<Relationship> getUndefinedRelationships() { |
| final Processor processor = processorRef.get().getProcessor(); |
| final Set<Relationship> declared; |
| try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { |
| declared = processor.getRelationships(); |
| } |
| |
| final Set<Relationship> unconnected = new HashSet<>(); |
| if (declared != null) { |
| for (final Relationship rel : declared) { |
| final Set<Connection> outgoing = this.connections.get(rel); |
| final boolean hasConnection = outgoing != null && !outgoing.isEmpty(); |
| if (!hasConnection) { |
| unconnected.add(rel); |
| } |
| } |
| } |
| return unconnected; |
| } |
| |
| /** |
| * Tells whether the given node is registered as the destination of one of this node's connections. |
| * |
| * @param node |
| * the node to look for |
| * @return true if the node is a direct destination of this node; false otherwise |
| */ |
| boolean isRelated(final ProcessorNode node) { |
| return destinations.containsValue(node); |
| } |
| |
| /** |
| * @return true if the scheduled state is RUNNING or the active-threads flag is set |
| */ |
| @Override |
| public boolean isRunning() { |
| final boolean scheduledToRun = getScheduledState().equals(ScheduledState.RUNNING); |
| return scheduledToRun || hasActiveThreads; |
| } |
| |
| /** |
| * @return true when the physical scheduled state is STOPPED, STOPPING or STARTING; false for any other state |
| */ |
| @Override |
| public boolean isValidationNecessary() { |
| switch (getPhysicalScheduledState()) { |
| case STOPPED: |
| case STOPPING: |
| case STARTING: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| /** |
| * Reports the number of active threads for this node, never reporting zero while the node is still STOPPING. |
| * |
| * @return the scheduler's active thread count, or 1 when that count is 0 but the physical state is STOPPING |
| */ |
| @Override |
| public int getActiveThreadCount() { |
| final int reportedCount = processScheduler.getActiveThreadCount(this); |
| |
| // getScheduledState() maps the physical STOPPING state to STOPPED for backward compatibility, because the UI |
| // and other clients do not know the (relatively newer) STOPPING state. Those clients decide that a processor |
| // has truly stopped by checking that its Scheduled State is STOPPED AND that it has no active threads. |
| // |
| // A processor can be started while invalid and then stopped before it becomes valid. It then stays STOPPING |
| // until the background thread checks the state, calls any necessary lifecycle methods and sets STOPPED. |
| // In the interim a client would see STOPPED with no active threads and treat the processor as fully stopped, |
| // yet an attempt to update it would fail because it is not truly stopped. |
| // |
| // Returning 1 while STOPPING ensures (getScheduledState() == STOPPED and activeThreads == 0) never holds |
| // while the processor is still stopping. |
| // |
| // This probably calls for a significant refactoring of this class, e.g. extracting the logic into a separate |
| // StateTransition class as was done for Controller Services. This is a simple enough work around for now. |
| final boolean stillStopping = reportedCount == 0 && getPhysicalScheduledState() == ScheduledState.STOPPING; |
| return stillStopping ? 1 : reportedCount; |
| } |
| |
| /** |
| * @return a new list of the incoming connections whose source is not this node |
| */ |
| List<Connection> getIncomingNonLoopConnections() { |
| return getIncomingConnections().stream() |
| .filter(incoming -> !incoming.getSource().equals(this)) |
| .collect(Collectors.toCollection(ArrayList::new)); |
| } |
| |
| |
| /** |
| * @return the validation errors held by the current validation state |
| */ |
| @Override |
| public Collection<ValidationResult> getValidationErrors() { |
| return getValidationState().getValidationErrors(); |
| } |
| |
| /** |
| * Computes the validation failures for this node: the invalid results reported by the superclass, plus |
| * failures for unconnected relationships that are not auto-terminated, for a violated input requirement, |
| * and for a Primary-Node-only execution setting combined with incoming connections. |
| * |
| * @param validationContext the context handed to the superclass validation |
| * @return the list of invalid results; if anything throws, the list additionally holds one result |
| * describing that failure |
| */ |
| @Override |
| protected Collection<ValidationResult> computeValidationErrors(final ValidationContext validationContext) { |
| final List<ValidationResult> failures = new ArrayList<>(); |
| try { |
| // Keep only the failures reported by the superclass. |
| for (final ValidationResult inherited : super.computeValidationErrors(validationContext)) { |
| if (!inherited.isValid()) { |
| failures.add(inherited); |
| } |
| } |
| |
| // Every relationship without a connection must be auto-terminated. |
| for (final Relationship unconnected : getUndefinedRelationships()) { |
| if (isAutoTerminated(unconnected)) { |
| continue; |
| } |
| |
| failures.add(new ValidationResult.Builder() |
| .explanation("Relationship '" + unconnected.getName() |
| + "' is not connected to any component and is not auto-terminated") |
| .subject("Relationship " + unconnected.getName()) |
| .valid(false) |
| .build()); |
| } |
| |
| // The InputRequirement must be honoured by the upstream (non-loop) connections. |
| switch (getInputRequirement()) { |
| case INPUT_FORBIDDEN: { |
| final int upstreamCount = getIncomingNonLoopConnections().size(); |
| if (upstreamCount != 0) { |
| failures.add(new ValidationResult.Builder() |
| .explanation("Processor does not allow upstream connections but currently has " + upstreamCount) |
| .subject("Upstream Connections") |
| .valid(false) |
| .build()); |
| } |
| break; |
| } |
| case INPUT_REQUIRED: { |
| if (getIncomingNonLoopConnections().isEmpty()) { |
| failures.add(new ValidationResult.Builder() |
| .explanation("Processor requires an upstream connection but currently has none") |
| .subject("Upstream Connections") |
| .valid(false) |
| .build()); |
| } |
| break; |
| } |
| case INPUT_ALLOWED: |
| default: |
| break; |
| } |
| |
| // Primary-Node-only execution is not allowed together with incoming connections. |
| if (getExecutionNode() == ExecutionNode.PRIMARY && hasIncomingConnection()) { |
| failures.add(new ValidationResult.Builder() |
| .explanation("Processors with incoming connections cannot be scheduled for Primary Node Only.") |
| .subject("Execution Node") |
| .valid(false) |
| .build()); |
| } |
| } catch (final Throwable t) { |
| LOG.error("Failed to perform validation", t); |
| failures.add(new ValidationResult.Builder() |
| .explanation("Failed to run validation due to " + t.toString()) |
| .valid(false) |
| .build()); |
| } |
| |
| return failures; |
| } |
| |
| /** |
| * @return the input requirement recorded in the current processor details |
| */ |
| @Override |
| public Requirement getInputRequirement() { |
| final ProcessorDetails details = processorRef.get(); |
| return details.getInputRequirement(); |
| } |
| |
| /** |
| * Compares this node with another object; two processor nodes are equal when their identifiers are equal. |
| * |
| * @param other |
| * the object to compare with |
| * @return true if {@code other} is a {@link ProcessorNode} with the same identifier; false otherwise |
| */ |
| @Override |
| public boolean equals(final Object other) { |
| if (other instanceof ProcessorNode) { |
| final ProcessorNode that = (ProcessorNode) other; |
| return new EqualsBuilder().append(identifier.get(), that.getIdentifier()).isEquals(); |
| } |
| return false; |
| } |
| |
| /** |
| * @return the value of this node's {@code hashCode} field |
| */ |
| @Override |
| public int hashCode() { |
| return this.hashCode; |
| } |
| |
| /** |
| * Returns the relationships declared by this node's processor, querying the processor with its NAR class |
| * loader installed. |
| * |
| * @return the processor's relationships |
| */ |
| @Override |
| public Collection<Relationship> getRelationships() { |
| final Processor processor = processorRef.get().getProcessor(); |
| try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { |
| // Query the same instance the class loader was set up for, rather than re-reading processorRef, |
| // so a concurrent setProcessor() cannot make the two differ. |
| return processor.getRelationships(); |
| } |
| } |
| |
| /** |
| * Returns the string form of this node's processor, obtained with the processor's NAR class loader installed. |
| * |
| * @return the result of the processor's {@code toString()} |
| */ |
| @Override |
| public String toString() { |
| final Processor processor = processorRef.get().getProcessor(); |
| try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { |
| // Use the same instance the class loader was set up for, rather than re-reading processorRef, |
| // so a concurrent setProcessor() cannot make the two differ. |
| return processor.toString(); |
| } |
| } |
| |
| /** |
| * @return the process group currently held by this node's process group reference |
| */ |
| @Override |
| public ProcessGroup getProcessGroup() { |
| return this.processGroup.get(); |
| } |
| |
| /** |
| * @return the parameter context of this node's process group, or {@code null} when no process group is set |
| */ |
| @Override |
| protected ParameterContext getParameterContext() { |
| final ProcessGroup group = getProcessGroup(); |
| if (group == null) { |
| return null; |
| } |
| return group.getParameterContext(); |
| } |
| |
| /** |
| * @return the parameter context of this node, exposed as a {@link ParameterLookup}; may be {@code null} |
| */ |
| @Override |
| public ParameterLookup getParameterLookup() { |
| final ParameterContext context = getParameterContext(); |
| return context; |
| } |
| |
| /** |
| * Assigns this node to the given process group and resets the validation state. |
| * |
| * @param group the process group to store |
| */ |
| @Override |
| public synchronized void setProcessGroup(final ProcessGroup group) { |
| processGroup.set(group); |
| LOG.debug("Resetting Validation State of {} due to setting process group", this); |
| resetValidationState(); |
| } |
| |
| /** |
| * Invokes the processor's {@code onTrigger} with the processor's NAR class loader installed, registering the |
| * calling thread as active for the duration of the call. |
| * |
| * @param context the process context passed to the processor |
| * @param sessionFactory the session factory passed to the processor |
| */ |
| @Override |
| public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { |
| final Processor processor = processorRef.get().getProcessor(); |
| |
| activateThread(); |
| try { |
| try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { |
| processor.onTrigger(context, sessionFactory); |
| } |
| } finally { |
| // Always unregister the thread, even when the processor throws. |
| deactivateThread(); |
| } |
| } |
| |
| /** |
| * @return {@link ConnectableType#PROCESSOR}, always |
| */ |
| @Override |
| public ConnectableType getConnectableType() { |
| final ConnectableType type = ConnectableType.PROCESSOR; |
| return type; |
| } |
| |
| /** |
| * Sets the annotation data by delegating to the superclass, but only while the processor is not running. |
| * |
| * @param data the annotation data to store |
| * @throws IllegalStateException if the processor is running |
| */ |
| @Override |
| public void setAnnotationData(final String data) { |
| final boolean running = isRunning(); |
| if (running) { |
| throw new IllegalStateException("Cannot set AnnotationData while processor is running"); |
| } |
| |
| super.setAnnotationData(data); |
| } |
| |
| /** |
| * Verifies that this node can be deleted, taking its connections into account. |
| * |
| * @throws IllegalStateException if the node cannot be deleted |
| */ |
| @Override |
| public void verifyCanDelete() throws IllegalStateException { |
| final boolean ignoreConnections = false; |
| verifyCanDelete(ignoreConnections); |
| } |
| |
| /** |
| * Verifies that this node can be deleted. |
| * |
| * @param ignoreConnections when true, only the running check is performed; when false, every outgoing |
| * connection and every self-loop incoming connection must itself be deletable, and no incoming connection |
| * may originate from another component |
| * @throws IllegalStateException if the node is running or, unless connections are ignored, an incoming |
| * connection comes from another component |
| */ |
| @Override |
| public void verifyCanDelete(final boolean ignoreConnections) { |
| if (isRunning()) { |
| throw new IllegalStateException(this.getIdentifier() + " is running"); |
| } |
| |
| if (ignoreConnections) { |
| return; |
| } |
| |
| for (final Set<Connection> perRelationship : connections.values()) { |
| for (final Connection outgoing : perRelationship) { |
| outgoing.verifyCanDelete(); |
| } |
| } |
| |
| for (final Connection incoming : getIncomingConnections()) { |
| final boolean selfLoop = incoming.getSource().equals(this); |
| if (!selfLoop) { |
| throw new IllegalStateException(this.getIdentifier() + " is the destination of another component"); |
| } |
| incoming.verifyCanDelete(); |
| } |
| } |
| |
| /** |
| * Verifies that this node can be started, passing no ignored controller service references. |
| */ |
| @Override |
| public void verifyCanStart() { |
| final Set<ControllerServiceNode> noIgnoredReferences = null; |
| this.verifyCanStart(noIgnoredReferences); |
| } |
| |
| /** |
| * Verifies that this node can be started. |
| * |
| * @param ignoredReferences controller service references handed to the validation-error lookup; when |
| * {@code null}, remaining validation errors do not cause a failure here |
| * @throws IllegalStateException if the physical state is neither STOPPED nor DISABLED, if threads are still |
| * active, if validation is still in progress, or if {@code ignoredReferences} is non-null and validation |
| * errors remain |
| */ |
| @Override |
| public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) { |
| final ScheduledState currentState = getPhysicalScheduledState(); |
| final boolean startable = currentState == ScheduledState.STOPPED || currentState == ScheduledState.DISABLED; |
| if (!startable) { |
| throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not stopped. Current state is " + currentState.name()); |
| } |
| |
| verifyNoActiveThreads(); |
| |
| switch (getValidationStatus()) { |
| case VALID: |
| return; |
| case VALIDATING: |
| throw new IllegalStateException("Processor with ID " + getIdentifier() + " cannot be started because its validation is still being performed"); |
| default: |
| break; |
| } |
| |
| final Collection<ValidationResult> remainingErrors = getValidationErrors(ignoredReferences); |
| final boolean invalid = ignoredReferences != null && !remainingErrors.isEmpty(); |
| if (invalid) { |
| throw new IllegalStateException("Processor with ID " + getIdentifier() + " cannot be started because it is not currently valid"); |
| } |
| } |
| |
| /** |
| * Verifies that this node can be stopped. |
| * |
| * @throws IllegalStateException if the scheduled state is not RUNNING |
| */ |
| @Override |
| public void verifyCanStop() { |
| final boolean scheduledToRun = getScheduledState() == ScheduledState.RUNNING; |
| if (!scheduledToRun) { |
| throw new IllegalStateException(this.getIdentifier() + " is not scheduled to run"); |
| } |
| } |
| |
| /** |
| * Verifies that this node can be updated. |
| * |
| * @throws IllegalStateException if the node is running |
| */ |
| @Override |
| public void verifyCanUpdate() { |
| final boolean running = isRunning(); |
| if (running) { |
| throw new IllegalStateException(this.getIdentifier() + " is not stopped"); |
| } |
| } |
| |
| /** |
| * Verifies that this node can be enabled. |
| * |
| * @throws IllegalStateException if the scheduled state is not DISABLED or threads are still active |
| */ |
| @Override |
| public void verifyCanEnable() { |
| final boolean disabled = getScheduledState() == ScheduledState.DISABLED; |
| if (!disabled) { |
| throw new IllegalStateException(this.getIdentifier() + " is not disabled"); |
| } |
| |
| verifyNoActiveThreads(); |
| } |
| |
| /** |
| * Verifies that this node can be disabled. |
| * |
| * @throws IllegalStateException if the scheduled state is not STOPPED or threads are still active |
| */ |
| @Override |
| public void verifyCanDisable() { |
| final boolean stopped = getScheduledState() == ScheduledState.STOPPED; |
| if (!stopped) { |
| throw new IllegalStateException(this.getIdentifier() + " is not stopped"); |
| } |
| |
| verifyNoActiveThreads(); |
| } |
| |
| /** |
| * Verifies that this node's state can be cleared; the same conditions as for an update apply. |
| * |
| * @throws IllegalStateException if the node cannot currently be updated |
| */ |
| @Override |
| public void verifyCanClearState() throws IllegalStateException { |
| this.verifyCanUpdate(); |
| } |
| |
| /** |
| * Verifies that no threads are active for this node. The thread count is only consulted when the |
| * active-threads flag is set. |
| * |
| * @throws IllegalStateException if the flag is set and the active thread count is greater than zero |
| */ |
| private void verifyNoActiveThreads() throws IllegalStateException { |
| if (!hasActiveThreads) { |
| return; |
| } |
| |
| final int stillActive = getActiveThreadCount(); |
| if (stillActive > 0) { |
| throw new IllegalStateException(this.getIdentifier() + " has " + stillActive + " threads still active"); |
| } |
| } |
| |
| /** |
| * Verifies that this node's configuration can be modified. |
| * |
| * @throws IllegalStateException if the node is running |
| */ |
| @Override |
| public void verifyModifiable() throws IllegalStateException { |
| final boolean running = isRunning(); |
| if (running) { |
| throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); |
| } |
| } |
| |
| /** |
| * Sets the desired state to STOPPED and atomically moves the scheduled state from DISABLED to STOPPED. |
| * When the node is not currently DISABLED, only the desired state changes. |
| */ |
| @Override |
| public void enable() { |
| desiredState = ScheduledState.STOPPED; |
| |
| if (scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED)) { |
| LOG.info("{} enabled so ScheduledState transitioned from DISABLED to STOPPED", this); |
| return; |
| } |
| |
| LOG.info("{} enabled but not currently DISABLED so set desired state to STOPPED; current state is {}", this, scheduledState.get()); |
| } |
| |
| /** |
| * Sets the desired state to DISABLED and atomically moves the scheduled state from STOPPED to DISABLED. |
| * When the node is not currently STOPPED, only the desired state changes. |
| */ |
| @Override |
| public void disable() { |
| desiredState = ScheduledState.DISABLED; |
| |
| if (scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED)) { |
| LOG.info("{} disabled so ScheduledState transitioned from STOPPED to DISABLED", this); |
| return; |
| } |
| |
| LOG.info("{} disabled but not currently STOPPED so set desired state to DISABLED; current state is {}", this, scheduledState.get()); |
| } |
| |
| |
| /** |
| * Idempotently starts the processor using the following sequence: |
| * <ul> |
| * <li>Validate the Processor's state (e.g., PropertyDescriptors, ControllerServices etc.)</li> |
| * <li>Atomically transition the Processor's scheduled state from STOPPED to STARTING. If that transition |
| * succeeds, the start task is executed asynchronously and is re-tried until @OnScheduled completes |
| * successfully and 'schedulingAgentCallback' is invoked, or until a STOP operation is initiated on this |
| * processor. If the transition fails, the processor is not currently stopped (e.g. it is already being |
| * started) and a WARN message explaining this is logged.</li> |
| * </ul> |
| * <p> |
| * Any exception thrown while invoking operations annotated with @OnScheduled is caught and logged, after |
| * which the @OnUnscheduled operation is invoked (quietly) and the start sequence is re-tried after the |
| * delay given by 'administrativeYieldMillis'. |
| * </p> |
| * <p> |
| * Upon successful completion of the start sequence (@OnScheduled -> 'schedulingAgentCallback') an attempt |
| * is made to transition the processor's scheduling state to RUNNING, at which point the processor is |
| * considered fully started and functioning. If the processor cannot be transitioned to RUNNING after a |
| * successful @OnScheduled invocation (e.g., a STOP operation was invoked while @OnScheduled was executing), |
| * the processor's @OnUnscheduled operation is invoked and its scheduling state is set to STOPPED, at which |
| * point the processor is considered fully stopped. |
| * </p> |
| */ |
| @Override |
| public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final Supplier<ProcessContext> processContextFactory, |
| final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) { |
| |
| final ScheduledState targetDesiredState = ScheduledState.RUNNING; |
| final ScheduledState transitionalState = ScheduledState.STARTING; |
| run(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback, failIfStopping, targetDesiredState, transitionalState); |
| } |
| |
| /** |
| * Works like {@link #start(ScheduledExecutorService, long, long, Supplier, SchedulingAgentCallback, boolean)}, with these differences: |
| * <ul> |
| * <li> |
| * Once the {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method has been invoked successfully, the processor is scheduled to be stopped immediately. |
| * All appropriate lifecycle methods will be executed as well. |
| * </li> |
| * <li> |
| * The processor's desired state is going to be set to STOPPED right away. This usually doesn't prevent the processor from running once, unless NiFi is restarted before it can finish. |
| * In that case the processor will stay STOPPED after the restart. |
| * </li> |
| * </ul> |
| */ |
| @Override |
| public void runOnce(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final Supplier<ProcessContext> processContextFactory, |
| final SchedulingAgentCallback schedulingAgentCallback) { |
| |
| final boolean failIfStopping = true; |
| run(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback, failIfStopping, ScheduledState.RUN_ONCE, ScheduledState.RUN_ONCE); |
| } |
| |
| /** |
| * Shared entry point of {@code start} and {@code runOnce}: moves the node out of STOPPED and kicks off the |
| * asynchronous start-up, or logs a warning when the node is not stopped. |
| * |
| * @param taskScheduler executor handed on to the start-up sequence |
| * @param administrativeYieldMillis yield duration handed on to the start-up sequence |
| * @param timeoutMillis timeout handed on to the start-up sequence |
| * @param processContextFactory supplier of the process context, handed on to the start-up sequence |
| * @param schedulingAgentCallback callback handed on to the start-up sequence |
| * @param failIfStopping when false and the node is STOPPING, only the desired state is recorded and the |
| * method returns without starting or warning |
| * @param desiredState value stored as the node's desired state when the request is accepted |
| * @param scheduledState state the node is atomically moved to from STOPPED |
| */ |
| private void run(ScheduledExecutorService taskScheduler, long administrativeYieldMillis, long timeoutMillis, Supplier<ProcessContext> processContextFactory, |
| SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping, ScheduledState desiredState, ScheduledState scheduledState) { |
| |
| final Processor processor = processorRef.get().getProcessor(); |
| final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); |
| LOG.info("Starting {}", this); |
| |
| final ScheduledState observedState; |
| final boolean transitioned; |
| synchronized (this) { |
| observedState = this.scheduledState.get(); |
| |
| if (observedState == ScheduledState.STOPPING && !failIfStopping) { |
| this.desiredState = desiredState; |
| return; |
| } |
| |
| transitioned = observedState == ScheduledState.STOPPED && this.scheduledState.compareAndSet(ScheduledState.STOPPED, scheduledState); |
| if (transitioned) { |
| this.desiredState = desiredState; |
| } |
| } |
| |
| // Only the caller that won the state transition may initiate the start, so it happens at most once. |
| if (transitioned) { |
| initiateStart(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback); |
| return; |
| } |
| |
| final String procName = processorRef.get().toString(); |
| LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, observedState); |
| procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[]{procName, observedState}); |
| } |
| |
| /** |
| * Registers the calling thread as active, recording the current time as the start of its task. |
| */ |
| private synchronized void activateThread() { |
| final long startedAt = System.currentTimeMillis(); |
| activeThreads.put(Thread.currentThread(), new ActiveTask(startedAt)); |
| } |
| |
| private synchronized void deactivateThread() { |
| activeThreads.remove(Thread.currentThread()); |
| } |
| |
| /** |
| * Builds a description of every thread currently registered as active for this node. |
| * |
| * @param threadDetails thread infos and deadlock information used to render each thread's stack trace |
| * @return one entry per active thread with its name, stack trace, active time in milliseconds and |
| * terminated flag |
| */ |
| @Override |
| public synchronized List<ActiveThreadInfo> getActiveThreads(final ThreadDetails threadDetails) { |
| final long snapshotTime = System.currentTimeMillis(); |
| |
| // Index the thread infos by thread id; on duplicate ids the first one wins. |
| final Map<Long, ThreadInfo> infoById = Stream.of(threadDetails.getThreadInfos()) |
| .collect(Collectors.toMap(ThreadInfo::getThreadId, Function.identity(), (first, second) -> first)); |
| |
| final List<ActiveThreadInfo> result = new ArrayList<>(activeThreads.size()); |
| for (final Map.Entry<Thread, ActiveTask> tracked : activeThreads.entrySet()) { |
| final Thread worker = tracked.getKey(); |
| final ActiveTask task = tracked.getValue(); |
| final long elapsedMillis = snapshotTime - task.getStartTime(); |
| |
| final String stackTrace = ThreadUtils.createStackTrace(worker, infoById.get(worker.getId()), |
| threadDetails.getDeadlockedThreadIds(), threadDetails.getMonitorDeadlockThreadIds(), elapsedMillis); |
| |
| result.add(new ActiveThreadInfo(worker.getName(), stackTrace, elapsedMillis, task.isTerminated())); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * @return the number of active tasks that have been marked as terminated |
| */ |
| @Override |
| public synchronized int getTerminatedThreadCount() { |
| return (int) activeThreads.values().stream() |
| .filter(ActiveTask::isTerminated) |
| .count(); |
| } |
| |
| |
| /** |
| * Terminates all active tasks of this node. Each task not yet terminated is marked terminated and its |
| * thread name is tagged; every tracked thread is interrupted. Afterwards the logger is terminated, the |
| * scheduled state is set to STOPPED and the active-threads flag is cleared. |
| * |
| * @return the number of tasks newly marked as terminated by this call |
| * @throws IllegalStateException if the node may not be terminated in its current state |
| */ |
| @Override |
| public int terminate() { |
| verifyCanTerminate(); |
| |
| int newlyTerminated = 0; |
| for (final Map.Entry<Thread, ActiveTask> tracked : activeThreads.entrySet()) { |
| final Thread worker = tracked.getKey(); |
| final ActiveTask task = tracked.getValue(); |
| |
| final boolean alreadyTerminated = task.isTerminated(); |
| if (!alreadyTerminated) { |
| task.terminate(); |
| worker.setName(worker.getName() + " <Terminated Task>"); |
| newlyTerminated++; |
| } |
| |
| // Interrupt every tracked thread, including those terminated earlier. |
| worker.interrupt(); |
| } |
| |
| getLogger().terminate(); |
| scheduledState.set(ScheduledState.STOPPED); |
| hasActiveThreads = false; |
| |
| return newlyTerminated; |
| } |
| |
| /** |
| * Tells whether the task running on the given thread has been terminated. |
| * |
| * @param thread the thread to look up |
| * @return true if the thread is registered as active and its task is marked terminated; false otherwise |
| */ |
| @Override |
| public boolean isTerminated(final Thread thread) { |
| final ActiveTask task = activeThreads.get(thread); |
| return task == null ? false : task.isTerminated(); |
| } |
| |
| /** |
| * Verifies that this node's active tasks may be terminated, which is only allowed while the scheduled |
| * state is STOPPED or RUN_ONCE. |
| * |
| * @throws IllegalStateException if the scheduled state is neither STOPPED nor RUN_ONCE |
| */ |
| @Override |
| public void verifyCanTerminate() { |
| // Read the state once: reading it twice could observe RUN_ONCE and then STOPPED if the state changes |
| // between the reads, and fail even though both states permit termination. |
| final ScheduledState currentState = getScheduledState(); |
| if (currentState != ScheduledState.STOPPED && currentState != ScheduledState.RUN_ONCE) { |
| throw new IllegalStateException("Processor is not stopped"); |
| } |
| } |
| |
| |
| /** |
| * Submits the asynchronous start-up of the processor. A start-up task is handed to |
| * {@code schedulingAgentCallback.scheduleTask}; it invokes the processor's {@link OnScheduled} methods and, |
| * depending on the resulting state, either triggers the processor or stops it again. A second, periodic task |
| * is scheduled on {@code taskScheduler} that passes the start-up future, its own future and the completion |
| * timestamp to {@code monitorAsyncTask}. |
| * |
| * @param taskScheduler executor used for the periodic monitoring task and for re-scheduling this method |
| * @param administrativeYieldMillis delay in milliseconds before start-up is retried after a failure |
| * @param timeoutMilis milliseconds added to the current time to form the completion timestamp once the |
| * start-up task is about to invoke the lifecycle methods |
| * @param processContextFactory supplies the process context passed to the lifecycle methods |
| * @param schedulingAgentCallback used to schedule the start-up task, to trigger the processor, and to |
| * signal task completion |
| */ |
| private void initiateStart(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMilis, |
| final Supplier<ProcessContext> processContextFactory, final SchedulingAgentCallback schedulingAgentCallback) { |
| |
| final Processor processor = getProcessor(); |
| final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); |
| |
| // Completion Timestamp is set to MAX_VALUE because we don't want to timeout until the task has a chance to run. |
| final AtomicLong completionTimestampRef = new AtomicLong(Long.MAX_VALUE); |
| |
| // Create a task to invoke the @OnScheduled annotation of the processor |
| final Callable<Void> startupTask = () -> { |
| final ProcessContext processContext = processContextFactory.get(); |
| |
| // If the node is already stopping/stopped, or a stop is desired, skip the lifecycle methods and settle on STOPPED. |
| final ScheduledState currentScheduleState = scheduledState.get(); |
| if (currentScheduleState == ScheduledState.STOPPING || currentScheduleState == ScheduledState.STOPPED || getDesiredState() == ScheduledState.STOPPED) { |
| LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle methods or begin trigger onTrigger() method", StandardProcessorNode.this); |
| schedulingAgentCallback.onTaskComplete(); |
| scheduledState.set(ScheduledState.STOPPED); |
| return null; |
| } |
| |
| // An invalid (or still validating) processor is not started now; the whole start-up is re-submitted later. |
| final ValidationStatus validationStatus = getValidationStatus(); |
| if (validationStatus != ValidationStatus.VALID) { |
| LOG.debug("Cannot start {} because Processor is currently not valid; will try again after 5 seconds", StandardProcessorNode.this); |
| |
| // re-initiate the entire process |
| final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContextFactory, schedulingAgentCallback); |
| taskScheduler.schedule(initiateStartTask, 5, TimeUnit.SECONDS); |
| |
| schedulingAgentCallback.onTaskComplete(); |
| return null; |
| } |
| |
| LOG.debug("Invoking @OnScheduled methods of {}", processor); |
| |
| // Now that the task has been scheduled, set the timeout |
| completionTimestampRef.set(System.currentTimeMillis() + timeoutMilis); |
| |
| try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { |
| try { |
| // Set before the lifecycle methods run; cleared again on the stop and failure paths below. |
| hasActiveThreads = true; |
| |
| activateThread(); |
| try { |
| ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext); |
| } finally { |
| deactivateThread(); |
| } |
| |
| // Proceed only if the state is still the one set when the start was accepted. For RUN_ONCE the |
| // compareAndSet leaves the state unchanged and merely verifies it is still RUN_ONCE. |
| if ( |
| (desiredState == ScheduledState.RUNNING && scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) |
| || (desiredState == ScheduledState.RUN_ONCE && scheduledState.compareAndSet(ScheduledState.RUN_ONCE, ScheduledState.RUN_ONCE)) |
| ) { |
| LOG.debug("Successfully completed the @OnScheduled methods of {}; will now start triggering processor to run", processor); |
| schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle |
| } else { |
| LOG.info("Successfully invoked @OnScheduled methods of {} but scheduled state is no longer STARTING so will stop processor now; current state = {}, desired state = {}", |
| processor, scheduledState.get(), desiredState); |
| |
| // can only happen if stopProcessor was called before service was transitioned to RUNNING state |
| activateThread(); |
| try { |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); |
| hasActiveThreads = false; |
| } finally { |
| deactivateThread(); |
| } |
| |
| scheduledState.set(ScheduledState.STOPPED); |
| |
| // Honour a disable request that arrived while the processor was starting. |
| if (desiredState == ScheduledState.DISABLED) { |
| final boolean disabled = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED); |
| if (disabled) { |
| LOG.info("After stopping {}, determined that Desired State is DISABLED so disabled processor", processor); |
| } |
| } |
| } |
| } finally { |
| schedulingAgentCallback.onTaskComplete(); |
| } |
| } catch (Exception e) { |
| // Report the underlying failure rather than the reflection wrapper. |
| final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e; |
| procLog.error("Failed to properly initialize Processor. If still scheduled to run, NiFi will attempt to " |
| + "initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to " + cause, cause); |
| |
| // If processor's task completed Exceptionally, then we want to retry initiating the start (if Processor is still scheduled to run). |
| try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { |
| activateThread(); |
| try { |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); |
| hasActiveThreads = false; |
| } finally { |
| deactivateThread(); |
| } |
| } |
| |
| // make sure we only continue retry loop if STOP action wasn't initiated |
| if (scheduledState.get() != ScheduledState.STOPPING && scheduledState.get() != ScheduledState.RUN_ONCE) { |
| // re-initiate the entire process |
| final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContextFactory, schedulingAgentCallback); |
| taskScheduler.schedule(initiateStartTask, administrativeYieldMillis, TimeUnit.MILLISECONDS); |
| } else { |
| scheduledState.set(ScheduledState.STOPPED); |
| } |
| } |
| |
| return null; |
| }; |
| |
| // Trigger the task in a background thread. |
| final Future<?> taskFuture = schedulingAgentCallback.scheduleTask(startupTask); |
| |
| // Trigger a task periodically to check if @OnScheduled task completed. Once it has, |
| // this task will call SchedulingAgentCallback#onTaskComplete. |
| // However, if the task times out, we need to be able to cancel the monitoring. So, in order |
| // to do this, we use #scheduleWithFixedDelay and then make that Future available to the task |
| // itself by placing it into an AtomicReference. |
| final AtomicReference<Future<?>> futureRef = new AtomicReference<>(); |
| final Runnable monitoringTask = new Runnable() { |
| @Override |
| public void run() { |
| Future<?> monitoringFuture = futureRef.get(); |
| if (monitoringFuture == null) { // Future is not yet available. Just return and wait for the next invocation. |
| return; |
| } |
| |
| monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get()); |
| } |
| }; |
| |
| // First check after 1 ms, then with a 10 ms delay between the end of one check and the start of the next. |
| final Future<?> future = taskScheduler.scheduleWithFixedDelay(monitoringTask, 1, 10, TimeUnit.MILLISECONDS); |
| futureRef.set(future); |
| } |
| |
| /** |
| * Will idempotently stop the processor using the following sequence: <i> |
| * <ul> |
| * <li>Transition (atomically) Processor's scheduled state from RUNNING to |
| * STOPPING. If the above state transition succeeds, then invoke any method |
| * on the Processor with the {@link OnUnscheduled} annotation. Once those methods |
| * have been called and returned (either normally or exceptionally), start checking |
| * to see if all of the Processor's active threads have finished. If not, check again |
| * every 100 milliseconds until they have. |
| * Once all after threads have completed, the processor's @OnStopped operation will be invoked |
| * and its scheduled state is set to STOPPED which completes processor stop |
| * sequence.</li> |
| * </ul> |
| * </i> |
| * |
| * <p> |
| * If for some reason processor's scheduled state can not be transitioned to |
| * STOPPING (e.g., the processor didn't finish @OnScheduled operation when |
| * stop was called), the attempt will be made to transition processor's |
| * scheduled state from STARTING to STOPPING which will allow |
| * {@link #start(ScheduledExecutorService, long, long, Supplier, SchedulingAgentCallback, boolean)} |
| * method to initiate processor's shutdown upon exiting @OnScheduled |
| * operation, otherwise the processor's scheduled state will remain |
| * unchanged ensuring that multiple calls to this method are idempotent. |
| * </p> |
| */ |
| @Override |
| public CompletableFuture<Void> stop(final ProcessScheduler processScheduler, final ScheduledExecutorService executor, final ProcessContext processContext, |
| final SchedulingAgent schedulingAgent, final LifecycleState scheduleState) { |
| |
| final Processor processor = processorRef.get().getProcessor(); |
| LOG.info("Stopping processor: " + this); |
| desiredState = ScheduledState.STOPPED; |
| |
| final CompletableFuture<Void> future = new CompletableFuture<>(); |
| // will ensure that the Processor represented by this node can only be stopped once |
| if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING) || this.scheduledState.compareAndSet(ScheduledState.RUN_ONCE, ScheduledState.STOPPING)) { |
| scheduleState.incrementActiveThreadCount(null); |
| |
| // will continue to monitor active threads, invoking OnStopped once there are no |
| // active threads (with the exception of the thread performing shutdown operations) |
| executor.execute(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| if (scheduleState.isScheduled()) { |
| schedulingAgent.unschedule(StandardProcessorNode.this, scheduleState); |
| |
| activateThread(); |
| try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); |
| } finally { |
| deactivateThread(); |
| } |
| } |
| |
| // all threads are complete if the active thread count is 1. This is because this thread that is |
| // performing the lifecycle actions counts as 1 thread. |
| final boolean allThreadsComplete = scheduleState.getActiveThreadCount() == 1; |
| if (allThreadsComplete) { |
| activateThread(); |
| try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); |
| } finally { |
| deactivateThread(); |
| } |
| |
| scheduleState.decrementActiveThreadCount(null); |
| hasActiveThreads = false; |
| scheduledState.set(ScheduledState.STOPPED); |
| future.complete(null); |
| |
| // This can happen only when we join a cluster. In such a case, we can inherit a flow from the cluster that says that |
| // the Processor is to be running. However, if the Processor is already in the process of stopping, we cannot immediately |
| // start running the Processor. As a result, we check here, since the Processor is stopped, and then immediately start the |
| // Processor if need be. |
| final ScheduledState desired = StandardProcessorNode.this.desiredState; |
| if (desired == ScheduledState.RUNNING) { |
| LOG.info("Finished stopping {} but desired state is now RUNNING so will start processor", this); |
| processScheduler.startProcessor(StandardProcessorNode.this, true); |
| } else if (desired == ScheduledState.DISABLED) { |
| final boolean updated = scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED); |
| |
| if (updated) { |
| LOG.info("Finished stopping {} but desired state is now DISABLED so disabled processor", this); |
| } else { |
| LOG.info("Finished stopping {} but desired state is now DISABLED. Scheduled State could not be transitioned from STOPPED to DISABLED, " |
| + "though, so will allow the other thread to finish state transition. Current state is {}", this, scheduledState.get()); |
| } |
| } |
| } else { |
| // Not all of the active threads have finished. Try again in 100 milliseconds. |
| executor.schedule(this, 100, TimeUnit.MILLISECONDS); |
| } |
| } catch (final Exception e) { |
| LOG.warn("Failed while shutting down processor " + processor, e); |
| } |
| } |
| }); |
| } else { |
| // We do compareAndSet() instead of set() to ensure that Processor |
| // stoppage is handled consistently including a condition where |
| // Processor never got a chance to transition to RUNNING state |
| // before stop() was called. If that happens the stop processor |
| // routine will be initiated in start() method, otherwise the IF |
| // part will handle the stop processor routine. |
| final boolean updated = this.scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.STOPPING); |
| if (updated) { |
| LOG.debug("Transitioned state of {} from STARTING to STOPPING", this); |
| } |
| |
| future.complete(null); |
| } |
| |
| return future; |
| } |
| |
| @Override |
| public ScheduledState getDesiredState() { |
| return desiredState; |
| } |
| |
| private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> monitoringFuture, final long completionTimestamp) { |
| if (taskFuture.isDone()) { |
| monitoringFuture.cancel(false); // stop scheduling this task |
| } else if (System.currentTimeMillis() > completionTimestamp) { |
| // Task timed out. Request an interrupt of the processor task |
| taskFuture.cancel(true); |
| |
| // Stop monitoring the processor. We have interrupted the thread so that's all we can do. If the processor responds to the interrupt, then |
| // it will be re-scheduled. If it does not, then it will either keep the thread indefinitely or eventually finish, at which point |
| // the Processor will begin running. |
| monitoringFuture.cancel(false); |
| |
| final Processor processor = processorRef.get().getProcessor(); |
| LOG.warn("Timed out while waiting for OnScheduled of " |
| + processor + " to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " |
| + "guarantee that the task will be canceled since the code inside current OnScheduled operation may " |
| + "have been written to ignore interrupts which may result in a runaway thread. This could lead to more issues, " |
| + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" |
| + processor + "' that needs to be documented, reported and eventually fixed."); |
| } |
| } |
| |
| @Override |
| public String getProcessGroupIdentifier() { |
| final ProcessGroup group = getProcessGroup(); |
| return group == null ? null : group.getIdentifier(); |
| } |
| |
| @Override |
| public Optional<String> getVersionedComponentId() { |
| return Optional.ofNullable(versionedComponentId.get()); |
| } |
| |
| @Override |
| public void setVersionedComponentId(final String versionedComponentId) { |
| boolean updated = false; |
| while (!updated) { |
| final String currentId = this.versionedComponentId.get(); |
| |
| if (currentId == null) { |
| updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); |
| } else if (currentId.equals(versionedComponentId)) { |
| return; |
| } else if (versionedComponentId == null) { |
| updated = this.versionedComponentId.compareAndSet(currentId, null); |
| } else { |
| throw new IllegalStateException(this + " is already under version control"); |
| } |
| } |
| } |
| } |