blob: f4497b1da2df6bb155df7c59b3e24a7ff7cb99e0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.dao.PortDAO;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public abstract class AbstractPortDAO extends ComponentDAO implements PortDAO {
protected FlowController flowController;
protected abstract Port locatePort(final String portId);
@Override
public void verifyUpdate(PortDTO portDTO) {
final Port port = locatePort(portDTO.getId());
verifyUpdate(port, portDTO);
}
protected void verifyUpdate(final Port port, final PortDTO portDTO) {
if (isNotNull(portDTO.getState())) {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(portDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(port.getScheduledState())) {
// perform the appropriate action
switch (purposedScheduledState) {
case RUNNING:
port.verifyCanStart();
break;
case STOPPED:
switch (port.getScheduledState()) {
case RUNNING:
port.verifyCanStop();
break;
case DISABLED:
port.verifyCanEnable();
break;
}
break;
case DISABLED:
port.verifyCanDisable();
break;
}
}
}
// see what's be modified
if (isAnyNotNull(portDTO.getUserAccessControl(),
portDTO.getGroupAccessControl(),
portDTO.getConcurrentlySchedulableTaskCount(),
portDTO.getName(),
portDTO.getComments(),
portDTO.getAllowRemoteAccess())) {
// validate the request
final List<String> requestValidation = validateProposedConfiguration(port, portDTO);
// ensure there was no validation errors
if (!requestValidation.isEmpty()) {
throw new ValidationException(requestValidation);
}
// ensure the port can be updated
port.verifyCanUpdate();
}
}
private List<String> validateProposedConfiguration(final Port port, final PortDTO portDTO) {
List<String> validationErrors = new ArrayList<>();
if (isNotNull(portDTO.getName()) && portDTO.getName().trim().isEmpty()) {
validationErrors.add("The name of the port must be specified.");
}
if (isNotNull(portDTO.getConcurrentlySchedulableTaskCount()) && portDTO.getConcurrentlySchedulableTaskCount() <= 0) {
validationErrors.add("Concurrent tasks must be a positive integer.");
}
// Although StandardProcessGroup.addIn/OutputPort has the similar validation,
// this validation is necessary to prevent a port becomes public with an existing port name.
if (port instanceof PublicPort) {
final String portName = isNotNull(portDTO.getName()) ? portDTO.getName() : port.getName();
// If there is any port with the same name, but different identifier, throw an error.
if (getPublicPorts().stream()
.anyMatch(p -> portName.equals(p.getName()) && !port.getIdentifier().equals(p.getIdentifier()))) {
throw new IllegalStateException("Public port name must be unique throughout the flow.");
}
}
return validationErrors;
}
@Override
public void verifyPublicPortUniqueness(final String portId, final String portName) {
for (Port port : getPublicPorts()) {
if (portId.equals(port.getIdentifier())) {
throw new IllegalStateException("Public port identifier must be unique throughout the flow.");
} else if(portName.equals(port.getName())) {
throw new IllegalStateException("Public port name must be unique throughout the flow.");
}
}
}
protected abstract Set<Port> getPublicPorts();
protected abstract void handleStateTransition(final Port port, final ScheduledState proposedScheduledState) throws IllegalStateException;
@Override
public Port updatePort(PortDTO portDTO) {
final Port port = locatePort(portDTO.getId());
final ProcessGroup processGroup = port.getProcessGroup();
// ensure we can do this update
verifyUpdate(port, portDTO);
// handle state transition
if (isNotNull(portDTO.getState())) {
final ScheduledState proposedScheduledState = ScheduledState.valueOf(portDTO.getState());
// only attempt an action if it is changing
if (!proposedScheduledState.equals(port.getScheduledState())) {
try {
handleStateTransition(port, proposedScheduledState);
} catch (IllegalStateException ise) {
throw new NiFiCoreException(ise.getMessage(), ise);
}
}
}
if (port instanceof PublicPort) {
final PublicPort publicPort = (PublicPort) port;
if (isNotNull(portDTO.getGroupAccessControl())) {
publicPort.setGroupAccessControl(portDTO.getGroupAccessControl());
}
if (isNotNull(portDTO.getUserAccessControl())) {
publicPort.setUserAccessControl(portDTO.getUserAccessControl());
}
}
// update the port
final String name = portDTO.getName();
final String comments = portDTO.getComments();
final Integer concurrentTasks = portDTO.getConcurrentlySchedulableTaskCount();
if (isNotNull(portDTO.getPosition())) {
port.setPosition(new Position(portDTO.getPosition().getX(), portDTO.getPosition().getY()));
}
if (isNotNull(name)) {
port.setName(name);
}
if (isNotNull(comments)) {
port.setComments(comments);
}
if (isNotNull(concurrentTasks)) {
port.setMaxConcurrentTasks(concurrentTasks);
}
processGroup.onComponentModified();
return port;
}
@Override
public void verifyDelete(final String portId) {
final Port inputPort = locatePort(portId);
inputPort.verifyCanDelete();
}
/* setters */
public void setFlowController(FlowController flowController) {
this.flowController = flowController;
}
}