| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.connectable; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.builder.EqualsBuilder; |
| import org.apache.commons.lang3.builder.HashCodeBuilder; |
| import org.apache.nifi.authorization.AccessDeniedException; |
| import org.apache.nifi.authorization.AuthorizationResult; |
| import org.apache.nifi.authorization.AuthorizationResult.Result; |
| import org.apache.nifi.authorization.Authorizer; |
| import org.apache.nifi.authorization.RequestAction; |
| import org.apache.nifi.authorization.Resource; |
| import org.apache.nifi.authorization.resource.Authorizable; |
| import org.apache.nifi.authorization.user.NiFiUser; |
| import org.apache.nifi.controller.ProcessScheduler; |
| import org.apache.nifi.controller.queue.ConnectionEventListener; |
| import org.apache.nifi.controller.queue.FlowFileQueue; |
| import org.apache.nifi.controller.queue.FlowFileQueueFactory; |
| import org.apache.nifi.controller.queue.LoadBalanceStrategy; |
| import org.apache.nifi.controller.repository.FlowFileRecord; |
| import org.apache.nifi.groups.ProcessGroup; |
| import org.apache.nifi.processor.FlowFileFilter; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.remote.RemoteGroupPort; |
| import org.apache.nifi.scheduling.SchedulingStrategy; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Models a connection between connectable components. A connection may contain |
| * one or more relationships that map the source component to the destination |
| * component. |
| */ |
| public final class StandardConnection implements Connection, ConnectionEventListener { |
| |
| private final String id; |
| private final AtomicReference<ProcessGroup> processGroup; |
| private final AtomicReference<String> name; |
| private final AtomicReference<List<Position>> bendPoints; |
| private final Connectable source; |
| private final AtomicReference<Connectable> destination; |
| private final AtomicReference<Collection<Relationship>> relationships; |
| private final AtomicInteger labelIndex = new AtomicInteger(1); |
| private final AtomicLong zIndex = new AtomicLong(0L); |
| private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); |
| private final ProcessScheduler scheduler; |
| private final int hashCode; |
| |
| private volatile FlowFileQueue flowFileQueue; |
| |
| private StandardConnection(final Builder builder) { |
| id = builder.id; |
| name = new AtomicReference<>(builder.name); |
| bendPoints = new AtomicReference<>(Collections.unmodifiableList(new ArrayList<>(builder.bendPoints))); |
| processGroup = new AtomicReference<>(builder.processGroup); |
| source = builder.source; |
| destination = new AtomicReference<>(builder.destination); |
| relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); |
| scheduler = builder.scheduler; |
| |
| flowFileQueue = builder.flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this, processGroup.get()); |
| hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); |
| } |
| |
| @Override |
| public ProcessGroup getProcessGroup() { |
| return processGroup.get(); |
| } |
| |
| @Override |
| public String getIdentifier() { |
| return id; |
| } |
| |
| @Override |
| public String getName() { |
| return name.get(); |
| } |
| |
| @Override |
| public void setName(final String name) { |
| this.name.set(name); |
| } |
| |
| @Override |
| public Authorizable getParentAuthorizable() { |
| return getProcessGroup(); |
| } |
| |
| @Override |
| public Resource getResource() { |
| return new Resource() { |
| @Override |
| public String getIdentifier() { |
| return "/connections/" + StandardConnection.this.getIdentifier(); |
| } |
| |
| @Override |
| public String getName() { |
| String name = StandardConnection.this.getName(); |
| |
| final Collection<Relationship> relationships = getRelationships(); |
| if (name == null && relationships != null && !relationships.isEmpty()) { |
| name = StringUtils.join(relationships.stream().map(Relationship::getName).collect(Collectors.toSet()), ", "); |
| } |
| |
| if (name == null) { |
| name = "Connection"; |
| } |
| |
| return name; |
| } |
| |
| @Override |
| public String getSafeDescription() { |
| return "Connection " + StandardConnection.this.getIdentifier(); |
| } |
| }; |
| } |
| |
| @Override |
| public void triggerDestinationEvent() { |
| if (getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { |
| scheduler.registerEvent(getDestination()); |
| } |
| } |
| |
| @Override |
| public void triggerSourceEvent() { |
| if (getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { |
| scheduler.registerEvent(getSource()); |
| } |
| } |
| |
| @Override |
| public Authorizable getSourceAuthorizable() { |
| final Connectable sourceConnectable = getSource(); |
| final Authorizable sourceAuthorizable; |
| |
| // if the source is a remote group port, authorize according to the RPG |
| if (sourceConnectable instanceof RemoteGroupPort) { |
| sourceAuthorizable = ((RemoteGroupPort) sourceConnectable).getRemoteProcessGroup(); |
| } else { |
| sourceAuthorizable = sourceConnectable; |
| } |
| |
| return sourceAuthorizable; |
| } |
| |
| @Override |
| public Authorizable getDestinationAuthorizable() { |
| final Connectable destinationConnectable = getDestination(); |
| final Authorizable destinationAuthorizable; |
| |
| // if the destination is a remote group port, authorize according to the RPG |
| if (destinationConnectable instanceof RemoteGroupPort) { |
| destinationAuthorizable = ((RemoteGroupPort) destinationConnectable).getRemoteProcessGroup(); |
| } else { |
| destinationAuthorizable = destinationConnectable; |
| } |
| |
| return destinationAuthorizable; |
| } |
| |
| @Override |
| public AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) { |
| if (user == null) { |
| return AuthorizationResult.denied("Unknown user."); |
| } |
| |
| // check the source |
| final AuthorizationResult sourceResult = getSourceAuthorizable().checkAuthorization(authorizer, action, user, resourceContext); |
| if (Result.Denied.equals(sourceResult.getResult())) { |
| return sourceResult; |
| } |
| |
| // check the destination |
| return getDestinationAuthorizable().checkAuthorization(authorizer, action, user, resourceContext); |
| } |
| |
| @Override |
| public void authorize(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) throws AccessDeniedException { |
| if (user == null) { |
| throw new AccessDeniedException("Unknown user."); |
| } |
| |
| getSourceAuthorizable().authorize(authorizer, action, user, resourceContext); |
| getDestinationAuthorizable().authorize(authorizer, action, user, resourceContext); |
| } |
| |
| @Override |
| public List<Position> getBendPoints() { |
| return bendPoints.get(); |
| } |
| |
| @Override |
| public void setBendPoints(final List<Position> position) { |
| this.bendPoints.set(Collections.unmodifiableList(new ArrayList<>(position))); |
| } |
| |
| @Override |
| public int getLabelIndex() { |
| return labelIndex.get(); |
| } |
| |
| @Override |
| public void setLabelIndex(final int labelIndex) { |
| this.labelIndex.set(labelIndex); |
| } |
| |
| @Override |
| public long getZIndex() { |
| return zIndex.get(); |
| } |
| |
| @Override |
| public void setZIndex(final long zIndex) { |
| this.zIndex.set(zIndex); |
| } |
| |
| @Override |
| public Connectable getSource() { |
| return source; |
| } |
| |
| @Override |
| public Connectable getDestination() { |
| return destination.get(); |
| } |
| |
| @Override |
| public Collection<Relationship> getRelationships() { |
| return relationships.get(); |
| } |
| |
| @Override |
| public FlowFileQueue getFlowFileQueue() { |
| return flowFileQueue; |
| } |
| |
| @Override |
| public void setProcessGroup(final ProcessGroup newGroup) { |
| final ProcessGroup currentGroup = this.processGroup.get(); |
| try { |
| this.processGroup.set(newGroup); |
| } catch (final RuntimeException e) { |
| this.processGroup.set(currentGroup); |
| throw e; |
| } |
| } |
| |
| @Override |
| public void setRelationships(final Collection<Relationship> newRelationships) { |
| final Collection<Relationship> currentRelationships = relationships.get(); |
| if (currentRelationships.equals(newRelationships)) { |
| return; |
| } |
| |
| try { |
| getSource().verifyCanUpdate(); |
| } catch (final IllegalStateException ise) { |
| throw new IllegalStateException("Cannot update the relationships for Connection", ise); |
| } |
| |
| try { |
| this.relationships.set(new ArrayList<>(newRelationships)); |
| getSource().updateConnection(this); |
| } catch (final RuntimeException e) { |
| this.relationships.set(currentRelationships); |
| throw e; |
| } |
| } |
| |
| @Override |
| public void setDestination(final Connectable newDestination) { |
| final Connectable previousDestination = destination.get(); |
| if (previousDestination.equals(newDestination)) { |
| return; |
| } |
| |
| if (previousDestination.isRunning() && !(previousDestination instanceof Funnel || previousDestination instanceof LocalPort)) { |
| throw new IllegalStateException("Cannot change destination of Connection because the current destination is running"); |
| } |
| |
| if (getFlowFileQueue().isUnacknowledgedFlowFile()) { |
| throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination); |
| } |
| |
| if (newDestination instanceof Funnel && newDestination.equals(source)) { |
| throw new IllegalStateException("Funnels do not support self-looping connections."); |
| } |
| |
| try { |
| previousDestination.removeConnection(this); |
| this.destination.set(newDestination); |
| getSource().updateConnection(this); |
| |
| newDestination.addConnection(this); |
| scheduler.registerEvent(newDestination); |
| } catch (final RuntimeException e) { |
| this.destination.set(previousDestination); |
| throw e; |
| } |
| } |
| |
| @Override |
| public void lock() { |
| flowFileQueue.lock(); |
| } |
| |
| @Override |
| public void unlock() { |
| flowFileQueue.unlock(); |
| } |
| |
| @Override |
| public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) { |
| return flowFileQueue.poll(filter, expiredRecords); |
| } |
| |
| @Override |
| public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) { |
| return flowFileQueue.poll(expiredRecords); |
| } |
| |
| @Override |
| public boolean equals(final Object other) { |
| if (!(other instanceof Connection)) { |
| return false; |
| } |
| final Connection con = (Connection) other; |
| return new EqualsBuilder().append(id, con.getIdentifier()).isEquals(); |
| } |
| |
| @Override |
| public int hashCode() { |
| return hashCode; |
| } |
| |
| @Override |
| public String toString() { |
| return "Connection[ID=" + getIdentifier() + ", Source ID=" + getSource().getIdentifier() + ", Dest ID=" + getDestination().getIdentifier() + "]"; |
| } |
| |
| /** |
| * Gives this Connection ownership of the given FlowFile and allows the |
| * Connection to hold on to the FlowFile but NOT provide the FlowFile to |
| * consumers. This allows us to ensure that the Connection is not deleted |
| * during the middle of a Session commit. |
| * |
| * @param flowFile to add |
| */ |
| @Override |
| public void enqueue(final FlowFileRecord flowFile) { |
| flowFileQueue.put(flowFile); |
| } |
| |
| @Override |
| public void enqueue(final Collection<FlowFileRecord> flowFiles) { |
| flowFileQueue.putAll(flowFiles); |
| } |
| |
| public static class Builder { |
| |
| private final ProcessScheduler scheduler; |
| |
| private String id = UUID.randomUUID().toString(); |
| private String name; |
| private List<Position> bendPoints = new ArrayList<>(); |
| private ProcessGroup processGroup; |
| private Connectable source; |
| private Connectable destination; |
| private Collection<Relationship> relationships; |
| private FlowFileQueueFactory flowFileQueueFactory; |
| private boolean clustered = false; |
| |
| public Builder(final ProcessScheduler scheduler) { |
| this.scheduler = scheduler; |
| } |
| |
| public Builder id(final String id) { |
| this.id = id; |
| return this; |
| } |
| |
| public Builder source(final Connectable source) { |
| this.source = source; |
| return this; |
| } |
| |
| public Builder processGroup(final ProcessGroup group) { |
| this.processGroup = group; |
| return this; |
| } |
| |
| public Builder destination(final Connectable destination) { |
| this.destination = destination; |
| return this; |
| } |
| |
| public Builder relationships(final Collection<Relationship> relationships) { |
| this.relationships = new ArrayList<>(relationships); |
| return this; |
| } |
| |
| public Builder name(final String name) { |
| this.name = name; |
| return this; |
| } |
| |
| public Builder bendPoints(final List<Position> bendPoints) { |
| this.bendPoints.clear(); |
| this.bendPoints.addAll(bendPoints); |
| return this; |
| } |
| |
| public Builder addBendPoint(final Position bendPoint) { |
| bendPoints.add(bendPoint); |
| return this; |
| } |
| |
| public Builder flowFileQueueFactory(final FlowFileQueueFactory flowFileQueueFactory) { |
| this.flowFileQueueFactory = flowFileQueueFactory; |
| return this; |
| } |
| |
| public Builder clustered(final boolean clustered) { |
| this.clustered = clustered; |
| return this; |
| } |
| |
| public StandardConnection build() { |
| if (processGroup == null) { |
| throw new IllegalStateException("Cannot build a Connection without a Process Group"); |
| } |
| if (source == null) { |
| throw new IllegalStateException("Cannot build a Connection without a Source"); |
| } |
| if (destination == null) { |
| throw new IllegalStateException("Cannot build a Connection without a Destination"); |
| } |
| if (flowFileQueueFactory == null) { |
| throw new IllegalStateException("Cannot build a Connection without a FlowFileQueueFactory"); |
| } |
| |
| if (relationships == null) { |
| relationships = new ArrayList<>(); |
| } |
| |
| if (relationships.isEmpty()) { |
| // ensure relationships have been specified for processors, otherwise the anonymous relationship is used |
| if (source.getConnectableType() == ConnectableType.PROCESSOR) { |
| throw new IllegalStateException("Cannot build a Connection without any relationships"); |
| } |
| relationships.add(Relationship.ANONYMOUS); |
| } |
| |
| return new StandardConnection(this); |
| } |
| } |
| |
| @Override |
| public void verifyCanUpdate() { |
| // StandardConnection can always be updated |
| } |
| |
| @Override |
| public void verifyCanDelete() { |
| if (!flowFileQueue.isEmpty()) { |
| throw new IllegalStateException("Queue not empty for " + this.getIdentifier()); |
| } |
| |
| if (source.isRunning()) { |
| if (!ConnectableType.FUNNEL.equals(source.getConnectableType())) { |
| throw new IllegalStateException("Source of Connection (" + source.getIdentifier() + ") is running"); |
| } |
| } |
| |
| final Connectable dest = destination.get(); |
| if (dest.isRunning()) { |
| if (!ConnectableType.FUNNEL.equals(dest.getConnectableType())) { |
| throw new IllegalStateException("Destination of Connection (" + dest.getIdentifier() + ") is running"); |
| } |
| } |
| } |
| |
| @Override |
| public Optional<String> getVersionedComponentId() { |
| return Optional.ofNullable(versionedComponentId.get()); |
| } |
| |
| @Override |
| public void setVersionedComponentId(final String versionedComponentId) { |
| boolean updated = false; |
| while (!updated) { |
| final String currentId = this.versionedComponentId.get(); |
| |
| if (currentId == null) { |
| updated = this.versionedComponentId.compareAndSet(null, versionedComponentId); |
| } else if (currentId.equals(versionedComponentId)) { |
| return; |
| } else if (versionedComponentId == null) { |
| updated = this.versionedComponentId.compareAndSet(currentId, null); |
| } else { |
| throw new IllegalStateException(this + " is already under version control"); |
| } |
| } |
| } |
| } |