blob: 906526400f3cd4e771d6c51fcc2e4c29e0926515 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardControllerServiceProvider implements ControllerServiceProvider {
// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
// Scheduler that performs the actual enabling/disabling of services and scheduling of reporting tasks.
private final ProcessScheduler processScheduler;
// Methods that the service proxy lets through even when the service is not ENABLED.
private static final Set<Method> validDisabledMethods;
// Receives an ERROR bulletin when a service fails to enable; null-checked before use.
private final BulletinRepository bulletinRepo;
// Source of the per-component StateManager handed to each new service.
private final StateManagerProvider stateManagerProvider;
// Passed through to validation context factories and service nodes.
private final VariableRegistry variableRegistry;
// Used to look up process groups, controller-level services and reporting tasks.
private final FlowController flowController;

static {
    // Everything declared on ControllerService itself, plus the java.lang.Object methods,
    // is safe to call while a service is disabled.
    final Set<Method> permitted = new HashSet<>();
    Collections.addAll(permitted, ControllerService.class.getMethods());
    Collections.addAll(permitted, Object.class.getMethods());
    validDisabledMethods = Collections.unmodifiableSet(permitted);
}
/**
 * Creates a provider backed by the given framework collaborators. The arguments are stored as-is.
 *
 * @param flowController used to resolve process groups, controller-level services and reporting tasks
 * @param scheduler used to enable/disable services and to schedule/unschedule reporting tasks
 * @param bulletinRepo target for error bulletins; it is null-checked before use
 * @param stateManagerProvider supplies the state manager given to each created service
 * @param variableRegistry handed to validation context factories and service nodes
 */
public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo,
        final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry) {
    this.processScheduler = scheduler;
    this.flowController = flowController;
    this.variableRegistry = variableRegistry;
    this.stateManagerProvider = stateManagerProvider;
    this.bulletinRepo = bulletinRepo;
}
/**
 * Collects the interfaces declared by the given class and by each of its superclasses.
 *
 * @param cls the class to inspect
 * @return the collected interfaces as an array, in the order they were discovered
 */
private Class<?>[] getInterfaces(final Class<?> cls) {
    final List<Class<?>> collected = new ArrayList<>();
    populateInterfaces(cls, collected);
    final Class<?>[] asArray = new Class<?>[collected.size()];
    return collected.toArray(asArray);
}
/**
 * Appends every interface directly declared by {@code cls}, and then by each of its superclasses,
 * to the supplied list.
 * <p>
 * An interface that is already present in the list is not added again. The resulting list is used
 * to build a {@link java.lang.reflect.Proxy}, which rejects an interface array containing the same
 * {@code Class} object more than once; that situation arises when both a class and one of its
 * superclasses declare the same interface.
 *
 * @param cls the class whose hierarchy is walked
 * @param interfacesDefinedThusFar accumulator that receives the discovered interfaces
 */
private void populateInterfaces(final Class<?> cls, final List<Class<?>> interfacesDefinedThusFar) {
    // Class.getInterfaces() returns an empty array (never null) when nothing is declared.
    for (final Class<?> declared : cls.getInterfaces()) {
        if (!interfacesDefinedThusFar.contains(declared)) {
            interfacesDefinedThusFar.add(declared);
        }
    }

    final Class<?> superClass = cls.getSuperclass();
    if (superClass != null) {
        populateInterfaces(superClass, interfacesDefinedThusFar);
    }
}
/**
 * Looks up the state manager for a component by delegating to the configured StateManagerProvider.
 *
 * @param componentId identifier of the component
 * @return whatever the provider returns for that identifier
 */
private StateManager getStateManager(final String componentId) {
return stateManagerProvider.getStateManager(componentId);
}
/**
 * Creates a Controller Service of the given type, wraps it in a dynamic proxy and returns the
 * node that represents it.
 * <p>
 * If the class cannot be loaded, a "ghost" node is returned instead (see
 * {@code createGhostControllerService}). Any other failure is wrapped in a
 * {@link ControllerServiceInstantiationException}. The calling thread's context class loader is
 * always restored before this method returns.
 *
 * @param type fully qualified class name of the service implementation
 * @param id identifier to assign to the service
 * @param firstTimeAdded if true, methods annotated with {@link OnAdded} are invoked on the new service
 * @return the node for the newly created (or ghost) service
 * @throws NullPointerException if {@code type} or {@code id} is null
 * @throws ControllerServiceInstantiationException if the service cannot be instantiated or initialized
 */
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
    requireNonNull(type, "type");
    requireNonNull(id, "id");

    final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        final ClassLoader cl = ExtensionManager.getClassLoader(type);
        final Class<?> rawClass;
        try {
            if (cl == null) {
                rawClass = Class.forName(type);
            } else {
                Thread.currentThread().setContextClassLoader(cl);
                rawClass = Class.forName(type, false, cl);
            }
        } catch (final Exception e) {
            logger.error("Could not create Controller Service of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e);
            // Restore the loader before building the ghost so that it is created under the caller's loader.
            Thread.currentThread().setContextClassLoader(currentContextClassLoader);
            return createGhostControllerService(type, id);
        }

        final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
        final ControllerService originalService = controllerServiceClass.newInstance();

        // The handler needs the node to check its state, but the node needs the proxy in order
        // to be constructed; the holder breaks that cycle and is filled in below.
        final AtomicReference<ControllerServiceNode> serviceNodeHolder = new AtomicReference<>(null);
        final InvocationHandler invocationHandler = new InvocationHandler() {
            @Override
            public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
                final String methodName = method.getName();
                if ("initialize".equals(methodName) || "onPropertyModified".equals(methodName)) {
                    throw new UnsupportedOperationException(method + " may only be invoked by the NiFi framework");
                }

                final ControllerServiceNode node = serviceNodeHolder.get();
                final ControllerServiceState state = node.getState();
                final boolean disabled = state != ControllerServiceState.ENABLED; // only allow method call if service state is ENABLED.
                if (disabled && !validDisabledMethods.contains(method)) {
                    // Use nar class loader here because we are implicitly calling toString() on the original implementation.
                    // NOTE(review): the catch clause below also intercepts the exception thrown inside
                    // the try block, so callers always observe the message that is built from 'id'.
                    try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
                        throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService.getIdentifier()
                            + " because the Controller Service is disabled");
                    } catch (final Throwable e) {
                        throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service with identifier " + id + " because the Controller Service is disabled");
                    }
                }

                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
                    return method.invoke(originalService, args);
                } catch (final InvocationTargetException e) {
                    // If the ControllerService throws an Exception, it'll be wrapped in an InvocationTargetException. We want
                    // to instead re-throw what the ControllerService threw, so we pull it out of the InvocationTargetException.
                    throw e.getCause();
                }
            }
        };

        final ControllerService proxiedService;
        if (cl == null) {
            proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), getInterfaces(controllerServiceClass), invocationHandler);
        } else {
            proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
        }
        logger.info("Created Controller Service of type {} with identifier {}", type, id);

        final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService);
        originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this, getStateManager(id)));

        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this, variableRegistry);
        final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this, variableRegistry);
        serviceNodeHolder.set(serviceNode);
        serviceNode.setName(rawClass.getSimpleName());

        if (firstTimeAdded) {
            try (final NarCloseable x = NarCloseable.withNarLoader()) {
                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
            } catch (final Exception e) {
                throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e);
            }
        }

        return serviceNode;
    } catch (final Throwable t) {
        throw new ControllerServiceInstantiationException(t);
    } finally {
        // Restore unconditionally: a null context class loader is a legal value, and skipping the
        // restore in that case would leave the extension's class loader installed on this thread.
        Thread.currentThread().setContextClassLoader(currentContextClassLoader);
    }
}
/**
 * Builds a placeholder ("ghost") node for a service whose implementation class could not be loaded.
 * <p>
 * The ghost is a proxy over {@link ControllerService} that always reports itself as invalid,
 * answers a handful of descriptive methods, and throws {@link IllegalStateException} for
 * everything else.
 *
 * @param type the class name that could not be resolved
 * @param id identifier of the service
 * @return a node wrapping the ghost proxy
 */
private ControllerServiceNode createGhostControllerService(final String type, final String id) {
    final InvocationHandler ghostHandler = new InvocationHandler() {
        @Override
        public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
            switch (method.getName()) {
                case "validate": {
                    // Always invalid: the implementation is missing.
                    final ValidationResult missingType = new ValidationResult.Builder()
                        .input("Any Property")
                        .subject("Missing Controller Service")
                        .valid(false)
                        .explanation("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found")
                        .build();
                    return Collections.singleton(missingType);
                }
                case "getPropertyDescriptor": {
                    // Every requested property is described as sensitive and required.
                    final String requestedName = (String) args[0];
                    return new PropertyDescriptor.Builder()
                        .name(requestedName)
                        .description(requestedName)
                        .sensitive(true)
                        .required(true)
                        .build();
                }
                case "getPropertyDescriptors":
                    return Collections.emptyList();
                case "onPropertyModified":
                    return null;
                case "getIdentifier":
                    return id;
                case "toString":
                    return "GhostControllerService[id=" + id + ", type=" + type + "]";
                case "hashCode":
                    return 91 * type.hashCode() + 41 * id.hashCode();
                case "equals":
                    return proxy == args[0];
                default:
                    throw new IllegalStateException("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found");
            }
        }
    };

    final ClassLoader frameworkLoader = getClass().getClassLoader();
    final ControllerService ghost = (ControllerService) Proxy.newProxyInstance(frameworkLoader, new Class<?>[] {ControllerService.class}, ghostHandler);

    final String simpleClassName;
    if (type.contains(".")) {
        simpleClassName = StringUtils.substringAfterLast(type, ".");
    } else {
        simpleClassName = type;
    }
    final String componentType = "(Missing) " + simpleClassName;

    final ValidationContextFactory contextFactory = new StandardValidationContextFactory(this, variableRegistry);
    return new StandardControllerServiceNode(ghost, ghost, id, contextFactory, this, componentType, type, variableRegistry);
}
/**
 * Disables every Controller Service that references the given service, directly or transitively.
 * <p>
 * All active referencing services are verified first; only then is the list (reversed from its
 * activation order) handed to the scheduler.
 *
 * @param serviceNode the service whose referencing services are to be disabled
 * @return the referencing services that were active when verified
 */
@Override
public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) {
    // findRecursiveReferences yields activation order; it is reversed below for disabling.
    final List<ControllerServiceNode> referencing = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
    final Set<ControllerServiceNode> referencingSet = new HashSet<>(referencing);

    final Set<ConfiguredComponent> affected = new HashSet<>();
    for (final ControllerServiceNode candidate : referencing) {
        if (!candidate.isActive()) {
            continue;
        }
        candidate.verifyCanDisable(referencingSet);
        affected.add(candidate);
    }

    Collections.reverse(referencing);
    processScheduler.disableControllerServices(referencing);
    return affected;
}
/**
 * Starts every processor and reporting task that references the given service, directly or
 * through other services, skipping components whose scheduled state is DISABLED.
 * <p>
 * Every eligible component is verified before any of them is started.
 *
 * @param serviceNode the referenced service
 * @return the components that were eligible for starting
 */
@Override
public Set<ConfiguredComponent> scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
    final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class);
    final List<ReportingTaskNode> referencingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
    final Set<ConfiguredComponent> scheduled = new HashSet<>();

    // Pass 1: make sure everything that is not disabled can actually be started.
    for (final ProcessorNode processor : referencingProcessors) {
        if (ScheduledState.DISABLED == processor.getScheduledState()) {
            continue;
        }
        processor.verifyCanStart();
        scheduled.add(processor);
    }
    for (final ReportingTaskNode task : referencingTasks) {
        if (ScheduledState.DISABLED == task.getScheduledState()) {
            continue;
        }
        task.verifyCanStart();
        scheduled.add(task);
    }

    // Pass 2: start them.
    for (final ProcessorNode processor : referencingProcessors) {
        if (ScheduledState.DISABLED == processor.getScheduledState()) {
            continue;
        }
        processor.getProcessGroup().startProcessor(processor);
        scheduled.add(processor);
    }
    for (final ReportingTaskNode task : referencingTasks) {
        if (ScheduledState.DISABLED == task.getScheduledState()) {
            continue;
        }
        processScheduler.schedule(task);
        scheduled.add(task);
    }

    return scheduled;
}
/**
 * Stops every RUNNING processor and reporting task that references the given service, directly
 * or through other services.
 * <p>
 * Every running component is verified before any of them is stopped.
 *
 * @param serviceNode the referenced service
 * @return the components that were stopped
 */
@Override
public Set<ConfiguredComponent> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
    final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class);
    final List<ReportingTaskNode> referencingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);

    // Pass 1: verification only.
    for (final ProcessorNode processor : referencingProcessors) {
        if (ScheduledState.RUNNING == processor.getScheduledState()) {
            processor.verifyCanStop();
        }
    }
    for (final ReportingTaskNode task : referencingTasks) {
        if (ScheduledState.RUNNING == task.getScheduledState()) {
            task.verifyCanStop();
        }
    }

    // Pass 2: stop whatever is still running.
    final Set<ConfiguredComponent> stopped = new HashSet<>();
    for (final ProcessorNode processor : referencingProcessors) {
        if (ScheduledState.RUNNING != processor.getScheduledState()) {
            continue;
        }
        processor.getProcessGroup().stopProcessor(processor);
        stopped.add(processor);
    }
    for (final ReportingTaskNode task : referencingTasks) {
        if (ScheduledState.RUNNING != task.getScheduledState()) {
            continue;
        }
        processScheduler.unschedule(task);
        stopped.add(task);
    }

    return stopped;
}
/**
 * Enables a single Controller Service: the node is asked to verify that it can be enabled, and
 * the request is then passed to the process scheduler.
 *
 * @param serviceNode the service to enable
 */
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanEnable();
processScheduler.enableControllerService(serviceNode);
}
/**
 * Enables the given services, enabling each service's inactive dependencies before the service itself.
 * <p>
 * Nothing is enabled unless every service required by a member of the collection is either
 * already active or is itself part of the collection. A failure to enable one service is logged
 * and reported as a bulletin (when a bulletin repository is available), and the remaining
 * services are still attempted.
 *
 * @param serviceNodes the services to enable
 */
@Override
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
    if (!allRequiredServicesAvailable(serviceNodes)) {
        logger.debug("Not enabling {} because at least one required Controller Service is neither active nor part of the request", serviceNodes);
        return;
    }

    for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
        try {
            if (!controllerServiceNode.isActive()) {
                this.enableControllerServiceDependenciesFirst(controllerServiceNode);
            }
        } catch (final Exception e) {
            // Pass the exception as the last argument so the stack trace is retained in the log.
            logger.error("Failed to enable " + controllerServiceNode + " due to " + e, e);
            if (this.bulletinRepo != null) {
                this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
                    Severity.ERROR.name(), "Could not start " + controllerServiceNode + " due to " + e));
            }
        }
    }
}

/**
 * Tells whether every service required by the given nodes is already active or contained in the
 * same collection. Uses the ControllerServiceNode interface only, so it works for any node implementation.
 */
private boolean allRequiredServicesAvailable(final Collection<ControllerServiceNode> serviceNodes) {
    for (final ControllerServiceNode candidate : serviceNodes) {
        for (final ControllerServiceNode required : candidate.getRequiredControllerServices()) {
            if (!required.isActive() && !serviceNodes.contains(required)) {
                return false;
            }
        }
    }
    return true;
}
/**
 * Enables the given service after first enabling, depth-first, every inactive service it requires.
 *
 * @param serviceNode the service to enable
 */
private void enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) {
    enableControllerServiceDependenciesFirst(serviceNode, new HashSet<ControllerServiceNode>());
}

/**
 * Recursive worker for the single-argument overload. The {@code visited} set records every node
 * this traversal has already entered, so that services which require each other (directly or
 * through a longer chain) cannot cause unbounded recursion.
 */
private void enableControllerServiceDependenciesFirst(final ControllerServiceNode serviceNode, final Set<ControllerServiceNode> visited) {
    if (!visited.add(serviceNode)) {
        // Already handled, or currently being handled further up the call chain.
        return;
    }

    for (final ControllerServiceNode depNode : serviceNode.getRequiredControllerServices()) {
        if (!depNode.isActive()) {
            enableControllerServiceDependenciesFirst(depNode, visited);
        }
    }

    logger.debug("Enabling {}", serviceNode);
    this.enableControllerService(serviceNode);
}
/**
 * Computes, for every service in the map, the list of services that must be enabled to bring
 * that service up, with referenced services placed before the services that reference them.
 *
 * @param serviceNodeMap all known services keyed by identifier
 * @return one ordered list ("branch") per entry of the map
 */
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
    final Collection<ControllerServiceNode> allNodes = serviceNodeMap.values();
    final List<List<ControllerServiceNode>> branches = new ArrayList<>(allNodes.size());

    for (final ControllerServiceNode start : allNodes) {
        // Each branch gets its own result list and its own visited set.
        final List<ControllerServiceNode> ordered = new ArrayList<>();
        final Set<ControllerServiceNode> seen = new HashSet<ControllerServiceNode>();
        determineEnablingOrder(serviceNodeMap, start, ordered, seen);
        branches.add(ordered);
    }

    return branches;
}
/**
 * Appends {@code contextNode} to {@code orderedNodes} after first appending, recursively, every
 * service it references through a property that identifies a Controller Service.
 * <p>
 * A property whose value names a service that is not present in {@code serviceNodeMap} is
 * skipped; there is nothing to order for it, and recursing into a missing node would fail.
 *
 * @param serviceNodeMap all known services keyed by identifier
 * @param contextNode the node currently being placed
 * @param orderedNodes accumulator for the resulting order; never receives duplicates
 * @param visited nodes whose references are already being followed, used to stop on cycles
 */
private static void determineEnablingOrder(
        final Map<String, ControllerServiceNode> serviceNodeMap,
        final ControllerServiceNode contextNode,
        final List<ControllerServiceNode> orderedNodes,
        final Set<ControllerServiceNode> visited) {
    if (visited.contains(contextNode)) {
        return;
    }

    for (final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet()) {
        if (entry.getKey().getControllerServiceDefinition() == null) {
            continue; // not a service reference
        }

        final String referencedServiceId = entry.getValue();
        if (referencedServiceId == null) {
            continue; // property not set
        }

        final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId);
        if (referencedNode == null) {
            continue; // dangling reference: the identified service is not in the map
        }

        if (!orderedNodes.contains(referencedNode)) {
            visited.add(contextNode);
            determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited);
        }
    }

    if (!orderedNodes.contains(contextNode)) {
        orderedNodes.add(contextNode);
    }
}
/**
 * Disables a single Controller Service: the node is asked to verify that it can be disabled, and
 * the request is then passed to the process scheduler.
 *
 * @param serviceNode the service to disable
 */
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
processScheduler.disableControllerService(serviceNode);
}
/**
 * Returns the proxied Controller Service with the given identifier.
 *
 * @param serviceIdentifier identifier of the service
 * @return the proxied service, or null if no node with that identifier is found
 */
@Override
public ControllerService getControllerService(final String serviceIdentifier) {
    final ControllerServiceNode found = getControllerServiceNode(serviceIdentifier);
    if (found == null) {
        return null;
    }
    return found.getProxiedControllerService();
}
/**
 * Resolves the root process group through the flow controller.
 *
 * @return the group registered under the flow controller's root group id
 */
private ProcessGroup getRootGroup() {
    final String rootGroupId = flowController.getRootGroupId();
    return flowController.getGroup(rootGroupId);
}
/**
 * Resolves a Controller Service as seen from a particular component.
 * <p>
 * The component is looked up as a processor, then as a Controller Service, then as a reporting
 * task. When the component belongs to a process group, the service is searched among the
 * services returned by {@code getControllerServices(true)} for that group; otherwise (reporting
 * task, or a component without a group) only controller-level services are consulted.
 *
 * @param serviceIdentifier identifier of the wanted service
 * @param componentId identifier of the component asking for it
 * @return the proxied service, or null if it is not found in the applicable scope
 * @throws IllegalStateException if no processor, service or reporting task has the given component id
 */
@Override
public ControllerService getControllerServiceForComponent(final String serviceIdentifier, final String componentId) {
    // Work out which process group (if any) owns the component.
    final ProcessGroup owningGroup;
    final ProcessorNode processor = getRootGroup().findProcessor(componentId);
    if (processor != null) {
        owningGroup = processor.getProcessGroup();
    } else {
        final ControllerServiceNode referencingService = getControllerServiceNode(componentId);
        if (referencingService != null) {
            owningGroup = referencingService.getProcessGroup();
        } else {
            if (flowController.getReportingTaskNode(componentId) == null) {
                throw new IllegalStateException("Could not find any Processor, Reporting Task, or Controller Service with identifier " + componentId);
            }
            // Reporting tasks have no process group; they only see controller-level services.
            owningGroup = null;
        }
    }

    if (owningGroup == null) {
        final ControllerServiceNode controllerLevel = flowController.getRootControllerService(serviceIdentifier);
        if (controllerLevel == null) {
            return null;
        }
        return controllerLevel.getProxiedControllerService();
    }

    for (final ControllerServiceNode candidate : owningGroup.getControllerServices(true)) {
        if (serviceIdentifier.equals(candidate.getIdentifier())) {
            return candidate.getProxiedControllerService();
        }
    }
    return null;
}
/**
 * Convenience overload that checks the enabled state using the service's own identifier.
 *
 * @param service the service to check
 * @return the result of the identifier-based check
 */
@Override
public boolean isControllerServiceEnabled(final ControllerService service) {
return isControllerServiceEnabled(service.getIdentifier());
}
/**
 * Tells whether the identified service exists and is in the ENABLED state.
 *
 * @param serviceIdentifier identifier of the service
 * @return true only if a node is found and its state is ENABLED
 */
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
    final ControllerServiceNode found = getControllerServiceNode(serviceIdentifier);
    return found != null && ControllerServiceState.ENABLED == found.getState();
}
/**
 * Tells whether the identified service exists and is in the ENABLING state.
 *
 * @param serviceIdentifier identifier of the service
 * @return true only if a node is found and its state is ENABLING
 */
@Override
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
    final ControllerServiceNode found = getControllerServiceNode(serviceIdentifier);
    return found != null && ControllerServiceState.ENABLING == found.getState();
}
/**
 * Finds the node for a service, looking at controller-level services first and then searching
 * from the root process group.
 *
 * @param serviceIdentifier identifier of the service
 * @return the controller-level node if one exists, otherwise the result of the root group search
 */
@Override
public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
    final ControllerServiceNode controllerLevel = flowController.getRootControllerService(serviceIdentifier);
    return controllerLevel != null
        ? controllerLevel
        : getRootGroup().findControllerService(serviceIdentifier);
}
/**
 * Lists the identifiers of the services of a given type that are visible in a given scope.
 * <p>
 * A null {@code groupId} selects the controller-level services. Otherwise the group is the root
 * group (when the id equals the root alias or the root group's identifier) or the group found
 * under the root with that id; an unknown group yields an empty set.
 *
 * @param serviceType the type the proxied service must be assignable to
 * @param groupId identifier of the process group, or null for controller-level services
 * @return identifiers of the matching services
 */
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType, final String groupId) {
    final Set<ControllerServiceNode> candidates;
    if (groupId != null) {
        final ProcessGroup root = getRootGroup();
        final boolean refersToRoot = FlowController.ROOT_GROUP_ID_ALIAS.equals(groupId) || root.getIdentifier().equals(groupId);
        final ProcessGroup scope = refersToRoot ? root : root.findProcessGroup(groupId);
        if (scope == null) {
            return Collections.emptySet();
        }
        candidates = scope.getControllerServices(true);
    } else {
        candidates = flowController.getRootControllerServices();
    }

    final Set<String> matchingIds = new HashSet<>();
    for (final ControllerServiceNode candidate : candidates) {
        final Class<?> proxyClass = candidate.getProxiedControllerService().getClass();
        if (requireNonNull(serviceType).isAssignableFrom(proxyClass)) {
            matchingIds.add(candidate.getIdentifier());
        }
    }
    return matchingIds;
}
/**
 * Returns the name of the identified service.
 *
 * @param serviceIdentifier identifier of the service
 * @return the node's name, or null if no node with that identifier is found
 */
@Override
public String getControllerServiceName(final String serviceIdentifier) {
    final ControllerServiceNode found = getControllerServiceNode(serviceIdentifier);
    if (found == null) {
        return null;
    }
    return found.getName();
}
/**
 * Removes a service from wherever it lives: from its process group when it has one, otherwise
 * from the controller-level services.
 *
 * @param serviceNode the service to remove; must not be null
 * @throws NullPointerException if {@code serviceNode} is null
 */
@Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
    final ProcessGroup owner = requireNonNull(serviceNode).getProcessGroup();
    if (owner != null) {
        owner.removeControllerService(serviceNode);
    } else {
        flowController.removeRootControllerService(serviceNode);
    }
}
/**
 * Gathers the controller-level services together with every service found under the root
 * process group.
 *
 * @return a new set containing both collections
 */
@Override
public Set<ControllerServiceNode> getAllControllerServices() {
    final Set<ControllerServiceNode> controllerLevel = flowController.getRootControllerServices();
    final Set<ControllerServiceNode> combined = new HashSet<ControllerServiceNode>(controllerLevel);
    combined.addAll(getRootGroup().findAllControllerServices());
    return combined;
}
/**
 * Returns a List of all components that reference the given referencedNode
 * (either directly or indirectly through another service) that are also of
 * the given componentType. The list that is returned is in the order in
 * which they will need to be 'activated' (enabled/started).
 * <p>
 * NOTE(review): the recursion keeps no record of the nodes it has already
 * entered, so this appears to rely on the reference graph being free of
 * cycles - confirm against callers.
 *
 * @param referencedNode the service whose referencing components are wanted
 * @param componentType the kind of component to collect; components of other
 * types are still traversed when they are Controller Services
 * @param <T> the collected component type
 * @return the matching components; a component reachable through another
 * referencing service is positioned after that service
 */
private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
final List<T> references = new ArrayList<>();
for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) {
// Collect the direct referrer itself when it is of the requested type.
if (componentType.isAssignableFrom(referencingComponent.getClass())) {
references.add(componentType.cast(referencingComponent));
}
// Only Controller Services can themselves be referenced, so only they are followed further.
if (referencingComponent instanceof ControllerServiceNode) {
final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
// find components recursively that depend on referencingNode.
final List<T> recursive = findRecursiveReferences(referencingNode, componentType);
// For anything that depends on referencing node, we want to add it to the list, but we know
// that it must come after the referencing node, so we first remove any existing occurrence.
references.removeAll(recursive);
references.addAll(recursive);
}
}
return references;
}
/**
 * Enables every Controller Service that references the given service, directly or transitively.
 * The referencing services are collected in activation order, logged at debug level, and passed
 * to the private overload that does the work.
 *
 * @param serviceNode the referenced service
 * @return the services that the private overload reports as enabled
 */
@Override
public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) {
final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
logger.debug("Enabling the following Referencing Services for {}: {}", serviceNode, recursiveReferences);
return enableReferencingServices(serviceNode, recursiveReferences);
}
/**
 * Enables the given referencing services in list order, after verifying that each inactive one
 * can be enabled.
 * <p>
 * If the referenced service itself is not active, it is verified first against the full set of
 * references. Each inactive referencing service is then verified against the set of services
 * that precede it in the list and are about to be enabled.
 *
 * @param serviceNode the referenced service
 * @param recursiveReferences the referencing services, in activation order
 * @return the services that were inactive and have been asked to enable
 */
private Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
    if (!serviceNode.isActive()) {
        final Set<ControllerServiceNode> allReferences = new HashSet<>(recursiveReferences);
        serviceNode.verifyCanEnable(allReferences);
    }

    // Verification pass: each node may assume the ones verified before it will be enabled.
    final Set<ControllerServiceNode> pendingEnable = new HashSet<>();
    for (final ControllerServiceNode candidate : recursiveReferences) {
        if (candidate.isActive()) {
            continue;
        }
        candidate.verifyCanEnable(pendingEnable);
        pendingEnable.add(candidate);
    }

    // Enabling pass.
    final Set<ConfiguredComponent> enabled = new HashSet<>();
    for (final ControllerServiceNode candidate : recursiveReferences) {
        if (candidate.isActive()) {
            continue;
        }
        logger.debug("Enabling {} because it references {}", candidate, serviceNode);
        enableControllerService(candidate);
        enabled.add(candidate);
    }
    return enabled;
}
/**
 * Verifies that every service referencing the given service (directly or transitively) can be
 * enabled, each one being checked against the complete set of referencing services.
 *
 * @param serviceNode the referenced service
 */
@Override
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
    final List<ControllerServiceNode> referencing = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
    final Set<ControllerServiceNode> referencingSet = new HashSet<>(referencing);

    for (final ControllerServiceNode candidate : referencing) {
        candidate.verifyCanEnable(referencingSet);
    }
}
/**
 * Verifies that every reporting task and processor referencing the given service (directly or
 * transitively) can be started. Components whose scheduled state is DISABLED are skipped.
 *
 * @param serviceNode the referenced service
 */
@Override
public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
    final List<ControllerServiceNode> services = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
    final List<ReportingTaskNode> tasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
    final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);

    // The referencing services are passed to each verifyCanStart call.
    final Set<ControllerServiceNode> serviceSet = new HashSet<>(services);

    for (final ReportingTaskNode task : tasks) {
        if (ScheduledState.DISABLED == task.getScheduledState()) {
            continue;
        }
        task.verifyCanStart(serviceSet);
    }

    for (final ProcessorNode processor : processors) {
        if (ScheduledState.DISABLED == processor.getScheduledState()) {
            continue;
        }
        processor.verifyCanStart(serviceSet);
    }
}
/**
 * Verifies that every active service referencing the given service (directly or transitively)
 * can be disabled, each one being checked against the complete set of referencing services.
 *
 * @param serviceNode the referenced service
 */
@Override
public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
    final List<ControllerServiceNode> referencing = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
    final Set<ControllerServiceNode> referencingSet = new HashSet<>(referencing);

    for (final ControllerServiceNode candidate : referencing) {
        if (!candidate.isActive()) {
            continue;
        }
        candidate.verifyCanDisable(referencingSet);
    }
}
/**
 * Intentionally performs no checks and never throws.
 *
 * @param serviceNode the referenced service (unused)
 */
@Override
public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
// we can always stop referencing components
}
/**
 * Not supported by this provider: identifiers can only be looked up together with a Process
 * Group identifier, via the two-argument overload.
 * <p>
 * NOTE(review): the signature declares IllegalArgumentException, but the body always throws
 * UnsupportedOperationException.
 *
 * @param serviceType the requested service type (only used in the error message)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException {
throw new UnsupportedOperationException("Cannot obtain Controller Service Identifiers for service type " + serviceType + " without providing a Process Group Identifier");
}
}