blob: baccdfd400033fa209834dd85c6d2c9056beea49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class StandardControllerServiceProvider implements ControllerServiceProvider {
private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
private final ProcessScheduler processScheduler;
private final BulletinRepository bulletinRepo;
private final FlowManager flowManager;
private final ExtensionManager extensionManager;
private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, final FlowManager flowManager, final ExtensionManager extensionManager) {
this.processScheduler = scheduler;
this.bulletinRepo = bulletinRepo;
this.flowManager = flowManager;
this.extensionManager = extensionManager;
}
@Override
public void onControllerServiceAdded(final ControllerServiceNode serviceNode) {
serviceCache.putIfAbsent(serviceNode.getIdentifier(), serviceNode);
}
@Override
public Set<ComponentNode> disableReferencingServices(final ControllerServiceNode serviceNode) {
// Get a list of all Controller Services that need to be disabled, in the order that they need to be disabled.
final List<ControllerServiceNode> toDisable = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
final Set<ComponentNode> updated = new HashSet<>();
for (final ControllerServiceNode nodeToDisable : toDisable) {
if (nodeToDisable.isActive()) {
nodeToDisable.verifyCanDisable(serviceSet);
updated.add(nodeToDisable);
}
}
Collections.reverse(toDisable);
processScheduler.disableControllerServices(toDisable);
return updated;
}
@Override
public Set<ComponentNode> scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
// find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
// or a service that references this controller service, etc.
final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
final Set<ComponentNode> updated = new HashSet<>();
// verify that we can start all components (that are not disabled) before doing anything
for (final ProcessorNode node : processors) {
if (node.getScheduledState() != ScheduledState.DISABLED) {
node.verifyCanStart();
updated.add(node);
}
}
for (final ReportingTaskNode node : reportingTasks) {
if (node.getScheduledState() != ScheduledState.DISABLED) {
node.verifyCanStart();
updated.add(node);
}
}
// start all of the components that are not disabled
for (final ProcessorNode node : processors) {
if (node.getScheduledState() != ScheduledState.DISABLED) {
node.getProcessGroup().startProcessor(node, true);
updated.add(node);
}
}
for (final ReportingTaskNode node : reportingTasks) {
if (node.getScheduledState() != ScheduledState.DISABLED) {
processScheduler.schedule(node);
updated.add(node);
}
}
return updated;
}
@Override
public Set<ComponentNode> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
// find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
// or a service that references this controller service, etc.
final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
final Set<ComponentNode> updated = new HashSet<>();
// verify that we can stop all components (that are running) before doing anything
for (final ProcessorNode node : processors) {
if (node.getScheduledState() == ScheduledState.RUNNING) {
node.verifyCanStop();
}
}
for (final ReportingTaskNode node : reportingTasks) {
if (node.getScheduledState() == ScheduledState.RUNNING) {
node.verifyCanStop();
}
}
// stop all of the components that are running
for (final ProcessorNode node : processors) {
if (node.getScheduledState() == ScheduledState.RUNNING) {
node.getProcessGroup().stopProcessor(node);
updated.add(node);
}
}
for (final ReportingTaskNode node : reportingTasks) {
if (node.getScheduledState() == ScheduledState.RUNNING) {
processScheduler.unschedule(node);
updated.add(node);
}
}
return updated;
}
@Override
public CompletableFuture<Void> enableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanEnable();
serviceNode.reloadAdditionalResourcesIfNecessary();
return processScheduler.enableControllerService(serviceNode);
}
@Override
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
boolean shouldStart = true;
Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
while (serviceIter.hasNext() && shouldStart) {
ControllerServiceNode controllerServiceNode = serviceIter.next();
List<ControllerServiceNode> requiredServices = controllerServiceNode.getRequiredControllerServices();
for (ControllerServiceNode requiredService : requiredServices) {
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
shouldStart = false;
logger.debug("Will not start {} because required service {} is not active and is not part of the collection of things to start", serviceNodes, requiredService);
}
}
}
if (shouldStart) {
for (ControllerServiceNode controllerServiceNode : serviceNodes) {
try {
if (!controllerServiceNode.isActive()) {
final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
future.get(30, TimeUnit.SECONDS);
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
}
} catch (final ControllerServiceNotValidException csnve) {
logger.warn("Failed to enable service {} because it is not currently valid", controllerServiceNode);
} catch (Exception e) {
logger.error("Failed to enable " + controllerServiceNode, e);
if (this.bulletinRepo != null) {
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
Severity.ERROR.name(), "Could not start " + controllerServiceNode + " due to " + e));
}
}
}
}
}
@Override
public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
final CompletableFuture<Void> future = new CompletableFuture<>();
processScheduler.submitFrameworkTask(() -> {
enableControllerServices(serviceNodes, future);
future.complete(null);
});
return future;
}
private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) {
// validate that we are able to start all of the services.
for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
List<ControllerServiceNode> requiredServices = controllerServiceNode.getRequiredControllerServices();
for (ControllerServiceNode requiredService : requiredServices) {
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
logger.error("Cannot enable {} because it has a dependency on {}, which is not enabled", controllerServiceNode, requiredService);
completableFuture.completeExceptionally(new IllegalStateException("Cannot enable " + controllerServiceNode
+ " because it has a dependency on " + requiredService + ", which is not enabled"));
return;
}
}
}
for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
if (completableFuture.isCancelled()) {
return;
}
try {
if (!controllerServiceNode.isActive()) {
final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
while (true) {
try {
future.get(1, TimeUnit.SECONDS);
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
break;
} catch (final TimeoutException e) {
if (completableFuture.isCancelled()) {
return;
}
} catch (final Exception e) {
logger.warn("Failed to enable service {}", controllerServiceNode, e);
completableFuture.completeExceptionally(e);
if (this.bulletinRepo != null) {
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e));
}
return;
}
}
}
} catch (Exception e) {
logger.error("Failed to enable " + controllerServiceNode, e);
if (this.bulletinRepo != null) {
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
Severity.ERROR.name(), "Could not start " + controllerServiceNode + " due to " + e));
}
}
}
}
@Override
public Future<Void> enableControllerServiceAndDependencies(final ControllerServiceNode serviceNode) {
final ControllerServiceState currentState = serviceNode.getState();
if (currentState == ControllerServiceState.ENABLED) {
logger.debug("Enabling of Controller Service {} triggered but service already enabled", serviceNode);
return CompletableFuture.completedFuture(null);
}
final List<ControllerServiceNode> dependentServices = serviceNode.getRequiredControllerServices();
for (final ControllerServiceNode depNode : dependentServices) {
if (!depNode.isActive()) {
logger.debug("Before enabling {}, will enable dependent Controller Service {}", serviceNode, depNode);
enableControllerServiceAndDependencies(depNode);
}
}
if (logger.isDebugEnabled()) {
logger.debug("All dependent services for {} have now begun enabling. Will wait for them to complete", serviceNode);
}
for (final ControllerServiceNode dependentService : dependentServices) {
try {
final boolean enabled = dependentService.awaitEnabled(30, TimeUnit.SECONDS);
if (enabled) {
logger.debug("Successfully enabled dependent service {}; service state = {}", dependentService, dependentService.getState());
} else {
logger.debug("After 30 seconds, {} is still not enabled. Will continue attempting to enable additional Controller Services", dependentService);
}
} catch (final Exception e) {
logger.error("Failed to enable service {}, so may be unable to enable {}", dependentService, serviceNode, e);
// Nothing we can really do. Will attempt to enable this service anyway.
}
}
logger.debug("All dependent services have been enabled for {}; will now start service itself", serviceNode);
return this.enableControllerService(serviceNode);
}
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>();
for (final ControllerServiceNode node : serviceNodeMap.values()) {
final List<ControllerServiceNode> branch = new ArrayList<>();
determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<>());
orderedNodeLists.add(branch);
}
return orderedNodeLists;
}
private static void determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode,
final List<ControllerServiceNode> orderedNodes, final Set<ControllerServiceNode> visited) {
if (visited.contains(contextNode)) {
return;
}
for (final Map.Entry<PropertyDescriptor, String> entry : contextNode.getEffectivePropertyValues().entrySet()) {
if (entry.getKey().getControllerServiceDefinition() != null) {
final String referencedServiceId = entry.getValue();
if (referencedServiceId != null) {
final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId);
if (!orderedNodes.contains(referencedNode)) {
visited.add(contextNode);
determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited);
}
}
}
}
if (!orderedNodes.contains(contextNode)) {
orderedNodes.add(contextNode);
}
}
@Override
public CompletableFuture<Void> disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
return processScheduler.disableControllerService(serviceNode);
}
@Override
public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
final CompletableFuture<Void> future = new CompletableFuture<>();
processScheduler.submitFrameworkTask(() -> {
disableControllerServices(serviceNodes, future);
future.complete(null);
});
return future;
}
private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) {
final Set<ControllerServiceNode> serviceNodeSet = new HashSet<>(serviceNodes);
// Verify that for each Controller Service given, any service that references it is either disabled or is also in the given collection
for (final ControllerServiceNode serviceNode : serviceNodes) {
final List<ControllerServiceNode> references = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
for (final ControllerServiceNode reference : references) {
if (reference.isActive()) {
try {
reference.verifyCanDisable(serviceNodeSet);
} catch (final Exception e) {
future.completeExceptionally(e);
}
}
}
}
for (final ControllerServiceNode serviceNode : serviceNodes) {
if (serviceNode.isActive()) {
disableReferencingServices(serviceNode);
final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode);
while (true) {
try {
serviceFuture.get(1, TimeUnit.SECONDS);
break;
} catch (final TimeoutException e) {
if (future.isCancelled()) {
return;
}
continue;
} catch (final Exception e) {
logger.error("Failed to disable {}", serviceNode, e);
future.completeExceptionally(e);
}
}
}
}
}
@Override
public ControllerService getControllerService(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? null : node.getProxiedControllerService();
}
private ProcessGroup getRootGroup() {
return flowManager.getRootGroup();
}
@Override
public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) {
// Find the Process Group that owns the component.
ProcessGroup groupOfInterest;
final ProcessorNode procNode = flowManager.getProcessorNode(componentId);
if (procNode == null) {
final ControllerServiceNode serviceNode = getControllerServiceNode(componentId);
if (serviceNode == null) {
final ReportingTaskNode taskNode = flowManager.getReportingTaskNode(componentId);
if (taskNode == null) {
throw new IllegalStateException("Could not find any Processor, Reporting Task, or Controller Service with identifier " + componentId);
}
// we have confirmed that the component is a reporting task. We can only reference Controller Services
// that are scoped at the FlowController level in this case.
final ControllerServiceNode rootServiceNode = flowManager.getRootControllerService(serviceIdentifier);
return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
} else {
groupOfInterest = serviceNode.getProcessGroup();
}
} else {
groupOfInterest = procNode.getProcessGroup();
}
if (groupOfInterest == null) {
final ControllerServiceNode rootServiceNode = flowManager.getRootControllerService(serviceIdentifier);
return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
}
final Set<ControllerServiceNode> servicesForGroup = groupOfInterest.getControllerServices(true);
for (final ControllerServiceNode serviceNode : servicesForGroup) {
if (serviceIdentifier.equals(serviceNode.getIdentifier())) {
return serviceNode.getProxiedControllerService();
}
}
return null;
}
@Override
public boolean isControllerServiceEnabled(final ControllerService service) {
return isControllerServiceEnabled(service.getIdentifier());
}
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node != null && ControllerServiceState.ENABLED == node.getState();
}
@Override
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node != null && ControllerServiceState.ENABLING == node.getState();
}
@Override
public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
final ControllerServiceNode rootServiceNode = flowManager.getRootControllerService(serviceIdentifier);
if (rootServiceNode != null) {
return rootServiceNode;
}
return serviceCache.get(serviceIdentifier);
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, final String groupId) {
final Set<ControllerServiceNode> serviceNodes;
if (groupId == null) {
serviceNodes = flowManager.getRootControllerServices();
} else {
ProcessGroup group = getRootGroup();
if (!FlowManager.ROOT_GROUP_ID_ALIAS.equals(groupId) && !group.getIdentifier().equals(groupId)) {
group = group.findProcessGroup(groupId);
}
if (group == null) {
return Collections.emptySet();
}
serviceNodes = group.getControllerServices(true);
}
return serviceNodes.stream()
.filter(service -> serviceType.isAssignableFrom(service.getProxiedControllerService().getClass()))
.map(ControllerServiceNode::getIdentifier)
.collect(Collectors.toSet());
}
@Override
public String getControllerServiceName(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? null : node.getName();
}
@Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
requireNonNull(serviceNode);
serviceCache.remove(serviceNode.getIdentifier());
final ProcessGroup group = serviceNode.getProcessGroup();
if (group == null) {
flowManager.removeRootControllerService(serviceNode);
return;
}
group.removeControllerService(serviceNode);
LogRepositoryFactory.removeRepository(serviceNode.getIdentifier());
extensionManager.removeInstanceClassLoader(serviceNode.getIdentifier());
serviceCache.remove(serviceNode.getIdentifier());
}
@Override
public Collection<ControllerServiceNode> getNonRootControllerServices() {
return serviceCache.values().stream()
.filter(serviceNode -> serviceNode.getProcessGroup() != null)
.collect(Collectors.toSet());
}
@Override
public Set<ComponentNode> enableReferencingServices(final ControllerServiceNode serviceNode) {
final List<ControllerServiceNode> recursiveReferences = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
logger.debug("Enabling the following Referencing Services for {}: {}", serviceNode, recursiveReferences);
return enableReferencingServices(serviceNode, recursiveReferences);
}
private Set<ComponentNode> enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
if (!serviceNode.isActive()) {
serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
}
final Set<ComponentNode> updated = new HashSet<>();
final Set<ControllerServiceNode> ifEnabled = new HashSet<>();
for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
if (!nodeToEnable.isActive()) {
nodeToEnable.verifyCanEnable(ifEnabled);
ifEnabled.add(nodeToEnable);
}
}
for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
if (!nodeToEnable.isActive()) {
logger.debug("Enabling {} because it references {}", nodeToEnable, serviceNode);
enableControllerService(nodeToEnable);
updated.add(nodeToEnable);
}
}
return updated;
}
@Override
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
for (final ControllerServiceNode referencingService : referencingServices) {
referencingService.verifyCanEnable(referencingServiceSet);
}
}
@Override
public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
final List<ReportingTaskNode> referencingReportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class);
final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
for (final ReportingTaskNode taskNode : referencingReportingTasks) {
if (taskNode.getScheduledState() != ScheduledState.DISABLED) {
taskNode.verifyCanStart(referencingServiceSet);
}
}
for (final ProcessorNode procNode : referencingProcessors) {
if (procNode.getScheduledState() != ScheduledState.DISABLED) {
procNode.verifyCanStart(referencingServiceSet);
}
}
}
@Override
public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
// Get a list of all Controller Services that need to be disabled, in the order that they need to be disabled.
final List<ControllerServiceNode> toDisable = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
for (final ControllerServiceNode nodeToDisable : toDisable) {
if (nodeToDisable.isActive()) {
nodeToDisable.verifyCanDisable(serviceSet);
}
}
}
@Override
public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
// we can always stop referencing components
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
throw new UnsupportedOperationException("Cannot obtain Controller Service Identifiers for service type " + serviceType + " without providing a Process Group Identifier");
}
@Override
public ExtensionManager getExtensionManager() {
return extensionManager;
}
}