blob: 07b2081fd5eb3d78681303beaa3109ff5c254850 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.engine.StatelessSchedulingAgent;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* A ProcessScheduler that handles the lifecycle management of components but does not
* schedule the triggering of components.
*/
public class StatelessProcessScheduler implements ProcessScheduler {
private static final Logger logger = LoggerFactory.getLogger(StatelessProcessScheduler.class);
private static final int ADMINISTRATIVE_YIELD_MILLIS = 1000;
private static final int PROCESSOR_START_TIMEOUT_MILLIS = 10_000;
private final SchedulingAgent schedulingAgent;
private final ExtensionManager extensionManager;
private FlowEngine componentLifeCycleThreadPool;
private ScheduledExecutorService componentMonitoringThreadPool;
private ProcessContextFactory processContextFactory;
public StatelessProcessScheduler(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
schedulingAgent = new StatelessSchedulingAgent(extensionManager);
}
@Override
public void shutdown() {
if (componentLifeCycleThreadPool != null) {
componentLifeCycleThreadPool.shutdown();
}
if (componentMonitoringThreadPool != null) {
componentMonitoringThreadPool.shutdown();
}
}
@Override
public void shutdownControllerService(final ControllerServiceNode serviceNode, final ControllerServiceProvider controllerServiceProvider) {
final Class<?> serviceImplClass = serviceNode.getControllerServiceImplementation().getClass();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, serviceImplClass, serviceNode.getIdentifier())) {
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, VariableRegistry.EMPTY_REGISTRY);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
}
}
@Override
public void shutdownReportingTask(final ReportingTaskNode taskNode) {
final ConfigurationContext configContext = taskNode.getConfigurationContext();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext);
}
}
public void initialize(final ProcessContextFactory processContextFactory, final DataflowDefinition<?> dataflowDefinition) {
this.processContextFactory = processContextFactory;
final String threadNameSuffix = dataflowDefinition.getFlowName() == null ? "" : " for dataflow " + dataflowDefinition.getFlowName();
componentLifeCycleThreadPool = new FlowEngine(8, "Component Lifecycle" + threadNameSuffix, true);
componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processor Lifecycle" + threadNameSuffix, true);
}
@Override
public Future<Void> startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
@Override
public void trigger() {
// Initialization / scheduling has completed.
future.complete(null);
}
@Override
public Future<?> scheduleTask(final Callable<?> task) {
return componentLifeCycleThreadPool.submit(task);
}
@Override
public void onTaskComplete() {
}
};
logger.info("Starting {}", procNode);
final Supplier<ProcessContext> processContextSupplier = () -> processContextFactory.createProcessContext(procNode);
procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, PROCESSOR_START_TIMEOUT_MILLIS, processContextSupplier, callback, failIfStopping);
return future;
}
@Override
public Future<Void> runProcessorOnce(ProcessorNode procNode, final Callable<Future<Void>> stopCallback) {
throw new UnsupportedOperationException();
}
@Override
public Future<Void> stopProcessor(final ProcessorNode procNode) {
logger.info("Stopping {}", procNode);
final ProcessContext processContext = processContextFactory.createProcessContext(procNode);
final LifecycleState lifecycleState = new LifecycleState();
lifecycleState.setScheduled(false);
return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, schedulingAgent, lifecycleState);
}
@Override
public void terminateProcessor(final ProcessorNode procNode) {
}
@Override
public void onProcessorRemoved(final ProcessorNode procNode) {
}
@Override
public void onPortRemoved(final Port port) {
}
@Override
public void onFunnelRemoved(final Funnel funnel) {
}
@Override
public void onReportingTaskRemoved(final ReportingTaskNode reportingTask) {
}
@Override
public void startPort(final Port port) {
if (!port.isValid()) {
throw new IllegalStateException("Port " + port.getIdentifier() + " is not in a valid state");
}
port.onSchedulingStart();
}
@Override
public void stopPort(final Port port) {
}
@Override
public void startFunnel(final Funnel funnel) {
}
@Override
public void stopFunnel(final Funnel funnel) {
}
@Override
public void enableFunnel(final Funnel funnel) {
}
@Override
public void enablePort(final Port port) {
}
@Override
public void enableProcessor(final ProcessorNode procNode) {
procNode.enable();
}
@Override
public void disableFunnel(final Funnel funnel) {
}
@Override
public void disablePort(final Port port) {
}
@Override
public void disableProcessor(final ProcessorNode procNode) {
procNode.disable();
}
@Override
public int getActiveThreadCount(final Object scheduled) {
return 0;
}
@Override
public boolean isScheduled(final Object scheduled) {
return false;
}
@Override
public void registerEvent(final Connectable worker) {
}
@Override
public void setMaxThreadCount(final SchedulingStrategy strategy, final int maxThreadCount) {
}
@Override
public void yield(final ProcessorNode procNode) {
}
@Override
public void unschedule(final ReportingTaskNode taskNode) {
}
@Override
public void schedule(final ReportingTaskNode taskNode) {
final Runnable scheduleTask = new Runnable() {
@Override
public void run() {
try {
attemptSchedule(taskNode);
schedulingAgent.schedule(taskNode, new LifecycleState());
logger.info("Successfully scheduled {} to run every {}", taskNode, taskNode.getSchedulingPeriod());
} catch (final Exception e) {
logger.error("Could not schedule {} to run. Will try again in 30 seconds.", taskNode, e);
componentLifeCycleThreadPool.schedule(this, 30, TimeUnit.SECONDS);
}
}
};
componentLifeCycleThreadPool.submit(scheduleTask);
}
private void attemptSchedule(final ReportingTaskNode taskNode) throws InvocationTargetException, IllegalAccessException {
final ValidationStatus validation = taskNode.performValidation();
if (validation != ValidationStatus.VALID) {
throw new IllegalStateException("Cannot start Reporting Task " + taskNode + " because it is not valid: " + taskNode.getValidationErrors());
}
final ReportingTask reportingTask = taskNode.getReportingTask();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, reportingTask.getClass(), taskNode.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
}
}
@Override
public CompletableFuture<Void> enableControllerService(final ControllerServiceNode service) {
logger.info("Enabling {}", service);
return service.enable(componentLifeCycleThreadPool, ADMINISTRATIVE_YIELD_MILLIS);
}
@Override
public CompletableFuture<Void> disableControllerService(final ControllerServiceNode service) {
logger.info("Disabling {}", service);
return service.disable(componentLifeCycleThreadPool);
}
@Override
public CompletableFuture<Void> disableControllerServices(final List<ControllerServiceNode> services) {
if (services == null || services.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> future = null;
for (ControllerServiceNode controllerServiceNode : services) {
final CompletableFuture<Void> serviceFuture = this.disableControllerService(controllerServiceNode);
if (future == null) {
future = serviceFuture;
} else {
future = CompletableFuture.allOf(future, serviceFuture);
}
}
return future;
}
@Override
public Future<?> submitFrameworkTask(final Runnable task) {
return null;
}
}