| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.controller.scheduling; |
| |
| import static java.util.Objects.requireNonNull; |
| |
| import java.lang.reflect.InvocationTargetException; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.annotation.lifecycle.OnUnscheduled; |
| import org.apache.nifi.components.state.StateManager; |
| import org.apache.nifi.components.state.StateManagerProvider; |
| import org.apache.nifi.connectable.Connectable; |
| import org.apache.nifi.connectable.Funnel; |
| import org.apache.nifi.connectable.Port; |
| import org.apache.nifi.controller.AbstractPort; |
| import org.apache.nifi.controller.ConfigurationContext; |
| import org.apache.nifi.controller.ProcessScheduler; |
| import org.apache.nifi.controller.ProcessorNode; |
| import org.apache.nifi.controller.ReportingTaskNode; |
| import org.apache.nifi.controller.ScheduledState; |
| import org.apache.nifi.controller.SchedulingAgentCallback; |
| import org.apache.nifi.controller.StandardProcessorNode; |
| import org.apache.nifi.controller.service.ControllerServiceNode; |
| import org.apache.nifi.controller.service.ControllerServiceProvider; |
| import org.apache.nifi.encrypt.StringEncryptor; |
| import org.apache.nifi.engine.FlowEngine; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.nar.NarCloseable; |
| import org.apache.nifi.processor.Processor; |
| import org.apache.nifi.processor.SimpleProcessLogger; |
| import org.apache.nifi.processor.StandardProcessContext; |
| import org.apache.nifi.registry.VariableRegistry; |
| import org.apache.nifi.reporting.ReportingTask; |
| import org.apache.nifi.scheduling.SchedulingStrategy; |
| import org.apache.nifi.util.FormatUtils; |
| import org.apache.nifi.util.NiFiProperties; |
| import org.apache.nifi.util.ReflectionUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Responsible for scheduling Processors, Ports, and Funnels to run at regular intervals |
| */ |
| public final class StandardProcessScheduler implements ProcessScheduler { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class); |
| |
| private final ControllerServiceProvider controllerServiceProvider; |
| private final long administrativeYieldMillis; |
| private final String administrativeYieldDuration; |
| private final StateManagerProvider stateManagerProvider; |
| |
| private final ConcurrentMap<Object, ScheduleState> scheduleStates = new ConcurrentHashMap<>(); |
| private final ScheduledExecutorService frameworkTaskExecutor; |
| private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>(); |
| // thread pool for starting/stopping components |
| |
| private final ScheduledExecutorService componentLifeCycleThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); |
| private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); |
| |
| private final StringEncryptor encryptor; |
| private final VariableRegistry variableRegistry; |
| |
| public StandardProcessScheduler(final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, |
| final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry) { |
| this.controllerServiceProvider = controllerServiceProvider; |
| this.encryptor = encryptor; |
| this.stateManagerProvider = stateManagerProvider; |
| this.variableRegistry = variableRegistry; |
| |
| administrativeYieldDuration = NiFiProperties.getInstance().getAdministrativeYieldDuration(); |
| administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS); |
| |
| frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread"); |
| } |
| |
| private StateManager getStateManager(final String componentId) { |
| return stateManagerProvider.getStateManager(componentId); |
| } |
| |
| public void scheduleFrameworkTask(final Runnable command, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) { |
| frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| command.run(); |
| } catch (final Throwable t) { |
| LOG.error("Failed to run Framework Task {} due to {}", taskName, t.toString()); |
| if (LOG.isDebugEnabled()) { |
| LOG.error("", t); |
| } |
| } |
| } |
| }, initialDelay, delay, timeUnit); |
| } |
| |
| /** |
| * Submits the given task to be executed exactly once in a background thread |
| * |
| * @param task the task to perform |
| */ |
| public Future<?> submitFrameworkTask(final Runnable task) { |
| return frameworkTaskExecutor.submit(task); |
| } |
| |
| @Override |
| public void setMaxThreadCount(final SchedulingStrategy schedulingStrategy, final int maxThreadCount) { |
| final SchedulingAgent agent = getSchedulingAgent(schedulingStrategy); |
| if (agent == null) { |
| return; |
| } |
| |
| agent.setMaxThreadCount(maxThreadCount); |
| } |
| |
| public void setSchedulingAgent(final SchedulingStrategy strategy, final SchedulingAgent agent) { |
| strategyAgentMap.put(strategy, agent); |
| } |
| |
| public SchedulingAgent getSchedulingAgent(final SchedulingStrategy strategy) { |
| return strategyAgentMap.get(strategy); |
| } |
| |
| private SchedulingAgent getSchedulingAgent(final Connectable connectable) { |
| return getSchedulingAgent(connectable.getSchedulingStrategy()); |
| } |
| |
| @Override |
| public void shutdown() { |
| for (final SchedulingAgent schedulingAgent : strategyAgentMap.values()) { |
| try { |
| schedulingAgent.shutdown(); |
| } catch (final Throwable t) { |
| LOG.error("Failed to shutdown Scheduling Agent {} due to {}", schedulingAgent, t.toString()); |
| LOG.error("", t); |
| } |
| } |
| |
| frameworkTaskExecutor.shutdown(); |
| componentLifeCycleThreadPool.shutdown(); |
| componentMonitoringThreadPool.shutdown(); |
| } |
| |
| @Override |
| public void schedule(final ReportingTaskNode taskNode) { |
| final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode)); |
| if (scheduleState.isScheduled()) { |
| return; |
| } |
| |
| final int activeThreadCount = scheduleState.getActiveThreadCount(); |
| if (activeThreadCount > 0) { |
| throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running"); |
| } |
| |
| if (!taskNode.isValid()) { |
| throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors()); |
| } |
| |
| final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); |
| scheduleState.setScheduled(true); |
| |
| final Runnable startReportingTaskRunnable = new Runnable() { |
| @Override |
| public void run() { |
| final long lastStopTime = scheduleState.getLastStopTime(); |
| final ReportingTask reportingTask = taskNode.getReportingTask(); |
| |
| // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time. |
| while (true) { |
| try { |
| synchronized (scheduleState) { |
| // if no longer scheduled to run, then we're finished. This can happen, for example, |
| // if the @OnScheduled method throws an Exception and the user stops the reporting task |
| // while we're administratively yielded. |
| // we also check if the schedule state's last start time is equal to what it was before. |
| // if not, then means that the reporting task has been stopped and started again, so we should just |
| // bail; another thread will be responsible for invoking the @OnScheduled methods. |
| if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) { |
| return; |
| } |
| |
| try (final NarCloseable x = NarCloseable.withNarLoader()) { |
| ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext()); |
| } |
| |
| agent.schedule(taskNode, scheduleState); |
| return; |
| } |
| } catch (final Exception e) { |
| final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; |
| final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask); |
| componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); |
| |
| LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this " |
| + "ReportingTask and will attempt to schedule it again after {}", |
| new Object[] { reportingTask, e.toString(), administrativeYieldDuration }, e); |
| |
| try { |
| Thread.sleep(administrativeYieldMillis); |
| } catch (final InterruptedException ie) { |
| } |
| } |
| } |
| } |
| }; |
| |
| componentLifeCycleThreadPool.execute(startReportingTaskRunnable); |
| taskNode.setScheduledState(ScheduledState.RUNNING); |
| } |
| |
| @Override |
| public void unschedule(final ReportingTaskNode taskNode) { |
| final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode)); |
| if (!scheduleState.isScheduled()) { |
| return; |
| } |
| |
| taskNode.verifyCanStop(); |
| final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); |
| final ReportingTask reportingTask = taskNode.getReportingTask(); |
| taskNode.setScheduledState(ScheduledState.STOPPED); |
| |
| final Runnable unscheduleReportingTaskRunnable = new Runnable() { |
| @Override |
| public void run() { |
| final ConfigurationContext configurationContext = taskNode.getConfigurationContext(); |
| |
| synchronized (scheduleState) { |
| scheduleState.setScheduled(false); |
| |
| try { |
| try (final NarCloseable x = NarCloseable.withNarLoader()) { |
| ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext); |
| } |
| } catch (final Exception e) { |
| final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; |
| final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask); |
| componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause); |
| |
| LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", |
| reportingTask, cause.toString(), administrativeYieldDuration); |
| LOG.error("", cause); |
| |
| try { |
| Thread.sleep(administrativeYieldMillis); |
| } catch (final InterruptedException ie) { |
| } |
| } |
| |
| agent.unschedule(taskNode, scheduleState); |
| |
| if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) { |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext); |
| } |
| } |
| } |
| }; |
| |
| componentLifeCycleThreadPool.execute(unscheduleReportingTaskRunnable); |
| } |
| |
| /** |
| * Starts the given {@link Processor} by invoking its |
| * {@link ProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable)} |
| * . |
| * @see StandardProcessorNode#start(ScheduledExecutorService, long, |
| * org.apache.nifi.processor.ProcessContext, Runnable). |
| */ |
| @Override |
| public synchronized void startProcessor(final ProcessorNode procNode) { |
| StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, |
| this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry); |
| final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode)); |
| |
| SchedulingAgentCallback callback = new SchedulingAgentCallback() { |
| @Override |
| public void trigger() { |
| getSchedulingAgent(procNode).schedule(procNode, scheduleState); |
| } |
| |
| @Override |
| public Future<?> invokeMonitoringTask(Callable<?> task) { |
| scheduleState.incrementActiveThreadCount(); |
| return componentMonitoringThreadPool.submit(task); |
| } |
| |
| @Override |
| public void postMonitor() { |
| scheduleState.decrementActiveThreadCount(); |
| } |
| }; |
| |
| procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback); |
| } |
| |
| /** |
| * Stops the given {@link Processor} by invoking its |
| * {@link ProcessorNode#stop(ScheduledExecutorService, org.apache.nifi.processor.ProcessContext, Callable)} |
| * . |
| * @see StandardProcessorNode#stop(ScheduledExecutorService, |
| * org.apache.nifi.processor.ProcessContext, Callable) |
| */ |
| @Override |
| public synchronized void stopProcessor(final ProcessorNode procNode) { |
| StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, |
| this.encryptor, getStateManager(procNode.getIdentifier()), variableRegistry); |
| final ScheduleState state = getScheduleState(procNode); |
| |
| procNode.stop(this.componentLifeCycleThreadPool, processContext, new Callable<Boolean>() { |
| @Override |
| public Boolean call() { |
| if (state.isScheduled()) { |
| getSchedulingAgent(procNode).unschedule(procNode, state); |
| } |
| return state.getActiveThreadCount() == 0; |
| } |
| }); |
| } |
| |
| @Override |
| public void yield(final ProcessorNode procNode) { |
| // This exists in the ProcessScheduler so that the scheduler can take |
| // advantage of the fact that |
| // the Processor was yielded and, as a result, avoid scheduling the |
| // Processor to potentially run |
| // (thereby skipping the overhead of the Context Switches) if nothing |
| // can be done. |
| // |
| // We used to implement this feature by canceling all futures for the |
| // given Processor and |
| // re-submitting them with a delay. However, this became problematic, |
| // because we have situations where |
| // a Processor will wait several seconds (often 30 seconds in the case |
| // of a network timeout), and then yield |
| // the context. If this Processor has X number of threads, we end up |
| // submitting X new tasks while the previous |
| // X-1 tasks are still running. At this point, another thread could |
| // finish and do the same thing, resulting in |
| // an additional X-1 extra tasks being submitted. |
| // |
| // As a result, we simply removed this buggy implementation, as it was a |
| // very minor performance optimization |
| // that gave very bad results. |
| } |
| |
| @Override |
| public void registerEvent(final Connectable worker) { |
| getSchedulingAgent(worker).onEvent(worker); |
| } |
| |
| @Override |
| public int getActiveThreadCount(final Object scheduled) { |
| return getScheduleState(scheduled).getActiveThreadCount(); |
| } |
| |
| @Override |
| public void startPort(final Port port) { |
| if (!port.isValid()) { |
| throw new IllegalStateException("Port " + port.getIdentifier() + " is not in a valid state"); |
| } |
| |
| port.onSchedulingStart(); |
| startConnectable(port); |
| } |
| |
| @Override |
| public void startFunnel(final Funnel funnel) { |
| startConnectable(funnel); |
| funnel.setScheduledState(ScheduledState.RUNNING); |
| } |
| |
| @Override |
| public void stopPort(final Port port) { |
| stopConnectable(port); |
| port.shutdown(); |
| } |
| |
| @Override |
| public void stopFunnel(final Funnel funnel) { |
| stopConnectable(funnel); |
| funnel.setScheduledState(ScheduledState.STOPPED); |
| } |
| |
| private synchronized void startConnectable(final Connectable connectable) { |
| if (connectable.getScheduledState() == ScheduledState.DISABLED) { |
| throw new IllegalStateException(connectable.getIdentifier() + " is disabled, so it cannot be started"); |
| } |
| |
| final ScheduleState scheduleState = getScheduleState(requireNonNull(connectable)); |
| if (scheduleState.isScheduled()) { |
| return; |
| } |
| |
| final int activeThreads = scheduleState.getActiveThreadCount(); |
| if (activeThreads > 0) { |
| throw new IllegalStateException("Port cannot be scheduled to run until its last " + activeThreads + " threads finish"); |
| } |
| |
| getSchedulingAgent(connectable).schedule(connectable, scheduleState); |
| scheduleState.setScheduled(true); |
| } |
| |
| private synchronized void stopConnectable(final Connectable connectable) { |
| final ScheduleState state = getScheduleState(requireNonNull(connectable)); |
| if (!state.isScheduled()) { |
| return; |
| } |
| |
| state.setScheduled(false); |
| getSchedulingAgent(connectable).unschedule(connectable, state); |
| |
| if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) { |
| final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); |
| try (final NarCloseable x = NarCloseable.withNarLoader()) { |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext); |
| } |
| } |
| } |
| |
| @Override |
| public synchronized void enableFunnel(final Funnel funnel) { |
| if (funnel.getScheduledState() != ScheduledState.DISABLED) { |
| throw new IllegalStateException("Funnel cannot be enabled because it is not disabled"); |
| } |
| funnel.setScheduledState(ScheduledState.STOPPED); |
| } |
| |
| @Override |
| public synchronized void disableFunnel(final Funnel funnel) { |
| if (funnel.getScheduledState() != ScheduledState.STOPPED) { |
| throw new IllegalStateException("Funnel cannot be disabled because its state its state is set to " + funnel.getScheduledState()); |
| } |
| funnel.setScheduledState(ScheduledState.DISABLED); |
| } |
| |
| @Override |
| public synchronized void disablePort(final Port port) { |
| if (port.getScheduledState() != ScheduledState.STOPPED) { |
| throw new IllegalStateException("Port cannot be disabled because its state is set to " + port.getScheduledState()); |
| } |
| |
| if (!(port instanceof AbstractPort)) { |
| throw new IllegalArgumentException(); |
| } |
| |
| ((AbstractPort) port).disable(); |
| } |
| |
| @Override |
| public synchronized void enablePort(final Port port) { |
| if (port.getScheduledState() != ScheduledState.DISABLED) { |
| throw new IllegalStateException("Funnel cannot be enabled because it is not disabled"); |
| } |
| |
| if (!(port instanceof AbstractPort)) { |
| throw new IllegalArgumentException(); |
| } |
| |
| ((AbstractPort) port).enable(); |
| } |
| |
| @Override |
| public synchronized void enableProcessor(final ProcessorNode procNode) { |
| procNode.enable(); |
| } |
| |
| @Override |
| public synchronized void disableProcessor(final ProcessorNode procNode) { |
| procNode.disable(); |
| } |
| |
| public synchronized void enableReportingTask(final ReportingTaskNode taskNode) { |
| if (taskNode.getScheduledState() != ScheduledState.DISABLED) { |
| throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled"); |
| } |
| |
| taskNode.setScheduledState(ScheduledState.STOPPED); |
| } |
| |
| public synchronized void disableReportingTask(final ReportingTaskNode taskNode) { |
| if (taskNode.getScheduledState() != ScheduledState.STOPPED) { |
| throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() |
| + " but transition to DISABLED state is allowed only from the STOPPED state"); |
| } |
| |
| taskNode.setScheduledState(ScheduledState.DISABLED); |
| } |
| |
| @Override |
| public boolean isScheduled(final Object scheduled) { |
| final ScheduleState scheduleState = scheduleStates.get(scheduled); |
| return scheduleState == null ? false : scheduleState.isScheduled(); |
| } |
| |
| /** |
| * Returns the ScheduleState that is registered for the given component; if |
| * no ScheduleState current is registered, one is created and registered |
| * atomically, and then that value is returned. |
| * |
| * @param schedulable |
| * schedulable |
| * @return scheduled state |
| */ |
| private ScheduleState getScheduleState(final Object schedulable) { |
| ScheduleState scheduleState = this.scheduleStates.get(schedulable); |
| if (scheduleState == null) { |
| scheduleState = new ScheduleState(); |
| this.scheduleStates.putIfAbsent(schedulable, scheduleState); |
| } |
| return scheduleState; |
| } |
| |
| @Override |
| public void enableControllerService(final ControllerServiceNode service) { |
| service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis); |
| } |
| |
| @Override |
| public void disableControllerService(final ControllerServiceNode service) { |
| service.disable(this.componentLifeCycleThreadPool); |
| } |
| |
| @Override |
| public void disableControllerServices(final List<ControllerServiceNode> services) { |
| if (!requireNonNull(services).isEmpty()) { |
| for (ControllerServiceNode controllerServiceNode : services) { |
| this.disableControllerService(controllerServiceNode); |
| } |
| } |
| } |
| } |