// blob: 1cf78b641b2dbe07d067c178a3a6754fcfbebbce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.connectable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.scheduling.SchedulingStrategy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
 * Models a connection between connectable components. A connection may contain
 * one or more relationships that map the source component to the destination
 * component.
 *
 * <p>The identifier and the source are fixed when the connection is built. The name, bend points,
 * destination, relationships, owning process group, label index, z-index and versioned component
 * id can be replaced afterwards and are held in atomic holders. Equality and hashing are based on
 * the identifier only.
 *
 * <p>Instances are created through {@link Builder}.
 */
public final class StandardConnection implements Connection, ConnectionEventListener {

    private final String id;
    private final AtomicReference<ProcessGroup> processGroup;
    private final AtomicReference<String> name;
    private final AtomicReference<List<Position>> bendPoints;
    private final Connectable source;
    private final AtomicReference<Connectable> destination;
    private final AtomicReference<Collection<Relationship>> relationships;
    private final AtomicInteger labelIndex = new AtomicInteger(1);
    private final AtomicLong zIndex = new AtomicLong(0L);
    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
    private final ProcessScheduler scheduler;
    private final int hashCode;

    private volatile FlowFileQueue flowFileQueue;

    /**
     * Creates a connection from an already validated builder.
     *
     * @param builder the builder holding the configuration; {@link Builder#build()} performs the
     *                required-field checks before calling this constructor
     */
    private StandardConnection(final Builder builder) {
        id = builder.id;
        // The hash is derived from the id only. It is computed before 'this' is handed to the
        // queue factory below so that hashCode() is already valid if the factory (or the queue
        // it creates) hashes the connection during construction.
        hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();

        name = new AtomicReference<>(builder.name);
        bendPoints = new AtomicReference<>(Collections.unmodifiableList(new ArrayList<>(builder.bendPoints)));
        processGroup = new AtomicReference<>(builder.processGroup);
        source = builder.source;
        destination = new AtomicReference<>(builder.destination);
        // NOTE(review): Collections.unmodifiableCollection does not pass equals() through to the
        // backing collection, so the "unchanged" short-circuit in setRelationships only matches
        // this initial value by identity. Kept as-is to preserve existing behavior.
        relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
        scheduler = builder.scheduler;
        flowFileQueue = builder.flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this, processGroup.get());
    }

    /**
     * @return the process group this connection currently belongs to
     */
    @Override
    public ProcessGroup getProcessGroup() {
        return processGroup.get();
    }

    /**
     * @return the identifier assigned when the connection was built
     */
    @Override
    public String getIdentifier() {
        return id;
    }

    /**
     * @return the configured name, or {@code null} if none has been set
     */
    @Override
    public String getName() {
        return name.get();
    }

    /**
     * Replaces the name of this connection.
     *
     * @param name the new name; may be {@code null}
     */
    @Override
    public void setName(final String name) {
        this.name.set(name);
    }

    /**
     * @return the owning process group, which acts as the parent for authorization purposes
     */
    @Override
    public Authorizable getParentAuthorizable() {
        return getProcessGroup();
    }

    /**
     * Builds a {@link Resource} view of this connection. The returned object reads the
     * connection's identifier, name and relationships each time one of its methods is called.
     *
     * @return a resource whose identifier is {@code /connections/<id>}
     */
    @Override
    public Resource getResource() {
        return new Resource() {
            @Override
            public String getIdentifier() {
                return "/connections/" + StandardConnection.this.getIdentifier();
            }

            /**
             * Resolves a display name: the explicit connection name if set, otherwise the
             * relationship names joined with ", ", otherwise the literal "Connection".
             */
            @Override
            public String getName() {
                String resolved = StandardConnection.this.getName();

                final Collection<Relationship> current = getRelationships();
                final boolean hasRelationships = current != null && !current.isEmpty();
                if (resolved == null && hasRelationships) {
                    // Collected into a Set, so duplicate names collapse and order is unspecified.
                    final Set<String> relationshipNames = current.stream()
                            .map(Relationship::getName)
                            .collect(Collectors.toSet());
                    resolved = StringUtils.join(relationshipNames, ", ");
                }

                return resolved == null ? "Connection" : resolved;
            }

            @Override
            public String getSafeDescription() {
                return "Connection " + StandardConnection.this.getIdentifier();
            }
        };
    }

    /**
     * Registers an event with the scheduler for the destination, but only when the destination
     * uses the {@code EVENT_DRIVEN} scheduling strategy.
     */
    @Override
    public void triggerDestinationEvent() {
        // Read the (mutable) destination once so the strategy check and the registration
        // refer to the same component.
        final Connectable currentDestination = getDestination();
        if (currentDestination.getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
            scheduler.registerEvent(currentDestination);
        }
    }

    /**
     * Registers an event with the scheduler for the source, but only when the source uses the
     * {@code EVENT_DRIVEN} scheduling strategy.
     */
    @Override
    public void triggerSourceEvent() {
        final Connectable currentSource = getSource();
        if (currentSource.getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
            scheduler.registerEvent(currentSource);
        }
    }

    /**
     * @return the component to authorize against for the source side: the remote process group
     *         when the source is a {@link RemoteGroupPort}, otherwise the source itself
     */
    @Override
    public Authorizable getSourceAuthorizable() {
        final Connectable sourceConnectable = getSource();
        if (sourceConnectable instanceof RemoteGroupPort) {
            return ((RemoteGroupPort) sourceConnectable).getRemoteProcessGroup();
        }
        return sourceConnectable;
    }

    /**
     * @return the component to authorize against for the destination side: the remote process
     *         group when the destination is a {@link RemoteGroupPort}, otherwise the destination
     *         itself
     */
    @Override
    public Authorizable getDestinationAuthorizable() {
        final Connectable destinationConnectable = getDestination();
        if (destinationConnectable instanceof RemoteGroupPort) {
            return ((RemoteGroupPort) destinationConnectable).getRemoteProcessGroup();
        }
        return destinationConnectable;
    }

    /**
     * Checks authorization against the source and then the destination.
     *
     * @param authorizer      the authorizer to consult
     * @param action          the requested action
     * @param user            the user performing the action
     * @param resourceContext additional context passed through to the delegates
     * @return a denied result if {@code user} is {@code null} or the source check is denied;
     *         otherwise the result of the destination check
     */
    @Override
    public AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) {
        if (user == null) {
            return AuthorizationResult.denied("Unknown user.");
        }

        final AuthorizationResult sourceResult = getSourceAuthorizable().checkAuthorization(authorizer, action, user, resourceContext);
        final boolean sourceDenied = Result.Denied.equals(sourceResult.getResult());
        if (sourceDenied) {
            return sourceResult;
        }

        return getDestinationAuthorizable().checkAuthorization(authorizer, action, user, resourceContext);
    }

    /**
     * Authorizes the action against the source and then the destination.
     *
     * @param authorizer      the authorizer to consult
     * @param action          the requested action
     * @param user            the user performing the action
     * @param resourceContext additional context passed through to the delegates
     * @throws AccessDeniedException if {@code user} is {@code null}, or as thrown by the source
     *                               or destination authorization
     */
    @Override
    public void authorize(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) throws AccessDeniedException {
        if (user == null) {
            throw new AccessDeniedException("Unknown user.");
        }

        getSourceAuthorizable().authorize(authorizer, action, user, resourceContext);
        getDestinationAuthorizable().authorize(authorizer, action, user, resourceContext);
    }

    /**
     * @return an unmodifiable list of the current bend points
     */
    @Override
    public List<Position> getBendPoints() {
        return bendPoints.get();
    }

    /**
     * Replaces the bend points with an unmodifiable copy of the given list.
     *
     * @param position the new bend points
     */
    @Override
    public void setBendPoints(final List<Position> position) {
        final List<Position> copy = new ArrayList<>(position);
        this.bendPoints.set(Collections.unmodifiableList(copy));
    }

    /**
     * @return the label index (initially 1)
     */
    @Override
    public int getLabelIndex() {
        return labelIndex.get();
    }

    /**
     * @param labelIndex the new label index
     */
    @Override
    public void setLabelIndex(final int labelIndex) {
        this.labelIndex.set(labelIndex);
    }

    /**
     * @return the z-index (initially 0)
     */
    @Override
    public long getZIndex() {
        return zIndex.get();
    }

    /**
     * @param zIndex the new z-index
     */
    @Override
    public void setZIndex(final long zIndex) {
        this.zIndex.set(zIndex);
    }

    /**
     * @return the source component, fixed at construction
     */
    @Override
    public Connectable getSource() {
        return source;
    }

    /**
     * @return the current destination component
     */
    @Override
    public Connectable getDestination() {
        return destination.get();
    }

    /**
     * @return an unmodifiable view of the relationships currently mapped by this connection
     */
    @Override
    public Collection<Relationship> getRelationships() {
        return relationships.get();
    }

    /**
     * @return the queue created for this connection at construction
     */
    @Override
    public FlowFileQueue getFlowFileQueue() {
        return flowFileQueue;
    }

    /**
     * Replaces the process group this connection belongs to.
     *
     * @param newGroup the new owning group
     */
    @Override
    public void setProcessGroup(final ProcessGroup newGroup) {
        // A plain atomic set cannot fail part-way, so no rollback handling is needed here.
        this.processGroup.set(newGroup);
    }

    /**
     * Replaces the relationships of this connection and notifies the source.
     *
     * <p>Does nothing when the stored collection reports itself equal to {@code newRelationships}.
     * Otherwise the source must allow updates; the new relationships are stored as an
     * unmodifiable copy and {@code updateConnection} is invoked on the source. If storing or the
     * source notification fails, the previous relationships are restored.
     *
     * @param newRelationships the relationships to apply
     * @throws IllegalStateException if the source reports that it cannot be updated
     */
    @Override
    public void setRelationships(final Collection<Relationship> newRelationships) {
        final Collection<Relationship> currentRelationships = relationships.get();
        if (currentRelationships.equals(newRelationships)) {
            return;
        }

        try {
            getSource().verifyCanUpdate();
        } catch (final IllegalStateException ise) {
            throw new IllegalStateException("Cannot update the relationships for Connection", ise);
        }

        try {
            // Store an unmodifiable List (not a bare ArrayList) so getRelationships() never leaks
            // mutable internal state. unmodifiableList still delegates equals() to the backing
            // list, which keeps the no-op check above working on later calls.
            final List<Relationship> updated = Collections.unmodifiableList(new ArrayList<>(newRelationships));
            this.relationships.set(updated);
            getSource().updateConnection(this);
        } catch (final RuntimeException e) {
            this.relationships.set(currentRelationships);
            throw e;
        }
    }

    /**
     * Moves this connection to a new destination.
     *
     * <p>Does nothing when the new destination equals the current one. On success the connection
     * is removed from the previous destination, the source is notified through
     * {@code updateConnection}, the connection is added to the new destination, and an event is
     * registered with the scheduler for the new destination.
     *
     * @param newDestination the component that should receive FlowFiles from now on
     * @throws IllegalStateException if the current destination is running and is neither a
     *                               {@link Funnel} nor a {@link LocalPort}; if the queue has
     *                               unacknowledged FlowFiles; or if the new destination is a
     *                               Funnel equal to the source
     * @throws NullPointerException  if {@code newDestination} is {@code null}
     */
    @Override
    public void setDestination(final Connectable newDestination) {
        final Connectable previousDestination = destination.get();
        if (previousDestination.equals(newDestination)) {
            return;
        }

        final boolean previousMayChangeWhileRunning = previousDestination instanceof Funnel || previousDestination instanceof LocalPort;
        if (previousDestination.isRunning() && !previousMayChangeWhileRunning) {
            throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
        }

        if (getFlowFileQueue().isUnacknowledgedFlowFile()) {
            throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination);
        }

        if (newDestination instanceof Funnel && newDestination.equals(source)) {
            throw new IllegalStateException("Funnels do not support self-looping connections.");
        }

        // Fail before touching any state: without this guard a null destination is only detected
        // (as an NPE) after the connection has already been removed from the previous destination.
        if (newDestination == null) {
            throw new NullPointerException("Cannot change destination of Connection to null");
        }

        try {
            previousDestination.removeConnection(this);
            this.destination.set(newDestination);
            getSource().updateConnection(this);

            newDestination.addConnection(this);
            scheduler.registerEvent(newDestination);
        } catch (final RuntimeException e) {
            // NOTE(review): only the destination reference is rolled back; the connection is not
            // re-added to the previous destination if the failure happened after its removal.
            this.destination.set(previousDestination);
            throw e;
        }
    }

    /**
     * Delegates to {@code lock()} on the underlying queue.
     */
    @Override
    public void lock() {
        flowFileQueue.lock();
    }

    /**
     * Delegates to {@code unlock()} on the underlying queue.
     */
    @Override
    public void unlock() {
        flowFileQueue.unlock();
    }

    /**
     * Polls the queue using the given filter and the {@code UNPENALIZED_FLOWFILES} poll strategy.
     *
     * @param filter         the filter passed to the queue
     * @param expiredRecords the set passed to the queue for expired records
     * @return the records returned by the queue
     */
    @Override
    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
        return flowFileQueue.poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
    }

    /**
     * Polls a single record from the queue using the {@code UNPENALIZED_FLOWFILES} poll strategy.
     *
     * @param expiredRecords the set passed to the queue for expired records
     * @return the record returned by the queue
     */
    @Override
    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
        return flowFileQueue.poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
    }

    /**
     * Two connections are equal when their identifiers are equal.
     *
     * @param other the object to compare with
     * @return {@code true} if {@code other} is a {@link Connection} with the same identifier
     */
    @Override
    public boolean equals(final Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof Connection)) {
            return false;
        }

        final Connection that = (Connection) other;
        return new EqualsBuilder().append(id, that.getIdentifier()).isEquals();
    }

    /**
     * @return the hash of the identifier, computed once at construction
     */
    @Override
    public int hashCode() {
        return hashCode;
    }

    /**
     * @return a description containing the connection, source and destination identifiers
     */
    @Override
    public String toString() {
        return "Connection[ID=" + getIdentifier() + ", Source ID=" + getSource().getIdentifier() + ", Dest ID=" + getDestination().getIdentifier() + "]";
    }

    /**
     * Puts the given FlowFile into this connection's queue.
     *
     * @param flowFile to add
     */
    @Override
    public void enqueue(final FlowFileRecord flowFile) {
        flowFileQueue.put(flowFile);
    }

    /**
     * Puts all of the given FlowFiles into this connection's queue.
     *
     * @param flowFiles to add
     */
    @Override
    public void enqueue(final Collection<FlowFileRecord> flowFiles) {
        flowFileQueue.putAll(flowFiles);
    }

    /**
     * Fluent builder for {@link StandardConnection}. A process group, source, destination and
     * queue factory are required; the identifier defaults to a random UUID.
     */
    public static class Builder {

        private final ProcessScheduler scheduler;
        private String id = UUID.randomUUID().toString();
        private String name;
        private List<Position> bendPoints = new ArrayList<>();
        private ProcessGroup processGroup;
        private Connectable source;
        private Connectable destination;
        private Collection<Relationship> relationships;
        private FlowFileQueueFactory flowFileQueueFactory;
        // Recorded by clustered(boolean) but not read anywhere in this class.
        private boolean clustered = false;

        /**
         * @param scheduler the scheduler the built connection uses to register events
         */
        public Builder(final ProcessScheduler scheduler) {
            this.scheduler = scheduler;
        }

        /**
         * @param id the identifier to use instead of the generated UUID
         * @return this builder
         */
        public Builder id(final String id) {
            this.id = id;
            return this;
        }

        /**
         * @param source the source component (required)
         * @return this builder
         */
        public Builder source(final Connectable source) {
            this.source = source;
            return this;
        }

        /**
         * @param group the owning process group (required)
         * @return this builder
         */
        public Builder processGroup(final ProcessGroup group) {
            this.processGroup = group;
            return this;
        }

        /**
         * @param destination the destination component (required)
         * @return this builder
         */
        public Builder destination(final Connectable destination) {
            this.destination = destination;
            return this;
        }

        /**
         * @param relationships the relationships to map; copied into a new list
         * @return this builder
         */
        public Builder relationships(final Collection<Relationship> relationships) {
            this.relationships = new ArrayList<>(relationships);
            return this;
        }

        /**
         * @param name the connection name; may be {@code null}
         * @return this builder
         */
        public Builder name(final String name) {
            this.name = name;
            return this;
        }

        /**
         * Replaces any bend points collected so far with the given ones.
         *
         * @param bendPoints the bend points to use
         * @return this builder
         */
        public Builder bendPoints(final List<Position> bendPoints) {
            this.bendPoints.clear();
            this.bendPoints.addAll(bendPoints);
            return this;
        }

        /**
         * @param bendPoint a bend point to append
         * @return this builder
         */
        public Builder addBendPoint(final Position bendPoint) {
            bendPoints.add(bendPoint);
            return this;
        }

        /**
         * @param flowFileQueueFactory the factory used to create the connection's queue (required)
         * @return this builder
         */
        public Builder flowFileQueueFactory(final FlowFileQueueFactory flowFileQueueFactory) {
            this.flowFileQueueFactory = flowFileQueueFactory;
            return this;
        }

        /**
         * @param clustered the clustered flag to record
         * @return this builder
         */
        public Builder clustered(final boolean clustered) {
            this.clustered = clustered;
            return this;
        }

        /**
         * Validates the configuration and creates the connection.
         *
         * <p>When no relationships were supplied, a source of type {@code PROCESSOR} is rejected;
         * for any other source type {@link Relationship#ANONYMOUS} is used.
         *
         * @return the new connection
         * @throws IllegalStateException if the process group, source, destination or queue
         *                               factory is missing, or a processor source has no
         *                               relationships
         */
        public StandardConnection build() {
            if (processGroup == null) {
                throw new IllegalStateException("Cannot build a Connection without a Process Group");
            }
            if (source == null) {
                throw new IllegalStateException("Cannot build a Connection without a Source");
            }
            if (destination == null) {
                throw new IllegalStateException("Cannot build a Connection without a Destination");
            }
            if (flowFileQueueFactory == null) {
                throw new IllegalStateException("Cannot build a Connection without a FlowFileQueueFactory");
            }

            if (relationships == null) {
                relationships = new ArrayList<>();
            }

            if (relationships.isEmpty()) {
                // Processors must name their relationships explicitly; every other source type
                // falls back to the anonymous relationship.
                final boolean processorSource = source.getConnectableType() == ConnectableType.PROCESSOR;
                if (processorSource) {
                    throw new IllegalStateException("Cannot build a Connection without any relationships");
                }
                relationships.add(Relationship.ANONYMOUS);
            }

            return new StandardConnection(this);
        }
    }

    /**
     * Never throws: this implementation places no restriction on updates.
     */
    @Override
    public void verifyCanUpdate() {
        // StandardConnection can always be updated
    }

    /**
     * Verifies that this connection may be deleted.
     *
     * @throws IllegalStateException if the queue is not empty, or if the source or the
     *                               destination is running and is not of type {@code FUNNEL}
     */
    @Override
    public void verifyCanDelete() {
        if (!flowFileQueue.isEmpty()) {
            throw new IllegalStateException("Queue not empty for " + this.getIdentifier());
        }

        if (source.isRunning() && !ConnectableType.FUNNEL.equals(source.getConnectableType())) {
            throw new IllegalStateException("Source of Connection (" + source.getIdentifier() + ") is running");
        }

        final Connectable dest = destination.get();
        if (dest.isRunning() && !ConnectableType.FUNNEL.equals(dest.getConnectableType())) {
            throw new IllegalStateException("Destination of Connection (" + dest.getIdentifier() + ") is running");
        }
    }

    /**
     * @return the versioned component id, or an empty Optional if none is set
     */
    @Override
    public Optional<String> getVersionedComponentId() {
        return Optional.ofNullable(versionedComponentId.get());
    }

    /**
     * Sets or clears the versioned component id.
     *
     * <p>An unset id may be set to any value, a set id may be cleared by passing {@code null},
     * and passing the value already stored is a no-op. The update is retried when a
     * compare-and-set loses to a concurrent change.
     *
     * @param versionedComponentId the new id, or {@code null} to clear it
     * @throws IllegalStateException if a different, non-null id is already set
     */
    @Override
    public void setVersionedComponentId(final String versionedComponentId) {
        while (true) {
            final String currentId = this.versionedComponentId.get();

            if (currentId == null) {
                // Nothing stored yet: try to install the requested value.
                if (this.versionedComponentId.compareAndSet(null, versionedComponentId)) {
                    return;
                }
            } else if (currentId.equals(versionedComponentId)) {
                return;
            } else if (versionedComponentId == null) {
                // Clearing an existing id.
                if (this.versionedComponentId.compareAndSet(currentId, null)) {
                    return;
                }
            } else {
                throw new IllegalStateException(this + " is already under version control");
            }
        }
    }
}