| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.groups; |
| |
| import com.google.common.collect.Sets; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.builder.HashCodeBuilder; |
| import org.apache.commons.lang3.builder.ToStringBuilder; |
| import org.apache.commons.lang3.builder.ToStringStyle; |
| import org.apache.nifi.annotation.lifecycle.OnRemoved; |
| import org.apache.nifi.annotation.lifecycle.OnShutdown; |
| import org.apache.nifi.authorization.Resource; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.authorization.resource.ResourceFactory; |
| import org.apache.nifi.authorization.resource.ResourceType; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.state.StateManager; |
| import org.apache.nifi.components.state.StateManagerProvider; |
| import org.apache.nifi.connectable.Connectable; |
| import org.apache.nifi.connectable.ConnectableType; |
| import org.apache.nifi.connectable.Connection; |
| import org.apache.nifi.connectable.Funnel; |
| import org.apache.nifi.connectable.LocalPort; |
| import org.apache.nifi.connectable.Port; |
| import org.apache.nifi.connectable.Position; |
| import org.apache.nifi.connectable.Positionable; |
| import org.apache.nifi.controller.ConfigurationContext; |
| import org.apache.nifi.controller.FlowController; |
| import org.apache.nifi.controller.ProcessorNode; |
| import org.apache.nifi.controller.ScheduledState; |
| import org.apache.nifi.controller.Snippet; |
| import org.apache.nifi.controller.Template; |
| import org.apache.nifi.controller.exception.ComponentLifeCycleException; |
| import org.apache.nifi.controller.label.Label; |
| import org.apache.nifi.controller.scheduling.StandardProcessScheduler; |
| import org.apache.nifi.controller.service.ControllerServiceNode; |
| import org.apache.nifi.controller.service.ControllerServiceProvider; |
| import org.apache.nifi.controller.service.StandardConfigurationContext; |
| import org.apache.nifi.encrypt.StringEncryptor; |
| import org.apache.nifi.logging.LogRepositoryFactory; |
| import org.apache.nifi.nar.NarCloseable; |
| import org.apache.nifi.processor.StandardProcessContext; |
| import org.apache.nifi.registry.VariableRegistry; |
| import org.apache.nifi.remote.RemoteGroupPort; |
| import org.apache.nifi.remote.RootGroupPort; |
| import org.apache.nifi.util.NiFiProperties; |
| import org.apache.nifi.util.ReflectionUtils; |
| import org.apache.nifi.web.Revision; |
| import org.apache.nifi.web.api.dto.TemplateDTO; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import static java.util.Objects.requireNonNull; |
| |
| public final class StandardProcessGroup implements ProcessGroup { |
| |
| private final String id; |
| private final AtomicReference<ProcessGroup> parent; |
| private final AtomicReference<String> name; |
| private final AtomicReference<Position> position; |
| private final AtomicReference<String> comments; |
| |
| private final StandardProcessScheduler scheduler; |
| private final ControllerServiceProvider controllerServiceProvider; |
| private final FlowController flowController; |
| |
| private final Map<String, Port> inputPorts = new HashMap<>(); |
| private final Map<String, Port> outputPorts = new HashMap<>(); |
| private final Map<String, Connection> connections = new HashMap<>(); |
| private final Map<String, ProcessGroup> processGroups = new HashMap<>(); |
| private final Map<String, Label> labels = new HashMap<>(); |
| private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>(); |
| private final Map<String, ProcessorNode> processors = new HashMap<>(); |
| private final Map<String, Funnel> funnels = new HashMap<>(); |
| private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>(); |
| private final Map<String, Template> templates = new HashMap<>(); |
| private final StringEncryptor encryptor; |
| private final VariableRegistry variableRegistry; |
| |
| private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); |
| private final Lock readLock = rwLock.readLock(); |
| private final Lock writeLock = rwLock.writeLock(); |
| |
| private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class); |
| |
| public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler, |
| final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController, |
| final VariableRegistry variableRegistry) { |
| this.id = id; |
| this.controllerServiceProvider = serviceProvider; |
| this.parent = new AtomicReference<>(); |
| this.scheduler = scheduler; |
| this.comments = new AtomicReference<>(""); |
| this.encryptor = encryptor; |
| this.flowController = flowController; |
| this.variableRegistry = variableRegistry; |
| |
| name = new AtomicReference<>(); |
| position = new AtomicReference<>(new Position(0D, 0D)); |
| } |
| |
| @Override |
| public ProcessGroup getParent() { |
| return parent.get(); |
| } |
| |
| private ProcessGroup getRoot() { |
| ProcessGroup root = this; |
| while (root.getParent() != null) { |
| root = root.getParent(); |
| } |
| return root; |
| } |
| |
| @Override |
| public void setParent(final ProcessGroup newParent) { |
| parent.set(newParent); |
| } |
| |
| @Override |
| public Authorizable getParentAuthorizable() { |
| return getParent(); |
| } |
| |
| @Override |
| public Resource getResource() { |
| return ResourceFactory.getComponentResource(ResourceType.ProcessGroup, getIdentifier(), getName()); |
| } |
| |
| @Override |
| public String getIdentifier() { |
| return id; |
| } |
| |
| @Override |
| public String getName() { |
| return name.get(); |
| } |
| |
| @Override |
| public void setName(final String name) { |
| if (StringUtils.isBlank(name)) { |
| throw new IllegalArgumentException("The name cannot be blank."); |
| } |
| |
| this.name.set(name); |
| } |
| |
| @Override |
| public void setPosition(final Position position) { |
| this.position.set(position); |
| } |
| |
| @Override |
| public Position getPosition() { |
| return position.get(); |
| } |
| |
| @Override |
| public String getComments() { |
| return this.comments.get(); |
| } |
| |
| @Override |
| public void setComments(final String comments) { |
| this.comments.set(comments); |
| } |
| |
| @Override |
| public ProcessGroupCounts getCounts() { |
| int inputPortCount = 0; |
| int outputPortCount = 0; |
| |
| int running = 0; |
| int stopped = 0; |
| int invalid = 0; |
| int disabled = 0; |
| int activeRemotePorts = 0; |
| int inactiveRemotePorts = 0; |
| |
| readLock.lock(); |
| try { |
| for (final ProcessorNode procNode : processors.values()) { |
| if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) { |
| disabled++; |
| } else if (procNode.isRunning()) { |
| running++; |
| } else if (!procNode.isValid()) { |
| invalid++; |
| } else { |
| stopped++; |
| } |
| } |
| |
| inputPortCount = inputPorts.size(); |
| for (final Port port : inputPorts.values()) { |
| if (ScheduledState.DISABLED.equals(port.getScheduledState())) { |
| disabled++; |
| } else if (port.isRunning()) { |
| running++; |
| } else if (!port.isValid()) { |
| invalid++; |
| } else { |
| stopped++; |
| } |
| } |
| |
| outputPortCount = outputPorts.size(); |
| for (final Port port : outputPorts.values()) { |
| if (ScheduledState.DISABLED.equals(port.getScheduledState())) { |
| disabled++; |
| } else if (port.isRunning()) { |
| running++; |
| } else if (!port.isValid()) { |
| invalid++; |
| } else { |
| stopped++; |
| } |
| } |
| |
| for (final ProcessGroup childGroup : processGroups.values()) { |
| final ProcessGroupCounts childCounts = childGroup.getCounts(); |
| running += childCounts.getRunningCount(); |
| stopped += childCounts.getStoppedCount(); |
| invalid += childCounts.getInvalidCount(); |
| disabled += childCounts.getDisabledCount(); |
| } |
| |
| for (final RemoteProcessGroup remoteGroup : findAllRemoteProcessGroups()) { |
| // Count only input ports that have incoming connections |
| for (final Port port : remoteGroup.getInputPorts()) { |
| if (port.hasIncomingConnection()) { |
| if (port.isRunning()) { |
| activeRemotePorts++; |
| } else { |
| inactiveRemotePorts++; |
| } |
| } |
| } |
| |
| // Count only output ports that have outgoing connections |
| for (final Port port : remoteGroup.getOutputPorts()) { |
| if (!port.getConnections().isEmpty()) { |
| if (port.isRunning()) { |
| activeRemotePorts++; |
| } else { |
| inactiveRemotePorts++; |
| } |
| } |
| } |
| |
| final String authIssue = remoteGroup.getAuthorizationIssue(); |
| if (authIssue != null) { |
| invalid++; |
| } |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| |
| return new ProcessGroupCounts(inputPortCount, outputPortCount, running, stopped, |
| invalid, disabled, activeRemotePorts, inactiveRemotePorts); |
| } |
| |
| @Override |
| public boolean isRootGroup() { |
| return parent.get() == null; |
| } |
| |
| @Override |
| public void startProcessing() { |
| readLock.lock(); |
| try { |
| findAllProcessors().stream().filter(SCHEDULABLE_PROCESSORS).forEach(node -> { |
| try { |
| node.getProcessGroup().startProcessor(node); |
| } catch (final Throwable t) { |
| LOG.error("Unable to start processor {} due to {}", new Object[]{node.getIdentifier(), t}); |
| } |
| }); |
| |
| findAllInputPorts().stream().filter(SCHEDULABLE_PORTS).forEach(port -> { |
| port.getProcessGroup().startInputPort(port); |
| }); |
| |
| findAllOutputPorts().stream().filter(SCHEDULABLE_PORTS).forEach(port -> { |
| port.getProcessGroup().startOutputPort(port); |
| }); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void stopProcessing() { |
| readLock.lock(); |
| try { |
| findAllProcessors().stream().filter(UNSCHEDULABLE_PROCESSORS).forEach(node -> { |
| try { |
| node.getProcessGroup().stopProcessor(node); |
| } catch (final Throwable t) { |
| LOG.error("Unable to stop processor {} due to {}", new Object[]{node.getIdentifier(), t}); |
| } |
| }); |
| |
| findAllInputPorts().stream().filter(UNSCHEDULABLE_PORTS).forEach(port -> { |
| port.getProcessGroup().stopInputPort(port); |
| }); |
| |
| findAllOutputPorts().stream().filter(UNSCHEDULABLE_PORTS).forEach(port -> { |
| port.getProcessGroup().stopOutputPort(port); |
| }); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private StateManager getStateManager(final String componentId) { |
| return flowController.getStateManagerProvider().getStateManager(componentId); |
| } |
| |
| private void shutdown(final ProcessGroup procGroup) { |
| for (final ProcessorNode node : procGroup.getProcessors()) { |
| try (final NarCloseable x = NarCloseable.withNarLoader()) { |
| final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), variableRegistry); |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext); |
| } |
| } |
| |
| for (final RemoteProcessGroup rpg : procGroup.getRemoteProcessGroups()) { |
| rpg.shutdown(); |
| } |
| |
| // Recursively shutdown child groups. |
| for (final ProcessGroup group : procGroup.getProcessGroups()) { |
| shutdown(group); |
| } |
| } |
| |
| @Override |
| public void shutdown() { |
| readLock.lock(); |
| try { |
| shutdown(this); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void addInputPort(final Port port) { |
| if (isRootGroup()) { |
| if (!(port instanceof RootGroupPort)) { |
| throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to the Root Group"); |
| } |
| } else if (!(port instanceof LocalPort)) { |
| throw new IllegalArgumentException("Cannot add Input Port of type " + port.getClass().getName() + " to a non-root group"); |
| } |
| |
| writeLock.lock(); |
| try { |
| if (inputPorts.containsKey(requireNonNull(port).getIdentifier()) |
| || getInputPortByName(port.getName()) != null) { |
| throw new IllegalStateException("The input port name or identifier is not available to be added."); |
| } |
| |
| port.setProcessGroup(this); |
| inputPorts.put(requireNonNull(port).getIdentifier(), port); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void removeInputPort(final Port port) { |
| writeLock.lock(); |
| try { |
| final Port toRemove = inputPorts.get(requireNonNull(port).getIdentifier()); |
| if (toRemove == null) { |
| throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group"); |
| } |
| |
| port.verifyCanDelete(); |
| for (final Connection conn : port.getConnections()) { |
| conn.verifyCanDelete(); |
| } |
| |
| if (port.isRunning()) { |
| stopInputPort(port); |
| } |
| |
| // must copy to avoid a concurrent modification |
| final Set<Connection> copy = new HashSet<>(port.getConnections()); |
| for (final Connection conn : copy) { |
| removeConnection(conn); |
| } |
| |
| final Port removed = inputPorts.remove(port.getIdentifier()); |
| if (removed == null) { |
| throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group"); |
| } |
| |
| LOG.info("Input Port {} removed from flow", port); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Port getInputPort(final String id) { |
| readLock.lock(); |
| try { |
| return inputPorts.get(Objects.requireNonNull(id)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<Port> getInputPorts() { |
| readLock.lock(); |
| try { |
| return new HashSet<>(inputPorts.values()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void addOutputPort(final Port port) { |
| if (isRootGroup()) { |
| if (!(port instanceof RootGroupPort)) { |
| throw new IllegalArgumentException("Cannot add Output Port " + port.getClass().getName() + " to the Root Group"); |
| } |
| } else if (!(port instanceof LocalPort)) { |
| throw new IllegalArgumentException("Cannot add Output Port " + port.getClass().getName() + " to a non-root group"); |
| } |
| |
| writeLock.lock(); |
| try { |
| if (outputPorts.containsKey(requireNonNull(port).getIdentifier()) |
| || getOutputPortByName(port.getName()) != null) { |
| throw new IllegalStateException("Output Port with given identifier or name is not available"); |
| } |
| |
| port.setProcessGroup(this); |
| outputPorts.put(port.getIdentifier(), port); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void removeOutputPort(final Port port) { |
| writeLock.lock(); |
| try { |
| final Port toRemove = outputPorts.get(requireNonNull(port).getIdentifier()); |
| toRemove.verifyCanDelete(); |
| |
| if (port.isRunning()) { |
| stopOutputPort(port); |
| } |
| |
| if (!toRemove.getConnections().isEmpty()) { |
| throw new IllegalStateException(port.getIdentifier() + " cannot be removed until its connections are removed"); |
| } |
| |
| final Port removed = outputPorts.remove(port.getIdentifier()); |
| if (removed == null) { |
| throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group"); |
| } |
| |
| LOG.info("Output Port {} removed from flow", port); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Port getOutputPort(final String id) { |
| readLock.lock(); |
| try { |
| return outputPorts.get(Objects.requireNonNull(id)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<Port> getOutputPorts() { |
| readLock.lock(); |
| try { |
| return new HashSet<>(outputPorts.values()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void addProcessGroup(final ProcessGroup group) { |
| if (StringUtils.isEmpty(group.getName())) { |
| throw new IllegalArgumentException("Process Group's name must be specified"); |
| } |
| |
| writeLock.lock(); |
| try { |
| group.setParent(this); |
| processGroups.put(Objects.requireNonNull(group).getIdentifier(), group); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ProcessGroup getProcessGroup(final String id) { |
| readLock.lock(); |
| try { |
| return processGroups.get(id); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<ProcessGroup> getProcessGroups() { |
| readLock.lock(); |
| try { |
| return new HashSet<>(processGroups.values()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void removeProcessGroup(final ProcessGroup group) { |
| requireNonNull(group).verifyCanDelete(); |
| |
| writeLock.lock(); |
| try { |
| final ProcessGroup toRemove = processGroups.get(group.getIdentifier()); |
| if (toRemove == null) { |
| throw new IllegalStateException(group.getIdentifier() + " is not a member of this Process Group"); |
| } |
| toRemove.verifyCanDelete(); |
| |
| removeComponents(group); |
| processGroups.remove(group.getIdentifier()); |
| LOG.info("{} removed from flow", group); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| private void removeComponents(final ProcessGroup group) { |
| for (final Connection connection : new ArrayList<>(group.getConnections())) { |
| group.removeConnection(connection); |
| } |
| |
| for (final Port port : new ArrayList<>(group.getInputPorts())) { |
| group.removeInputPort(port); |
| } |
| |
| for (final Port port : new ArrayList<>(group.getOutputPorts())) { |
| group.removeOutputPort(port); |
| } |
| |
| for (final Funnel funnel : new ArrayList<>(group.getFunnels())) { |
| group.removeFunnel(funnel); |
| } |
| |
| for (final ProcessorNode processor : new ArrayList<>(group.getProcessors())) { |
| group.removeProcessor(processor); |
| } |
| |
| for (final RemoteProcessGroup rpg : new ArrayList<>(group.getRemoteProcessGroups())) { |
| group.removeRemoteProcessGroup(rpg); |
| } |
| |
| for (final Label label : new ArrayList<>(group.getLabels())) { |
| group.removeLabel(label); |
| } |
| |
| for (final ProcessGroup childGroup : new ArrayList<>(group.getProcessGroups())) { |
| group.removeProcessGroup(childGroup); |
| } |
| } |
| |
| @Override |
| public void addRemoteProcessGroup(final RemoteProcessGroup remoteGroup) { |
| writeLock.lock(); |
| try { |
| if (remoteGroups.containsKey(requireNonNull(remoteGroup).getIdentifier())) { |
| throw new IllegalStateException("RemoteProcessGroup already exists with ID " + remoteGroup.getIdentifier()); |
| } |
| |
| remoteGroup.setProcessGroup(this); |
| remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), remoteGroup); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<RemoteProcessGroup> getRemoteProcessGroups() { |
| readLock.lock(); |
| try { |
| return new HashSet<>(remoteGroups.values()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void removeRemoteProcessGroup(final RemoteProcessGroup remoteProcessGroup) { |
| final String remoteGroupId = requireNonNull(remoteProcessGroup).getIdentifier(); |
| |
| writeLock.lock(); |
| try { |
| final RemoteProcessGroup remoteGroup = remoteGroups.get(remoteGroupId); |
| if (remoteGroup == null) { |
| throw new IllegalStateException(remoteProcessGroup.getIdentifier() + " is not a member of this Process Group"); |
| } |
| |
| remoteGroup.verifyCanDelete(); |
| for (final RemoteGroupPort port : remoteGroup.getOutputPorts()) { |
| for (final Connection connection : port.getConnections()) { |
| connection.verifyCanDelete(); |
| } |
| } |
| |
| for (final RemoteGroupPort port : remoteGroup.getOutputPorts()) { |
| // must copy to avoid a concurrent modification |
| final Set<Connection> copy = new HashSet<>(port.getConnections()); |
| for (final Connection connection : copy) { |
| removeConnection(connection); |
| } |
| } |
| |
| try { |
| remoteGroup.onRemove(); |
| } catch (final Exception e) { |
| LOG.warn("Failed to clean up resources for {} due to {}", remoteGroup, e); |
| } |
| |
| remoteGroups.remove(remoteGroupId); |
| LOG.info("{} removed from flow", remoteProcessGroup); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void addProcessor(final ProcessorNode processor) { |
| writeLock.lock(); |
| try { |
| final String processorId = requireNonNull(processor).getIdentifier(); |
| final ProcessorNode existingProcessor = processors.get(processorId); |
| if (existingProcessor != null) { |
| throw new IllegalStateException("A processor is already registered to this ProcessGroup with ID " + processorId); |
| } |
| |
| processor.setProcessGroup(this); |
| processors.put(processorId, processor); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void removeProcessor(final ProcessorNode processor) { |
| final String id = requireNonNull(processor).getIdentifier(); |
| writeLock.lock(); |
| try { |
| if (!processors.containsKey(id)) { |
| throw new IllegalStateException(processor.getIdentifier() + " is not a member of this Process Group"); |
| } |
| |
| processor.verifyCanDelete(); |
| for (final Connection conn : processor.getConnections()) { |
| conn.verifyCanDelete(); |
| } |
| |
| try (final NarCloseable x = NarCloseable.withNarLoader()) { |
| final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), variableRegistry); |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); |
| } catch (final Exception e) { |
| throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of processor with id " + processor.getIdentifier(), e); |
| } |
| |
| for (final Map.Entry<PropertyDescriptor, String> entry : processor.getProperties().entrySet()) { |
| final PropertyDescriptor descriptor = entry.getKey(); |
| if (descriptor.getControllerServiceDefinition() != null) { |
| final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); |
| if (value != null) { |
| final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value); |
| if (serviceNode != null) { |
| serviceNode.removeReference(processor); |
| } |
| } |
| } |
| } |
| |
| processors.remove(id); |
| LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers(); |
| |
| final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider(); |
| scheduler.submitFrameworkTask(new Runnable() { |
| @Override |
| public void run() { |
| stateManagerProvider.onComponentRemoved(processor.getIdentifier()); |
| } |
| }); |
| |
| // must copy to avoid a concurrent modification |
| final Set<Connection> copy = new HashSet<>(processor.getConnections()); |
| for (final Connection conn : copy) { |
| removeConnection(conn); |
| } |
| |
| LOG.info("{} removed from flow", processor); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<ProcessorNode> getProcessors() { |
| readLock.lock(); |
| try { |
| return new LinkedHashSet<>(processors.values()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ProcessorNode getProcessor(final String id) { |
| readLock.lock(); |
| try { |
| return processors.get(Objects.requireNonNull(id)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private boolean isInputPort(final Connectable connectable) { |
| if (connectable.getConnectableType() != ConnectableType.INPUT_PORT) { |
| return false; |
| } |
| return findInputPort(connectable.getIdentifier()) != null; |
| } |
| |
| private boolean isOutputPort(final Connectable connectable) { |
| if (connectable.getConnectableType() != ConnectableType.OUTPUT_PORT) { |
| return false; |
| } |
| return findOutputPort(connectable.getIdentifier()) != null; |
| } |
| |
| @Override |
| public void inheritConnection(final Connection connection) { |
| writeLock.lock(); |
| try { |
| connections.put(connection.getIdentifier(), connection); |
| connection.setProcessGroup(this); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void addConnection(final Connection connection) { |
| writeLock.lock(); |
| try { |
| final String id = requireNonNull(connection).getIdentifier(); |
| final Connection existingConnection = connections.get(id); |
| if (existingConnection != null) { |
| throw new IllegalStateException("Connection already exists with ID " + id); |
| } |
| |
| final Connectable source = connection.getSource(); |
| final Connectable destination = connection.getDestination(); |
| final ProcessGroup sourceGroup = source.getProcessGroup(); |
| final ProcessGroup destinationGroup = destination.getProcessGroup(); |
| |
| // validate the connection is validate wrt to the source & destination groups |
| if (isInputPort(source)) { // if source is an input port, its destination must be in the same group unless it's an input port |
| if (isInputPort(destination)) { // if destination is input port, it must be in a child group. |
| if (!processGroups.containsKey(destinationGroup.getIdentifier())) { |
| throw new IllegalStateException("Cannot add Connection to Process Group because destination is an Input Port that does not belong to a child Process Group"); |
| } |
| } else if (sourceGroup != this || destinationGroup != this) { |
| throw new IllegalStateException("Cannot add Connection to Process Group because source and destination are not both in this Process Group"); |
| } |
| } else if (isOutputPort(source)) { |
| // if source is an output port, its group must be a child of this group, and its destination must be in this |
| // group (processor/output port) or a child group (input port) |
| if (!processGroups.containsKey(sourceGroup.getIdentifier())) { |
| throw new IllegalStateException("Cannot add Connection to Process Group because source is an Output Port that does not belong to a child Process Group"); |
| } |
| |
| if (isInputPort(destination)) { |
| if (!processGroups.containsKey(destinationGroup.getIdentifier())) { |
| throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input Port that does not belong to a child Process Group"); |
| } |
| } else if (destinationGroup != this) { |
| throw new IllegalStateException("Cannot add Connection to Process Group because its destination does not belong to this Process Group"); |
| } |
| } else { // source is not a port |
| if (sourceGroup != this) { |
| throw new IllegalStateException("Cannot add Connection to Process Group because the source does not belong to this Process Group"); |
| } |
| |
| if (isOutputPort(destination)) { |
| if (destinationGroup != this) { |
| throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Output Port but does not belong to this Process Group"); |
| } |
| } else if (isInputPort(destination)) { |
| if (!processGroups.containsKey(destinationGroup.getIdentifier())) { |
| throw new IllegalStateException("Cannot add Connection to Process Group because its destination is an Input " |
| + "Port but the Input Port does not belong to a child Process Group"); |
| } |
| } else if (destinationGroup != this) { |
| throw new IllegalStateException("Cannot add Connection between " + source.getIdentifier() + " and " + destination.getIdentifier() |
| + " because they are in different Process Groups and neither is an Input Port or Output Port"); |
| } |
| } |
| |
| connection.setProcessGroup(this); |
| source.addConnection(connection); |
| if (source != destination) { // don't call addConnection twice if it's a self-looping connection. |
| destination.addConnection(connection); |
| } |
| connections.put(connection.getIdentifier(), connection); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Connectable getConnectable(final String id) { |
| readLock.lock(); |
| try { |
| final ProcessorNode node = processors.get(id); |
| if (node != null) { |
| return node; |
| } |
| |
| final Port inputPort = inputPorts.get(id); |
| if (inputPort != null) { |
| return inputPort; |
| } |
| |
| final Port outputPort = outputPorts.get(id); |
| if (outputPort != null) { |
| return outputPort; |
| } |
| |
| final Funnel funnel = funnels.get(id); |
| if (funnel != null) { |
| return funnel; |
| } |
| |
| return null; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void removeConnection(final Connection connectionToRemove) { |
| writeLock.lock(); |
| try { |
| // verify that Connection belongs to this group |
| final Connection connection = connections.get(requireNonNull(connectionToRemove).getIdentifier()); |
| if (connection == null) { |
| throw new IllegalStateException("Connection " + connectionToRemove.getIdentifier() + " is not a member of this Process Group"); |
| } |
| |
| connectionToRemove.verifyCanDelete(); |
| |
| final Connectable source = connectionToRemove.getSource(); |
| final Connectable dest = connectionToRemove.getDestination(); |
| |
| // update the source & destination |
| source.removeConnection(connection); |
| if (source != dest) { |
| dest.removeConnection(connection); |
| } |
| |
| // remove the connection from our map |
| connections.remove(connection.getIdentifier()); |
| LOG.info("{} removed from flow", connection); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<Connection> getConnections() { |
| readLock.lock(); |
| try { |
| return new HashSet<>(connections.values()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Connection getConnection(final String id) { |
| readLock.lock(); |
| try { |
| return connections.get(Objects.requireNonNull(id)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Connection findConnection(final String id) { |
| return findConnection(id, this); |
| } |
| |
| private Connection findConnection(final String id, final ProcessGroup start) { |
| Connection connection = start.getConnection(id); |
| if (connection != null) { |
| return connection; |
| } |
| |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| connection = findConnection(id, group); |
| if (connection != null) { |
| return connection; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public List<Connection> findAllConnections() { |
| return findAllConnections(this); |
| } |
| |
| private List<Connection> findAllConnections(final ProcessGroup group) { |
| final List<Connection> connections = new ArrayList<>(group.getConnections()); |
| for (final ProcessGroup childGroup : group.getProcessGroups()) { |
| connections.addAll(findAllConnections(childGroup)); |
| } |
| return connections; |
| } |
| |
| @Override |
| public void addLabel(final Label label) { |
| writeLock.lock(); |
| try { |
| final Label existing = labels.get(requireNonNull(label).getIdentifier()); |
| if (existing != null) { |
| throw new IllegalStateException("A label already exists in this ProcessGroup with ID " + label.getIdentifier()); |
| } |
| |
| label.setProcessGroup(this); |
| labels.put(label.getIdentifier(), label); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void removeLabel(final Label label) { |
| writeLock.lock(); |
| try { |
| final Label removed = labels.remove(requireNonNull(label).getIdentifier()); |
| if (removed == null) { |
| throw new IllegalStateException(label + " is not a member of this Process Group."); |
| } |
| |
| LOG.info("Label with ID {} removed from flow", label.getIdentifier()); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<Label> getLabels() { |
| readLock.lock(); |
| try { |
| return new HashSet<>(labels.values()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Label getLabel(final String id) { |
| readLock.lock(); |
| try { |
| return labels.get(id); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| readLock.lock(); |
| try { |
| return inputPorts.isEmpty() && outputPorts.isEmpty() && connections.isEmpty() |
| && processGroups.isEmpty() && labels.isEmpty() && processors.isEmpty() && remoteGroups.isEmpty(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public RemoteProcessGroup getRemoteProcessGroup(final String id) { |
| readLock.lock(); |
| try { |
| return remoteGroups.get(Objects.requireNonNull(id)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void startProcessor(final ProcessorNode processor) { |
| readLock.lock(); |
| try { |
| if (getProcessor(processor.getIdentifier()) == null) { |
| throw new IllegalStateException("Processor is not a member of this Process Group"); |
| } |
| |
| final ScheduledState state = processor.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| throw new IllegalStateException("Processor is disabled"); |
| } else if (state == ScheduledState.RUNNING) { |
| return; |
| } |
| |
| scheduler.startProcessor(processor); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void startInputPort(final Port port) { |
| readLock.lock(); |
| try { |
| if (getInputPort(port.getIdentifier()) == null) { |
| throw new IllegalStateException("Port " + port.getIdentifier() + " is not a member of this Process Group"); |
| } |
| |
| final ScheduledState state = port.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| throw new IllegalStateException("InputPort " + port.getIdentifier() + " is disabled"); |
| } else if (state == ScheduledState.RUNNING) { |
| return; |
| } |
| |
| scheduler.startPort(port); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void startOutputPort(final Port port) { |
| readLock.lock(); |
| try { |
| if (getOutputPort(port.getIdentifier()) == null) { |
| throw new IllegalStateException("Port is not a member of this Process Group"); |
| } |
| |
| final ScheduledState state = port.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| throw new IllegalStateException("OutputPort is disabled"); |
| } else if (state == ScheduledState.RUNNING) { |
| return; |
| } |
| |
| scheduler.startPort(port); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void startFunnel(final Funnel funnel) { |
| readLock.lock(); |
| try { |
| if (getFunnel(funnel.getIdentifier()) == null) { |
| throw new IllegalStateException("Funnel is not a member of this Process Group"); |
| } |
| |
| final ScheduledState state = funnel.getScheduledState(); |
| if (state == ScheduledState.RUNNING) { |
| return; |
| } |
| scheduler.startFunnel(funnel); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void stopProcessor(final ProcessorNode processor) { |
| readLock.lock(); |
| try { |
| if (!processors.containsKey(processor.getIdentifier())) { |
| throw new IllegalStateException("No processor with ID " + processor.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = processor.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| throw new IllegalStateException("Processor is disabled"); |
| } else if (state == ScheduledState.STOPPED) { |
| return; |
| } |
| |
| scheduler.stopProcessor(processor); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void stopInputPort(final Port port) { |
| readLock.lock(); |
| try { |
| if (!inputPorts.containsKey(port.getIdentifier())) { |
| throw new IllegalStateException("No Input Port with ID " + port.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = port.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| throw new IllegalStateException("InputPort is disabled"); |
| } else if (state == ScheduledState.STOPPED) { |
| return; |
| } |
| |
| scheduler.stopPort(port); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void stopOutputPort(final Port port) { |
| readLock.lock(); |
| try { |
| if (!outputPorts.containsKey(port.getIdentifier())) { |
| throw new IllegalStateException("No Output Port with ID " + port.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = port.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| throw new IllegalStateException("OutputPort is disabled"); |
| } else if (state == ScheduledState.STOPPED) { |
| return; |
| } |
| |
| scheduler.stopPort(port); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private void stopFunnel(final Funnel funnel) { |
| readLock.lock(); |
| try { |
| if (!funnels.containsKey(funnel.getIdentifier())) { |
| throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = funnel.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| throw new IllegalStateException("Funnel is disabled"); |
| } else if (state == ScheduledState.STOPPED) { |
| return; |
| } |
| |
| scheduler.stopFunnel(funnel); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void enableInputPort(final Port port) { |
| readLock.lock(); |
| try { |
| if (!inputPorts.containsKey(port.getIdentifier())) { |
| throw new IllegalStateException("No Input Port with ID " + port.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = port.getScheduledState(); |
| if (state == ScheduledState.STOPPED) { |
| return; |
| } else if (state == ScheduledState.RUNNING) { |
| throw new IllegalStateException("InputPort is currently running"); |
| } |
| |
| scheduler.enablePort(port); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void enableOutputPort(final Port port) { |
| readLock.lock(); |
| try { |
| if (!outputPorts.containsKey(port.getIdentifier())) { |
| throw new IllegalStateException("No Output Port with ID " + port.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = port.getScheduledState(); |
| if (state == ScheduledState.STOPPED) { |
| return; |
| } else if (state == ScheduledState.RUNNING) { |
| throw new IllegalStateException("OutputPort is currently running"); |
| } |
| |
| scheduler.enablePort(port); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void enableProcessor(final ProcessorNode processor) { |
| readLock.lock(); |
| try { |
| if (!processors.containsKey(processor.getIdentifier())) { |
| throw new IllegalStateException("No Processor with ID " + processor.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = processor.getScheduledState(); |
| if (state == ScheduledState.STOPPED) { |
| return; |
| } else if (state == ScheduledState.RUNNING) { |
| throw new IllegalStateException("Processor is currently running"); |
| } |
| |
| scheduler.enableProcessor(processor); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void disableInputPort(final Port port) { |
| readLock.lock(); |
| try { |
| if (!inputPorts.containsKey(port.getIdentifier())) { |
| throw new IllegalStateException("No InputPort with ID " + port.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = port.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| return; |
| } else if (state == ScheduledState.RUNNING) { |
| throw new IllegalStateException("InputPort is currently running"); |
| } |
| |
| scheduler.disablePort(port); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void disableOutputPort(final Port port) { |
| readLock.lock(); |
| try { |
| if (!outputPorts.containsKey(port.getIdentifier())) { |
| throw new IllegalStateException("No OutputPort with ID " + port.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = port.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| return; |
| } else if (state == ScheduledState.RUNNING) { |
| throw new IllegalStateException("OutputPort is currently running"); |
| } |
| |
| scheduler.disablePort(port); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void disableProcessor(final ProcessorNode processor) { |
| readLock.lock(); |
| try { |
| if (!processors.containsKey(processor.getIdentifier())) { |
| throw new IllegalStateException("No Processor with ID " + processor.getIdentifier() + " belongs to this Process Group"); |
| } |
| |
| final ScheduledState state = processor.getScheduledState(); |
| if (state == ScheduledState.DISABLED) { |
| return; |
| } else if (state == ScheduledState.RUNNING) { |
| throw new IllegalStateException("Processor is currently running"); |
| } |
| |
| scheduler.disableProcessor(processor); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public boolean equals(final Object obj) { |
| if (obj instanceof StandardProcessGroup) { |
| final StandardProcessGroup other = (StandardProcessGroup) obj; |
| return (getIdentifier().equals(other.getIdentifier())); |
| } else { |
| return false; |
| } |
| } |
| |
| @Override |
| public int hashCode() { |
| return new HashCodeBuilder().append(getIdentifier()).toHashCode(); |
| } |
| |
| @Override |
| public String toString() { |
| return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("identifier", getIdentifier()).toString(); |
| } |
| |
| @Override |
| public ProcessGroup findProcessGroup(final String id) { |
| return findProcessGroup(requireNonNull(id), this); |
| } |
| |
| private ProcessGroup findProcessGroup(final String id, final ProcessGroup start) { |
| if (id.equals(start.getIdentifier())) { |
| return start; |
| } |
| |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| final ProcessGroup matching = findProcessGroup(id, group); |
| if (matching != null) { |
| return matching; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public List<ProcessGroup> findAllProcessGroups() { |
| return findAllProcessGroups(this); |
| } |
| |
| private List<ProcessGroup> findAllProcessGroups(final ProcessGroup start) { |
| final List<ProcessGroup> allProcessGroups = new ArrayList<>(start.getProcessGroups()); |
| for (final ProcessGroup childGroup : start.getProcessGroups()) { |
| allProcessGroups.addAll(findAllProcessGroups(childGroup)); |
| } |
| return allProcessGroups; |
| } |
| |
| @Override |
| public List<RemoteProcessGroup> findAllRemoteProcessGroups() { |
| return findAllRemoteProcessGroups(this); |
| } |
| |
| private List<RemoteProcessGroup> findAllRemoteProcessGroups(final ProcessGroup start) { |
| final List<RemoteProcessGroup> remoteGroups = new ArrayList<>(start.getRemoteProcessGroups()); |
| for (final ProcessGroup childGroup : start.getProcessGroups()) { |
| remoteGroups.addAll(findAllRemoteProcessGroups(childGroup)); |
| } |
| return remoteGroups; |
| } |
| |
| @Override |
| public RemoteProcessGroup findRemoteProcessGroup(final String id) { |
| return findRemoteProcessGroup(requireNonNull(id), this); |
| } |
| |
| private RemoteProcessGroup findRemoteProcessGroup(final String id, final ProcessGroup start) { |
| RemoteProcessGroup remoteGroup = start.getRemoteProcessGroup(id); |
| if (remoteGroup != null) { |
| return remoteGroup; |
| } |
| |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| remoteGroup = findRemoteProcessGroup(id, group); |
| if (remoteGroup != null) { |
| return remoteGroup; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public ProcessorNode findProcessor(final String id) { |
| return findProcessor(id, this); |
| } |
| |
| private ProcessorNode findProcessor(final String id, final ProcessGroup start) { |
| ProcessorNode node = start.getProcessor(id); |
| if (node != null) { |
| return node; |
| } |
| |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| node = findProcessor(id, group); |
| if (node != null) { |
| return node; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public List<ProcessorNode> findAllProcessors() { |
| return findAllProcessors(this); |
| } |
| |
| private List<ProcessorNode> findAllProcessors(final ProcessGroup start) { |
| final List<ProcessorNode> allNodes = new ArrayList<>(start.getProcessors()); |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| allNodes.addAll(findAllProcessors(group)); |
| } |
| return allNodes; |
| } |
| |
| @Override |
| public Connectable findConnectable(final String identifier) { |
| return findConnectable(identifier, this); |
| } |
| |
| private static Connectable findConnectable(final String identifier, final ProcessGroup group) { |
| final ProcessorNode procNode = group.getProcessor(identifier); |
| if (procNode != null) { |
| return procNode; |
| } |
| |
| final Port inPort = group.getInputPort(identifier); |
| if (inPort != null) { |
| return inPort; |
| } |
| |
| final Port outPort = group.getOutputPort(identifier); |
| if (outPort != null) { |
| return outPort; |
| } |
| |
| final Funnel funnel = group.getFunnel(identifier); |
| if (funnel != null) { |
| return funnel; |
| } |
| |
| for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { |
| final RemoteGroupPort remoteInPort = remoteGroup.getInputPort(identifier); |
| if (remoteInPort != null) { |
| return remoteInPort; |
| } |
| |
| final RemoteGroupPort remoteOutPort = remoteGroup.getOutputPort(identifier); |
| if (remoteOutPort != null) { |
| return remoteOutPort; |
| } |
| } |
| |
| for (final ProcessGroup childGroup : group.getProcessGroups()) { |
| final Connectable childGroupConnectable = findConnectable(identifier, childGroup); |
| if (childGroupConnectable != null) { |
| return childGroupConnectable; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public Label findLabel(final String id) { |
| return findLabel(id, this); |
| } |
| |
| private Label findLabel(final String id, final ProcessGroup start) { |
| Label label = start.getLabel(id); |
| if (label != null) { |
| return label; |
| } |
| |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| label = findLabel(id, group); |
| if (label != null) { |
| return label; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public List<Label> findAllLabels() { |
| return findAllLabels(this); |
| } |
| |
| private List<Label> findAllLabels(final ProcessGroup start) { |
| final List<Label> allLabels = new ArrayList<>(start.getLabels()); |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| allLabels.addAll(findAllLabels(group)); |
| } |
| return allLabels; |
| } |
| |
| @Override |
| public Port findInputPort(final String id) { |
| return findPort(id, this, new InputPortRetriever()); |
| } |
| |
| @Override |
| public List<Port> findAllInputPorts() { |
| return findAllInputPorts(this); |
| } |
| |
| private List<Port> findAllInputPorts(final ProcessGroup start) { |
| final List<Port> allOutputPorts = new ArrayList<>(start.getInputPorts()); |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| allOutputPorts.addAll(findAllInputPorts(group)); |
| } |
| return allOutputPorts; |
| } |
| |
| @Override |
| public Port findOutputPort(final String id) { |
| return findPort(id, this, new OutputPortRetriever()); |
| } |
| |
| @Override |
| public List<Port> findAllOutputPorts() { |
| return findAllOutputPorts(this); |
| } |
| |
| private List<Port> findAllOutputPorts(final ProcessGroup start) { |
| final List<Port> allOutputPorts = new ArrayList<>(start.getOutputPorts()); |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| allOutputPorts.addAll(findAllOutputPorts(group)); |
| } |
| return allOutputPorts; |
| } |
| |
| @Override |
| public Port getInputPortByName(final String name) { |
| return getPortByName(name, this, new InputPortRetriever()); |
| } |
| |
| @Override |
| public Port getOutputPortByName(final String name) { |
| return getPortByName(name, this, new OutputPortRetriever()); |
| } |
| |
| private interface PortRetriever { |
| |
| Port getPort(ProcessGroup group, String id); |
| |
| Set<Port> getPorts(ProcessGroup group); |
| } |
| |
| private static class InputPortRetriever implements PortRetriever { |
| |
| @Override |
| public Set<Port> getPorts(final ProcessGroup group) { |
| return group.getInputPorts(); |
| } |
| |
| @Override |
| public Port getPort(final ProcessGroup group, final String id) { |
| return group.getInputPort(id); |
| } |
| } |
| |
| private static class OutputPortRetriever implements PortRetriever { |
| |
| @Override |
| public Set<Port> getPorts(final ProcessGroup group) { |
| return group.getOutputPorts(); |
| } |
| |
| @Override |
| public Port getPort(final ProcessGroup group, final String id) { |
| return group.getOutputPort(id); |
| } |
| } |
| |
| private Port findPort(final String id, final ProcessGroup group, final PortRetriever retriever) { |
| Port port = retriever.getPort(group, id); |
| if (port != null) { |
| return port; |
| } |
| |
| for (final ProcessGroup childGroup : group.getProcessGroups()) { |
| port = findPort(id, childGroup, retriever); |
| if (port != null) { |
| return port; |
| } |
| } |
| |
| return null; |
| } |
| |
| private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) { |
| for (final Port port : retriever.getPorts(group)) { |
| if (port.getName().equals(name)) { |
| return port; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public void addFunnel(final Funnel funnel) { |
| addFunnel(funnel, true); |
| } |
| |
| @Override |
| public void addFunnel(final Funnel funnel, final boolean autoStart) { |
| writeLock.lock(); |
| try { |
| final Funnel existing = funnels.get(requireNonNull(funnel).getIdentifier()); |
| if (existing != null) { |
| throw new IllegalStateException("A funnel already exists in this ProcessGroup with ID " + funnel.getIdentifier()); |
| } |
| |
| funnel.setProcessGroup(this); |
| funnels.put(funnel.getIdentifier(), funnel); |
| |
| if (autoStart) { |
| startFunnel(funnel); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Funnel getFunnel(final String id) { |
| readLock.lock(); |
| try { |
| return funnels.get(id); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Funnel findFunnel(final String id) { |
| return findFunnel(id, this); |
| } |
| |
| private Funnel findFunnel(final String id, final ProcessGroup start) { |
| Funnel funnel = start.getFunnel(id); |
| if (funnel != null) { |
| return funnel; |
| } |
| |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| funnel = findFunnel(id, group); |
| if (funnel != null) { |
| return funnel; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public ControllerServiceNode findControllerService(final String id) { |
| return findControllerService(id, this); |
| } |
| |
| private ControllerServiceNode findControllerService(final String id, final ProcessGroup start) { |
| ControllerServiceNode service = start.getControllerService(id); |
| if (service != null) { |
| return service; |
| } |
| |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| service = findControllerService(id, group); |
| if (service != null) { |
| return service; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public Set<ControllerServiceNode> findAllControllerServices() { |
| return findAllControllerServices(this); |
| } |
| |
| public Set<ControllerServiceNode> findAllControllerServices(ProcessGroup start) { |
| final Set<ControllerServiceNode> services = start.getControllerServices(false); |
| for (final ProcessGroup group : start.getProcessGroups()) { |
| services.addAll(findAllControllerServices(group)); |
| } |
| |
| return services; |
| } |
| |
| @Override |
| public void removeFunnel(final Funnel funnel) { |
| writeLock.lock(); |
| try { |
| final Funnel existing = funnels.get(requireNonNull(funnel).getIdentifier()); |
| if (existing == null) { |
| throw new IllegalStateException("Funnel " + funnel.getIdentifier() + " is not a member of this ProcessGroup"); |
| } |
| |
| funnel.verifyCanDelete(); |
| for (final Connection conn : funnel.getConnections()) { |
| conn.verifyCanDelete(); |
| } |
| |
| stopFunnel(funnel); |
| |
| // must copy to avoid a concurrent modification |
| final Set<Connection> copy = new HashSet<>(funnel.getConnections()); |
| for (final Connection conn : copy) { |
| removeConnection(conn); |
| } |
| |
| funnels.remove(funnel.getIdentifier()); |
| LOG.info("{} removed from flow", funnel); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<Funnel> getFunnels() { |
| readLock.lock(); |
| try { |
| return new HashSet<>(funnels.values()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void addControllerService(final ControllerServiceNode service) { |
| writeLock.lock(); |
| try { |
| final String id = requireNonNull(service).getIdentifier(); |
| final ControllerServiceNode existingService = controllerServices.get(id); |
| if (existingService != null) { |
| throw new IllegalStateException("A Controller Service is already registered to this ProcessGroup with ID " + id); |
| } |
| |
| service.setProcessGroup(this); |
| this.controllerServices.put(service.getIdentifier(), service); |
| LOG.info("{} added to {}", service, this); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ControllerServiceNode getControllerService(final String id) { |
| readLock.lock(); |
| try { |
| return controllerServices.get(requireNonNull(id)); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<ControllerServiceNode> getControllerServices(final boolean recursive) { |
| readLock.lock(); |
| try { |
| final Set<ControllerServiceNode> services = new HashSet<>(); |
| services.addAll(controllerServices.values()); |
| |
| if (recursive && parent.get() != null) { |
| services.addAll(parent.get().getControllerServices(true)); |
| } |
| |
| return services; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void removeControllerService(final ControllerServiceNode service) { |
| writeLock.lock(); |
| try { |
| final ControllerServiceNode existing = controllerServices.get(requireNonNull(service).getIdentifier()); |
| if (existing == null) { |
| throw new IllegalStateException("ControllerService " + service.getIdentifier() + " is not a member of this Process Group"); |
| } |
| |
| service.verifyCanDelete(); |
| |
| try (final NarCloseable x = NarCloseable.withNarLoader()) { |
| final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry); |
| ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext); |
| } |
| |
| for (final Map.Entry<PropertyDescriptor, String> entry : service.getProperties().entrySet()) { |
| final PropertyDescriptor descriptor = entry.getKey(); |
| if (descriptor.getControllerServiceDefinition() != null) { |
| final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); |
| if (value != null) { |
| final ControllerServiceNode referencedNode = getControllerService(value); |
| if (referencedNode != null) { |
| referencedNode.removeReference(service); |
| } |
| } |
| } |
| } |
| |
| controllerServices.remove(service.getIdentifier()); |
| flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier()); |
| |
| LOG.info("{} removed from {}", service, this); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void addTemplate(final Template template) { |
| requireNonNull(template); |
| |
| writeLock.lock(); |
| try { |
| final String id = template.getDetails().getId(); |
| if (id == null) { |
| throw new IllegalStateException("Cannot add template that has no ID"); |
| } |
| |
| if (templates.containsKey(id)) { |
| throw new IllegalStateException("Process Group already contains a Template with ID " + id); |
| } |
| |
| templates.put(id, template); |
| template.setProcessGroup(this); |
| LOG.info("{} added to {}", template, this); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Template getTemplate(final String id) { |
| readLock.lock(); |
| try { |
| return templates.get(id); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Template findTemplate(final String id) { |
| return findTemplate(id, this); |
| } |
| |
| private Template findTemplate(final String id, final ProcessGroup start) { |
| final Template template = start.getTemplate(id); |
| if (template != null) { |
| return template; |
| } |
| |
| for (final ProcessGroup child : start.getProcessGroups()) { |
| final Template childTemplate = findTemplate(id, child); |
| if (childTemplate != null) { |
| return childTemplate; |
| } |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public Set<Template> getTemplates() { |
| readLock.lock(); |
| try { |
| return new HashSet<>(templates.values()); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Set<Template> findAllTemplates() { |
| return findAllTemplates(this); |
| } |
| |
| private Set<Template> findAllTemplates(final ProcessGroup group) { |
| final Set<Template> templates = new HashSet<>(group.getTemplates()); |
| for (final ProcessGroup childGroup : group.getProcessGroups()) { |
| templates.addAll(findAllTemplates(childGroup)); |
| } |
| return templates; |
| } |
| |
| @Override |
| public void removeTemplate(final Template template) { |
| writeLock.lock(); |
| try { |
| final Template existing = templates.get(requireNonNull(template).getIdentifier()); |
| if (existing == null) { |
| throw new IllegalStateException("Template " + template.getIdentifier() + " is not a member of this ProcessGroup"); |
| } |
| |
| templates.remove(template.getIdentifier()); |
| LOG.info("{} removed from flow", template); |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void remove(final Snippet snippet) { |
| writeLock.lock(); |
| try { |
| // ensure that all components are valid |
| verifyContents(snippet); |
| |
| final Set<Connectable> connectables = getAllConnectables(snippet); |
| final Set<String> connectionIdsToRemove = new HashSet<>(getKeys(snippet.getConnections())); |
| // Remove all connections that are the output of any Connectable. |
| for (final Connectable connectable : connectables) { |
| for (final Connection conn : connectable.getConnections()) { |
| if (!connections.containsKey(conn.getIdentifier())) { |
| throw new IllegalStateException("Connectable component " + connectable.getIdentifier() |
| + " cannot be removed because it has incoming connections from the parent Process Group"); |
| } |
| connectionIdsToRemove.add(conn.getIdentifier()); |
| } |
| } |
| |
| // verify that all connections can be removed |
| for (final String id : connectionIdsToRemove) { |
| connections.get(id).verifyCanDelete(); |
| } |
| |
| // verify that all processors are stopped and have no active threads |
| for (final String procId : snippet.getProcessors().keySet()) { |
| final ProcessorNode procNode = getProcessor(procId); |
| if (procNode.isRunning()) { |
| throw new IllegalStateException("Processor " + procNode.getIdentifier() + " cannot be removed because it is running"); |
| } |
| final int activeThreadCount = scheduler.getActiveThreadCount(procNode); |
| if (activeThreadCount != 0) { |
| throw new IllegalStateException("Processor " + procNode.getIdentifier() + " cannot be removed because it still has " + activeThreadCount + " active threads"); |
| } |
| } |
| |
| // verify that none of the connectables have incoming connections that are not in the Snippet. |
| final Set<String> connectionIds = snippet.getConnections().keySet(); |
| for (final Connectable connectable : connectables) { |
| for (final Connection conn : connectable.getIncomingConnections()) { |
| if (!connectionIds.contains(conn.getIdentifier()) && !connectables.contains(conn.getSource())) { |
| throw new IllegalStateException("Connectable component " + connectable.getIdentifier() + " cannot be removed because it has incoming connections " |
| + "that are not selected to be deleted"); |
| } |
| } |
| } |
| |
| // verify that all of the ProcessGroups in the snippet are empty |
| for (final String groupId : snippet.getProcessGroups().keySet()) { |
| final ProcessGroup toRemove = getProcessGroup(groupId); |
| toRemove.verifyCanDelete(true); |
| } |
| |
| for (final String id : connectionIdsToRemove) { |
| removeConnection(connections.get(id)); |
| } |
| for (final String id : getKeys(snippet.getInputPorts())) { |
| removeInputPort(inputPorts.get(id)); |
| } |
| for (final String id : getKeys(snippet.getOutputPorts())) { |
| removeOutputPort(outputPorts.get(id)); |
| } |
| for (final String id : getKeys(snippet.getFunnels())) { |
| removeFunnel(funnels.get(id)); |
| } |
| for (final String id : getKeys(snippet.getLabels())) { |
| removeLabel(labels.get(id)); |
| } |
| for (final String id : getKeys(snippet.getProcessors())) { |
| removeProcessor(processors.get(id)); |
| } |
| for (final String id : getKeys(snippet.getRemoteProcessGroups())) { |
| removeRemoteProcessGroup(remoteGroups.get(id)); |
| } |
| for (final String id : getKeys(snippet.getProcessGroups())) { |
| removeProcessGroup(processGroups.get(id)); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| private Set<String> getKeys(final Map<String, Revision> map) { |
| return (map == null) ? Collections.emptySet() : map.keySet(); |
| } |
| |
| @Override |
| public void move(final Snippet snippet, final ProcessGroup destination) { |
| writeLock.lock(); |
| try { |
| verifyContents(snippet); |
| |
| if (!isDisconnected(snippet)) { |
| throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved."); |
| } |
| |
| if (isRootGroup() && (!snippet.getInputPorts().isEmpty() || !snippet.getOutputPorts().isEmpty())) { |
| throw new IllegalStateException("Cannot move Ports out of the root group"); |
| } |
| |
| if (destination.isRootGroup() && (!snippet.getInputPorts().isEmpty() || !snippet.getOutputPorts().isEmpty())) { |
| throw new IllegalStateException("Cannot move Ports into the root group"); |
| } |
| |
| for (final String id : getKeys(snippet.getInputPorts())) { |
| destination.addInputPort(inputPorts.remove(id)); |
| } |
| for (final String id : getKeys(snippet.getOutputPorts())) { |
| destination.addOutputPort(outputPorts.remove(id)); |
| } |
| for (final String id : getKeys(snippet.getFunnels())) { |
| destination.addFunnel(funnels.remove(id)); |
| } |
| for (final String id : getKeys(snippet.getLabels())) { |
| destination.addLabel(labels.remove(id)); |
| } |
| for (final String id : getKeys(snippet.getProcessGroups())) { |
| destination.addProcessGroup(processGroups.remove(id)); |
| } |
| for (final String id : getKeys(snippet.getProcessors())) { |
| destination.addProcessor(processors.remove(id)); |
| } |
| for (final String id : getKeys(snippet.getRemoteProcessGroups())) { |
| destination.addRemoteProcessGroup(remoteGroups.remove(id)); |
| } |
| for (final String id : getKeys(snippet.getConnections())) { |
| destination.inheritConnection(connections.remove(id)); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| private Set<Connectable> getAllConnectables(final Snippet snippet) { |
| final Set<Connectable> connectables = new HashSet<>(); |
| for (final String id : getKeys(snippet.getInputPorts())) { |
| connectables.add(getInputPort(id)); |
| } |
| for (final String id : getKeys(snippet.getOutputPorts())) { |
| connectables.add(getOutputPort(id)); |
| } |
| for (final String id : getKeys(snippet.getFunnels())) { |
| connectables.add(getFunnel(id)); |
| } |
| for (final String id : getKeys(snippet.getProcessors())) { |
| connectables.add(getProcessor(id)); |
| } |
| return connectables; |
| } |
| |
| private boolean isDisconnected(final Snippet snippet) { |
| final Set<Connectable> connectables = getAllConnectables(snippet); |
| |
| for (final String id : getKeys(snippet.getRemoteProcessGroups())) { |
| final RemoteProcessGroup remoteGroup = getRemoteProcessGroup(id); |
| connectables.addAll(remoteGroup.getInputPorts()); |
| connectables.addAll(remoteGroup.getOutputPorts()); |
| } |
| |
| final Set<String> connectionIds = snippet.getConnections().keySet(); |
| for (final Connectable connectable : connectables) { |
| for (final Connection conn : connectable.getIncomingConnections()) { |
| if (!connectionIds.contains(conn.getIdentifier())) { |
| return false; |
| } |
| } |
| |
| for (final Connection conn : connectable.getConnections()) { |
| if (!connectionIds.contains(conn.getIdentifier())) { |
| return false; |
| } |
| } |
| } |
| |
| final Set<Connectable> recursiveConnectables = new HashSet<>(connectables); |
| for (final String id : snippet.getProcessGroups().keySet()) { |
| final ProcessGroup childGroup = getProcessGroup(id); |
| recursiveConnectables.addAll(findAllConnectables(childGroup, true)); |
| } |
| |
| for (final String id : connectionIds) { |
| final Connection connection = getConnection(id); |
| if (!recursiveConnectables.contains(connection.getSource()) || !recursiveConnectables.contains(connection.getDestination())) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| @Override |
| public Set<Positionable> findAllPositionables() { |
| Set<Positionable> positionables = Sets.newHashSet(); |
| positionables.addAll(findAllConnectables(this, true)); |
| List<ProcessGroup> allProcessGroups = findAllProcessGroups(); |
| positionables.addAll(allProcessGroups); |
| positionables.addAll(findAllRemoteProcessGroups()); |
| positionables.addAll(findAllLabels()); |
| return positionables; |
| } |
| |
| private Set<Connectable> findAllConnectables(final ProcessGroup group, final boolean includeRemotePorts) { |
| final Set<Connectable> set = new HashSet<>(); |
| set.addAll(group.getInputPorts()); |
| set.addAll(group.getOutputPorts()); |
| set.addAll(group.getFunnels()); |
| set.addAll(group.getProcessors()); |
| if (includeRemotePorts) { |
| for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { |
| set.addAll(remoteGroup.getInputPorts()); |
| set.addAll(remoteGroup.getOutputPorts()); |
| } |
| } |
| |
| for (final ProcessGroup childGroup : group.getProcessGroups()) { |
| set.addAll(findAllConnectables(childGroup, includeRemotePorts)); |
| } |
| |
| return set; |
| } |
| |
| /** |
| * Verifies that all ID's defined within the given snippet reference |
| * components within this ProcessGroup. If this is not the case, throws |
| * {@link IllegalStateException}. |
| * |
| * @param snippet the snippet |
| * @throws NullPointerException if the argument is null |
| * @throws IllegalStateException if the snippet contains an ID that |
| * references a component that is not part of this ProcessGroup |
| */ |
| private void verifyContents(final Snippet snippet) throws NullPointerException, IllegalStateException { |
| requireNonNull(snippet); |
| |
| verifyAllKeysExist(snippet.getInputPorts().keySet(), inputPorts, "Input Port"); |
| verifyAllKeysExist(snippet.getOutputPorts().keySet(), outputPorts, "Output Port"); |
| verifyAllKeysExist(snippet.getFunnels().keySet(), funnels, "Funnel"); |
| verifyAllKeysExist(snippet.getLabels().keySet(), labels, "Label"); |
| verifyAllKeysExist(snippet.getProcessGroups().keySet(), processGroups, "Process Group"); |
| verifyAllKeysExist(snippet.getProcessors().keySet(), processors, "Processor"); |
| verifyAllKeysExist(snippet.getRemoteProcessGroups().keySet(), remoteGroups, "Remote Process Group"); |
| verifyAllKeysExist(snippet.getConnections().keySet(), connections, "Connection"); |
| } |
| |
| /** |
| * <p> |
| * Verifies that all ID's specified by the given set exist as keys in the |
| * given Map. If any of the ID's does not exist as a key in the map, will |
| * throw {@link IllegalStateException} indicating the ID that is invalid and |
| * specifying the Component Type. |
| * </p> |
| * |
| * <p> |
| * If the ids given are null, will do no validation. |
| * </p> |
| * |
| * @param ids ids |
| * @param map map |
| * @param componentType type |
| */ |
| private void verifyAllKeysExist(final Set<String> ids, final Map<String, ?> map, final String componentType) { |
| if (ids != null) { |
| for (final String id : ids) { |
| if (!map.containsKey(id)) { |
| throw new IllegalStateException("ID " + id + " does not refer to a(n) " + componentType + " in this ProcessGroup"); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void verifyCanAddTemplate(final String name) { |
| // ensure the name is specified |
| if (StringUtils.isBlank(name)) { |
| throw new IllegalArgumentException("Template name cannot be blank."); |
| } |
| |
| for (final Template template : getRoot().findAllTemplates()) { |
| final TemplateDTO existingDto = template.getDetails(); |
| |
| // ensure a template with this name doesnt already exist |
| if (name.equals(existingDto.getName())) { |
| throw new IllegalStateException(String.format("A template named '%s' already exists.", name)); |
| } |
| } |
| } |
| |
| @Override |
| public void verifyCanDelete() { |
| verifyCanDelete(false); |
| } |
| |
| @Override |
| public void verifyCanDelete(final boolean ignoreConnections) { |
| readLock.lock(); |
| try { |
| for (final Port port : inputPorts.values()) { |
| port.verifyCanDelete(true); |
| } |
| |
| for (final Port port : outputPorts.values()) { |
| port.verifyCanDelete(true); |
| } |
| |
| for (final ProcessorNode procNode : processors.values()) { |
| procNode.verifyCanDelete(true); |
| } |
| |
| for (final Connection connection : connections.values()) { |
| connection.verifyCanDelete(); |
| } |
| |
| for (final ProcessGroup childGroup : processGroups.values()) { |
| // For nested child groups we can ignore the input/output port |
| // connections as they will be being deleted anyway. |
| childGroup.verifyCanDelete(true); |
| } |
| |
| if (!templates.isEmpty()) { |
| throw new IllegalStateException(String.format("Cannot delete Process Group because it contains %s Templates. The Templates must be deleted first.", templates.size())); |
| } |
| |
| if (!ignoreConnections) { |
| for (final Port port : inputPorts.values()) { |
| for (final Connection connection : port.getIncomingConnections()) { |
| if (connection.getSource().equals(port)) { |
| connection.verifyCanDelete(); |
| } else { |
| throw new IllegalStateException("Cannot delete Process Group because Input Port " + port.getIdentifier() |
| + " has at least one incoming connection from a component outside of the Process Group. Delete this connection first."); |
| } |
| } |
| } |
| |
| for (final Port port : outputPorts.values()) { |
| for (final Connection connection : port.getConnections()) { |
| if (connection.getDestination().equals(port)) { |
| connection.verifyCanDelete(); |
| } else { |
| throw new IllegalStateException("Cannot delete Process Group because Output Port " + port.getIdentifier() |
| + " has at least one outgoing connection to a component outside of the Process Group. Delete this connection first."); |
| } |
| } |
| } |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void verifyCanStop(Connectable connectable) { |
| } |
| |
| @Override |
| public void verifyCanStop() { |
| } |
| |
| @Override |
| public void verifyCanStart(Connectable connectable) { |
| readLock.lock(); |
| try { |
| if (connectable.getScheduledState() == ScheduledState.STOPPED) { |
| if (scheduler.getActiveThreadCount(connectable) > 0) { |
| throw new IllegalStateException("Cannot start component with id" + connectable.getIdentifier() + " because it is currently stopping"); |
| } |
| |
| connectable.verifyCanStart(); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void verifyCanStart() { |
| readLock.lock(); |
| try { |
| for (final Connectable connectable : findAllConnectables(this, false)) { |
| verifyCanStart(connectable); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void verifyCanDelete(final Snippet snippet) throws IllegalStateException { |
| readLock.lock(); |
| try { |
| if (!id.equals(snippet.getParentGroupId())) { |
| throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id); |
| } |
| |
| if (!isDisconnected(snippet)) { |
| throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved."); |
| } |
| |
| for (final String id : snippet.getConnections().keySet()) { |
| final Connection connection = getConnection(id); |
| if (connection == null) { |
| throw new IllegalStateException("Snippet references Connection with ID " + id + ", which does not exist in this ProcessGroup"); |
| } |
| |
| connection.verifyCanDelete(); |
| } |
| |
| for (final String id : snippet.getFunnels().keySet()) { |
| final Funnel funnel = getFunnel(id); |
| if (funnel == null) { |
| throw new IllegalStateException("Snippet references Funnel with ID " + id + ", which does not exist in this ProcessGroup"); |
| } |
| |
| funnel.verifyCanDelete(true); |
| } |
| |
| for (final String id : snippet.getInputPorts().keySet()) { |
| final Port port = getInputPort(id); |
| if (port == null) { |
| throw new IllegalStateException("Snippet references Input Port with ID " + id + ", which does not exist in this ProcessGroup"); |
| } |
| |
| port.verifyCanDelete(true); |
| } |
| |
| for (final String id : snippet.getLabels().keySet()) { |
| final Label label = getLabel(id); |
| if (label == null) { |
| throw new IllegalStateException("Snippet references Label with ID " + id + ", which does not exist in this ProcessGroup"); |
| } |
| } |
| |
| for (final String id : snippet.getOutputPorts().keySet()) { |
| final Port port = getOutputPort(id); |
| if (port == null) { |
| throw new IllegalStateException("Snippet references Output Port with ID " + id + ", which does not exist in this ProcessGroup"); |
| } |
| port.verifyCanDelete(true); |
| } |
| |
| for (final String id : snippet.getProcessGroups().keySet()) { |
| final ProcessGroup group = getProcessGroup(id); |
| if (group == null) { |
| throw new IllegalStateException("Snippet references Process Group with ID " + id + ", which does not exist in this ProcessGroup"); |
| } |
| group.verifyCanDelete(true); |
| } |
| |
| for (final String id : snippet.getProcessors().keySet()) { |
| final ProcessorNode processor = getProcessor(id); |
| if (processor == null) { |
| throw new IllegalStateException("Snippet references Processor with ID " + id + ", which does not exist in this ProcessGroup"); |
| } |
| processor.verifyCanDelete(true); |
| } |
| |
| for (final String id : snippet.getRemoteProcessGroups().keySet()) { |
| final RemoteProcessGroup group = getRemoteProcessGroup(id); |
| if (group == null) { |
| throw new IllegalStateException("Snippet references Remote Process Group with ID " + id + ", which does not exist in this ProcessGroup"); |
| } |
| group.verifyCanDelete(true); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void verifyCanMove(final Snippet snippet, final ProcessGroup newProcessGroup) throws IllegalStateException { |
| readLock.lock(); |
| try { |
| if (!id.equals(snippet.getParentGroupId())) { |
| throw new IllegalStateException("Snippet belongs to ProcessGroup with ID " + snippet.getParentGroupId() + " but this ProcessGroup has id " + id); |
| } |
| |
| verifyContents(snippet); |
| |
| if (!isDisconnected(snippet)) { |
| throw new IllegalStateException("One or more components within the snippet is connected to a component outside of the snippet. Only a disconnected snippet may be moved."); |
| } |
| |
| if (isRootGroup() && (!snippet.getInputPorts().isEmpty() || !snippet.getOutputPorts().isEmpty())) { |
| throw new IllegalStateException("Cannot move Ports from the Root Group to a Non-Root Group"); |
| } |
| |
| for (final String id : snippet.getInputPorts().keySet()) { |
| final Port port = getInputPort(id); |
| final String portName = port.getName(); |
| |
| if (newProcessGroup.getInputPortByName(portName) != null) { |
| throw new IllegalStateException("Cannot perform Move Operation because the destination Process Group already has an Input Port with the name " + portName); |
| } |
| } |
| |
| for (final String id : snippet.getOutputPorts().keySet()) { |
| final Port port = getOutputPort(id); |
| final String portName = port.getName(); |
| |
| if (newProcessGroup.getOutputPortByName(portName) != null) { |
| throw new IllegalStateException("Cannot perform Move Operation because the destination Process Group already has an Output Port with the name " + portName); |
| } |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| } |