blob: 274b5e45b625114993a7ac2ff13db4fbc4090fed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import static org.apache.nifi.util.StringUtils.isEmpty;
public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO {
private FlowController flowController;
private ComponentStateDAO componentStateDAO;
/**
 * Resolves a remote process group by id by asking the flow's root group to find it.
 *
 * @param remoteProcessGroupId id of the remote process group to resolve
 * @return the matching remote process group, never {@code null}
 * @throws ResourceNotFoundException if the root group returns no match for the id
 */
private RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId) {
    final RemoteProcessGroup located = flowController.getFlowManager()
            .getRootGroup()
            .findRemoteProcessGroup(remoteProcessGroupId);

    if (located != null) {
        return located;
    }

    throw new ResourceNotFoundException(String.format("Unable to find remote process group with id '%s'.", remoteProcessGroupId));
}
/**
 * Reports whether the flow's root group can find a remote process group with the given id.
 *
 * @param remoteProcessGroupId id of the remote process group to look for
 * @return {@code true} when the lookup yields a non-null result, {@code false} otherwise
 */
@Override
public boolean hasRemoteProcessGroup(String remoteProcessGroupId) {
    final RemoteProcessGroup match = flowController.getFlowManager()
            .getRootGroup()
            .findRemoteProcessGroup(remoteProcessGroupId);
    return match != null;
}
/**
 * Creates a remote process group from the supplied DTO and adds it to the given process group.
 *
 * @param groupId id of the process group that will contain the new remote process group
 * @param remoteProcessGroupDTO settings for the new remote process group; its target URI(s) are mandatory
 * @return the newly created remote process group
 * @throws IllegalArgumentException if the DTO names a parent group different from {@code groupId},
 *         or if no target URI(s) were supplied
 */
@Override
public RemoteProcessGroup createRemoteProcessGroup(String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
    // resolve the destination first so an unknown group is reported before any other check runs
    final ProcessGroup destinationGroup = locateProcessGroup(flowController, groupId);

    final String declaredParentId = remoteProcessGroupDTO.getParentGroupId();
    if (declaredParentId != null && !flowController.getFlowManager().areGroupsSame(groupId, declaredParentId)) {
        throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Remote Process Group is being added.");
    }

    final String targetUris = remoteProcessGroupDTO.getTargetUris();
    if (targetUris == null || targetUris.isEmpty()) {
        throw new IllegalArgumentException("Cannot add a Remote Process Group without specifying the Target URI(s)");
    }

    // instantiate and initialize the remote process group
    final RemoteProcessGroup created = flowController.getFlowManager().createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris);
    created.initialize();

    // apply the remaining settings carried by the DTO
    updateRemoteProcessGroup(created, remoteProcessGroupDTO);

    // attach the configured remote process group to its containing group
    destinationGroup.addRemoteProcessGroup(created);
    return created;
}
/**
 * Retrieves the remote process group with the given id.
 *
 * @param remoteProcessGroupId id of the remote process group
 * @return the matching remote process group
 * @throws ResourceNotFoundException if no remote process group with that id is found
 */
@Override
public RemoteProcessGroup getRemoteProcessGroup(String remoteProcessGroupId) {
    return locateRemoteProcessGroup(remoteProcessGroupId);
}
/**
 * Retrieves the remote process groups reported by the specified process group.
 *
 * @param groupId id of the process group to query
 * @return the remote process groups returned by that group
 */
@Override
public Set<RemoteProcessGroup> getRemoteProcessGroups(String groupId) {
    return locateProcessGroup(flowController, groupId).getRemoteProcessGroups();
}
/**
 * Verifies that the remote process group identified by the DTO's id can accept the proposed update.
 *
 * @param remoteProcessGroup DTO carrying the id of the target and the proposed changes
 * @throws ResourceNotFoundException if no remote process group with the DTO's id is found
 */
@Override
public void verifyUpdate(RemoteProcessGroupDTO remoteProcessGroup) {
    final RemoteProcessGroup existing = locateRemoteProcessGroup(remoteProcessGroup.getId());
    verifyUpdate(existing, remoteProcessGroup);
}
/**
 * Verifies the specified remote group can be updated, if necessary.
 *
 * <p>Three checks run in order: a requested change of transmission state, validation of the
 * proposed configuration values, and, when any connection-level property is supplied, the
 * group's own update check.</p>
 *
 * @param remoteProcessGroup the existing remote process group
 * @param remoteProcessGroupDto the proposed changes; {@code null} properties are treated as "not changing"
 * @throws ValidationException if the proposed configuration produces any validation error
 */
@SuppressWarnings("unchecked")
private void verifyUpdate(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDto) {
    // a transmission flag was sent: verify the start/stop transition only when the state would actually flip
    final Boolean requestedTransmitting = remoteProcessGroupDto.isTransmitting();
    if (isNotNull(requestedTransmitting)) {
        final boolean currentlyTransmitting = remoteProcessGroup.isTransmitting();
        if (requestedTransmitting != currentlyTransmitting) {
            if (requestedTransmitting) {
                remoteProcessGroup.verifyCanStartTransmitting();
            } else {
                remoteProcessGroup.verifyCanStopTransmitting();
            }
        }
    }

    // reject the request outright when the proposed values are malformed
    final List<String> problems = validateProposedRemoteProcessGroupConfiguration(remoteProcessGroupDto);
    if (!problems.isEmpty()) {
        throw new ValidationException(problems);
    }

    // only consult the group's update check when a connection-level property is part of the request
    final boolean touchesGroupProperties = isAnyNotNull(
            remoteProcessGroupDto.getYieldDuration(),
            remoteProcessGroupDto.getLocalNetworkInterface(),
            remoteProcessGroupDto.getCommunicationsTimeout(),
            remoteProcessGroupDto.getProxyHost(),
            remoteProcessGroupDto.getProxyPort(),
            remoteProcessGroupDto.getProxyUser(),
            remoteProcessGroupDto.getProxyPassword());
    if (touchesGroupProperties) {
        remoteProcessGroup.verifyCanUpdate();
    }
}
/**
 * Verifies that the given input port of a remote process group can accept the proposed update.
 *
 * @param remoteProcessGroupId id of the remote process group that owns the port
 * @param remoteProcessGroupPortDto DTO carrying the port id and the proposed changes
 * @throws ResourceNotFoundException if the remote process group or the input port is not found
 */
@Override
public void verifyUpdateInputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
    final RemoteProcessGroup owner = locateRemoteProcessGroup(remoteProcessGroupId);
    final RemoteGroupPort inputPort = owner.getInputPort(remoteProcessGroupPortDto.getId());

    if (inputPort == null) {
        final String message = String.format("Unable to find remote process group input port with id '%s'.", remoteProcessGroupPortDto.getId());
        throw new ResourceNotFoundException(message);
    }

    verifyUpdatePort(inputPort, remoteProcessGroupPortDto);
}
/**
 * Verifies that the given output port of a remote process group can accept the proposed update.
 *
 * @param remoteProcessGroupId id of the remote process group that owns the port
 * @param remoteProcessGroupPortDto DTO carrying the port id and the proposed changes
 * @throws ResourceNotFoundException if the remote process group or the output port is not found
 */
@Override
public void verifyUpdateOutputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
    final RemoteProcessGroup owner = locateRemoteProcessGroup(remoteProcessGroupId);
    final RemoteGroupPort outputPort = owner.getOutputPort(remoteProcessGroupPortDto.getId());

    if (outputPort == null) {
        final String message = String.format("Unable to find remote process group output port with id '%s'.", remoteProcessGroupPortDto.getId());
        throw new ResourceNotFoundException(message);
    }

    verifyUpdatePort(outputPort, remoteProcessGroupPortDto);
}
/**
 * Verifies the specified remote port can be updated, if necessary.
 *
 * <p>Three checks run in order: a requested change of the port's running state, validation of
 * the proposed port configuration, and, when any port setting is supplied, the port's own
 * update check.</p>
 *
 * @param port the existing remote group port
 * @param remoteProcessGroupPortDto the proposed changes; {@code null} properties are treated as "not changing"
 * @throws ValidationException if the proposed configuration produces any validation error
 */
private void verifyUpdatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
    // a transmission flag was sent: verify the start/stop transition only when the state would actually flip
    final Boolean requestedTransmitting = remoteProcessGroupPortDto.isTransmitting();
    if (isNotNull(requestedTransmitting)) {
        final boolean currentlyRunning = port.isRunning();
        if (requestedTransmitting != currentlyRunning) {
            if (requestedTransmitting) {
                port.verifyCanStart();
            } else {
                port.verifyCanStop();
            }
        }
    }

    // reject the request outright when the proposed values are malformed
    final List<String> problems = validateProposedRemoteProcessGroupPortConfiguration(port, remoteProcessGroupPortDto);
    if (!problems.isEmpty()) {
        throw new ValidationException(problems);
    }

    // only consult the port's update check when a port setting is part of the request
    final boolean touchesPortSettings = isAnyNotNull(
            remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(),
            remoteProcessGroupPortDto.getUseCompression(),
            remoteProcessGroupPortDto.getBatchSettings());
    if (touchesPortSettings) {
        port.verifyCanUpdate();
    }
}
/**
 * Validates the proposed configuration for the specified remote port.
 *
 * <p>Checks the concurrent task count and, when batch settings are supplied, the batch count,
 * batch size and batch duration. Size and duration are trimmed and case-normalized before being
 * matched against {@code DataUnit.DATA_SIZE_PATTERN} and {@code FormatUtils.TIME_DURATION_PATTERN}.</p>
 *
 * @param remoteGroupPort the existing port; only its name is used, to label error messages
 * @param remoteProcessGroupPortDTO the proposed port settings
 * @return the validation error messages, empty when no problem was found
 */
private List<String> validateProposedRemoteProcessGroupPortConfiguration(RemoteGroupPort remoteGroupPort, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
    final List<String> validationErrors = new ArrayList<>();

    // ensure the proposed port configuration is valid
    final Integer concurrentTasks = remoteProcessGroupPortDTO.getConcurrentlySchedulableTaskCount();
    if (isNotNull(concurrentTasks) && concurrentTasks <= 0) {
        validationErrors.add(String.format("Concurrent tasks for port '%s' must be a positive integer.", remoteGroupPort.getName()));
    }

    final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDTO.getBatchSettings();
    if (batchSettingsDTO != null) {
        // NOTE(review): zero passes this check although the message says "positive" - confirm which is intended
        final Integer batchCount = batchSettingsDTO.getCount();
        if (isNotNull(batchCount) && batchCount < 0) {
            validationErrors.add(String.format("Batch count for port '%s' must be a positive integer.", remoteGroupPort.getName()));
        }

        // Locale.ROOT keeps case normalization independent of the JVM default locale; with a
        // Turkish default locale 'I' lowercases to a dotless 'ı', which would break the regex match.
        final String batchSize = batchSettingsDTO.getSize();
        if (isNotNull(batchSize) && !batchSize.isEmpty()
                && !DataUnit.DATA_SIZE_PATTERN.matcher(batchSize.trim().toUpperCase(Locale.ROOT)).matches()) {
            validationErrors.add(String.format("Batch size for port '%s' must be of format <Data Size> <Data Unit>" +
                    " where <Data Size> is a non-negative integer and <Data Unit> is a supported Data"
                    + " Unit, such as: B, KB, MB, GB, TB", remoteGroupPort.getName()));
        }

        final String batchDuration = batchSettingsDTO.getDuration();
        if (isNotNull(batchDuration) && !batchDuration.isEmpty()
                && !FormatUtils.TIME_DURATION_PATTERN.matcher(batchDuration.trim().toLowerCase(Locale.ROOT)).matches()) {
            validationErrors.add(String.format("Batch duration for port '%s' must be of format <duration> <TimeUnit>" +
                    " where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such "
                    + "as: nanos, millis, secs, mins, hrs, days", remoteGroupPort.getName()));
        }
    }

    return validationErrors;
}
/**
 * Validates the proposed configuration for the specified remote group.
 *
 * <p>Checks that the communications timeout and yield duration, when supplied, match
 * {@code FormatUtils.TIME_DURATION_PATTERN}, and that the proxy settings are mutually
 * consistent: a port, user or password requires a host, and user and password must be
 * supplied together.</p>
 *
 * @param remoteProcessGroupDTO the proposed remote process group settings
 * @return the validation error messages, empty when no problem was found
 */
private List<String> validateProposedRemoteProcessGroupConfiguration(RemoteProcessGroupDTO remoteProcessGroupDTO) {
    final List<String> validationErrors = new ArrayList<>();

    final String communicationsTimeout = remoteProcessGroupDTO.getCommunicationsTimeout();
    if (isNotNull(communicationsTimeout)) {
        final Matcher timeoutMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(communicationsTimeout);
        if (!timeoutMatcher.matches()) {
            validationErrors.add("Communications timeout is not a valid time duration (ie 30 sec, 5 min)");
        }
    }

    final String yieldDuration = remoteProcessGroupDTO.getYieldDuration();
    if (isNotNull(yieldDuration)) {
        final Matcher yieldMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(yieldDuration);
        if (!yieldMatcher.matches()) {
            validationErrors.add("Yield duration is not a valid time duration (ie 30 sec, 5 min)");
        }
    }

    final String proxyPassword = remoteProcessGroupDTO.getProxyPassword();
    final String proxyUser = remoteProcessGroupDTO.getProxyUser();
    final String proxyHost = remoteProcessGroupDTO.getProxyHost();

    if (isNotNull(remoteProcessGroupDTO.getProxyPort())) {
        if (isEmpty(proxyHost)) {
            validationErrors.add("Proxy port was specified, but proxy host was empty.");
        }
    }

    if (!isEmpty(proxyUser)) {
        if (isEmpty(proxyHost)) {
            validationErrors.add("Proxy user name was specified, but proxy host was empty.");
        }
        if (isEmpty(proxyPassword)) {
            validationErrors.add("User password should be specified if Proxy server needs user authentication.");
        }
    }

    if (!isEmpty(proxyPassword)) {
        if (isEmpty(proxyHost)) {
            validationErrors.add("Proxy user password was specified, but proxy host was empty.");
        }
        // A password without a user name is incomplete. This must test the user: testing the
        // password here could never be true inside this branch, so the error was never reported.
        if (isEmpty(proxyUser)) {
            validationErrors.add("User name should be specified if Proxy server needs user authentication.");
        }
    }

    return validationErrors;
}
/**
 * Applies the proposed changes to an input port of a remote process group.
 *
 * @param remoteProcessGroupId id of the remote process group that owns the port
 * @param remoteProcessGroupPortDto DTO carrying the port id and the proposed changes
 * @return the updated input port
 * @throws ResourceNotFoundException if the remote process group or the input port is not found
 */
@Override
public RemoteGroupPort updateRemoteProcessGroupInputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
    final RemoteProcessGroup owner = locateRemoteProcessGroup(remoteProcessGroupId);
    final RemoteGroupPort inputPort = owner.getInputPort(remoteProcessGroupPortDto.getId());

    if (inputPort == null) {
        final String message = String.format("Unable to find remote process group input port with id '%s'.", remoteProcessGroupPortDto.getId());
        throw new ResourceNotFoundException(message);
    }

    // verify before mutating, then apply the changes
    verifyUpdatePort(inputPort, remoteProcessGroupPortDto);
    updatePort(inputPort, remoteProcessGroupPortDto, owner);

    // let the containing process group know one of its components changed
    owner.getProcessGroup().onComponentModified();
    return inputPort;
}
/**
 * Applies the proposed changes to an output port of a remote process group.
 *
 * @param remoteProcessGroupId id of the remote process group that owns the port
 * @param remoteProcessGroupPortDto DTO carrying the port id and the proposed changes
 * @return the updated output port
 * @throws ResourceNotFoundException if the remote process group or the output port is not found
 */
@Override
public RemoteGroupPort updateRemoteProcessGroupOutputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
    final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
    final RemoteGroupPort port = remoteProcessGroup.getOutputPort(remoteProcessGroupPortDto.getId());
    if (port == null) {
        // report the id of the missing port (not of the owning group), matching the input-port variant
        throw new ResourceNotFoundException(
                String.format("Unable to find remote process group output port with id '%s'.", remoteProcessGroupPortDto.getId()));
    }

    // verify the update
    verifyUpdatePort(port, remoteProcessGroupPortDto);

    // perform the update
    updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup);

    // let the containing process group know one of its components changed
    remoteProcessGroup.getProcessGroup().onComponentModified();
    return port;
}
/**
 * Copies the supplied settings from the DTO onto the port and starts or stops it when requested.
 *
 * @param port Port instance to be updated.
 * @param remoteProcessGroupPortDto DTO containing updated remote process group port settings.
 * @param remoteProcessGroup If remoteProcessGroupPortDto has updated isTransmitting input,
 *                           this method will start or stop the port in this remoteProcessGroup as necessary.
 */
private void updatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroup remoteProcessGroup) {
    final Integer concurrentTasks = remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount();
    if (isNotNull(concurrentTasks)) {
        port.setMaxConcurrentTasks(concurrentTasks);
    }

    final Boolean useCompression = remoteProcessGroupPortDto.getUseCompression();
    if (isNotNull(useCompression)) {
        port.setUseCompression(useCompression);
    }

    // batch settings are applied as a unit: all three values are written whenever the settings object is present
    final BatchSettingsDTO batchSettings = remoteProcessGroupPortDto.getBatchSettings();
    if (isNotNull(batchSettings)) {
        port.setBatchCount(batchSettings.getCount());
        port.setBatchSize(batchSettings.getSize());
        port.setBatchDuration(batchSettings.getDuration());
    }

    // start or stop only when the requested state differs from the current one
    final Boolean requestedTransmitting = remoteProcessGroupPortDto.isTransmitting();
    if (isNotNull(requestedTransmitting)) {
        final boolean currentlyRunning = port.isRunning();
        if (requestedTransmitting != currentlyRunning) {
            if (requestedTransmitting) {
                remoteProcessGroup.startTransmitting(port);
            } else {
                remoteProcessGroup.stopTransmitting(port);
            }
        }
    }
}
/**
 * Applies the proposed changes to the remote process group identified by the DTO's id.
 *
 * @param remoteProcessGroupDTO DTO carrying the id of the target and the proposed changes
 * @return the updated remote process group
 * @throws ResourceNotFoundException if no remote process group with the DTO's id is found
 */
@Override
public RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroupDTO remoteProcessGroupDTO) {
    final RemoteProcessGroup existing = locateRemoteProcessGroup(remoteProcessGroupDTO.getId());
    return updateRemoteProcessGroup(existing, remoteProcessGroupDTO);
}
/**
 * Verifies the proposed changes and then applies them to the given remote process group.
 *
 * <p>Most properties are only written when the DTO supplies a non-null value. The proxy
 * host, port and user are the exception: they are written, even when {@code null},
 * whenever a transport protocol is supplied.</p>
 *
 * @param remoteProcessGroup the remote process group to modify
 * @param remoteProcessGroupDTO the proposed changes
 * @return the same remote process group instance, after modification
 */
private RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDTO) {
    // reject the request before anything is mutated
    verifyUpdate(remoteProcessGroup, remoteProcessGroupDTO);

    // simple properties: written only when supplied
    final String requestedTargetUris = remoteProcessGroupDTO.getTargetUris();
    if (isNotNull(requestedTargetUris)) {
        remoteProcessGroup.setTargetUris(requestedTargetUris);
    }

    final String requestedName = remoteProcessGroupDTO.getName();
    if (isNotNull(requestedName)) {
        remoteProcessGroup.setName(requestedName);
    }

    final String requestedComments = remoteProcessGroupDTO.getComments();
    if (isNotNull(requestedComments)) {
        remoteProcessGroup.setComments(requestedComments);
    }

    final String requestedTimeout = remoteProcessGroupDTO.getCommunicationsTimeout();
    if (isNotNull(requestedTimeout)) {
        remoteProcessGroup.setCommunicationsTimeout(requestedTimeout);
    }

    final String requestedYield = remoteProcessGroupDTO.getYieldDuration();
    if (isNotNull(requestedYield)) {
        remoteProcessGroup.setYieldDuration(requestedYield);
    }

    if (isNotNull(remoteProcessGroupDTO.getPosition())) {
        remoteProcessGroup.setPosition(new Position(remoteProcessGroupDTO.getPosition().getX(), remoteProcessGroupDTO.getPosition().getY()));
    }

    final String requestedProtocol = remoteProcessGroupDTO.getTransportProtocol();
    if (isNotNull(requestedProtocol)) {
        remoteProcessGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(requestedProtocol.toUpperCase()));

        // No null check because these proxy settings have to be clear if not specified.
        // But when user Enable/Disable transmission, only isTransmitting is sent.
        // To prevent clearing these values in that case, set these only if transportProtocol is sent,
        // assuming UI sends transportProtocol always for update.
        remoteProcessGroup.setProxyHost(remoteProcessGroupDTO.getProxyHost());
        remoteProcessGroup.setProxyPort(remoteProcessGroupDTO.getProxyPort());
        remoteProcessGroup.setProxyUser(remoteProcessGroupDTO.getProxyUser());

        // Keep using current password when null or "********" was sent.
        // Passing other values updates the password,
        // specify empty String to clear password.
        final String requestedProxyPassword = remoteProcessGroupDTO.getProxyPassword();
        if (isNotNull(requestedProxyPassword) && !DtoFactory.SENSITIVE_VALUE_MASK.equals(requestedProxyPassword)) {
            remoteProcessGroup.setProxyPassword(requestedProxyPassword);
        }
    }

    // a blank network interface clears the setting; null leaves it untouched
    final String requestedInterface = remoteProcessGroupDTO.getLocalNetworkInterface();
    if (requestedInterface != null) {
        remoteProcessGroup.setNetworkInterface(StringUtils.isBlank(requestedInterface) ? null : requestedInterface);
    }

    // start or stop only when the requested state differs from the current one
    final Boolean requestedTransmitting = remoteProcessGroupDTO.isTransmitting();
    if (isNotNull(requestedTransmitting)) {
        final boolean currentlyTransmitting = remoteProcessGroup.isTransmitting();
        if (requestedTransmitting != currentlyTransmitting) {
            if (requestedTransmitting) {
                remoteProcessGroup.startTransmitting();
            } else {
                remoteProcessGroup.stopTransmitting();
            }
        }
    }

    // the owning group is null when this runs during creation, before the remote group has been added to it
    final ProcessGroup owningGroup = remoteProcessGroup.getProcessGroup();
    if (owningGroup != null) {
        owningGroup.onComponentModified();
    }

    return remoteProcessGroup;
}
/**
 * Verifies that the remote process group with the given id can be deleted.
 *
 * @param remoteProcessGroupId id of the remote process group
 * @throws ResourceNotFoundException if no remote process group with that id is found
 */
@Override
public void verifyDelete(String remoteProcessGroupId) {
    locateRemoteProcessGroup(remoteProcessGroupId).verifyCanDelete();
}
/**
 * Removes the remote process group with the given id from the process group that contains it.
 *
 * @param remoteProcessGroupId id of the remote process group to remove
 * @throws ResourceNotFoundException if no remote process group with that id is found
 */
@Override
public void deleteRemoteProcessGroup(String remoteProcessGroupId) {
    final RemoteProcessGroup target = locateRemoteProcessGroup(remoteProcessGroupId);
    final ProcessGroup owningGroup = target.getProcessGroup();
    owningGroup.removeRemoteProcessGroup(target);
}
/**
 * Retrieves the component state of the given remote process group for the requested scope,
 * delegating to the configured {@link ComponentStateDAO}.
 *
 * @param remoteProcessGroupId id of the remote process group
 * @param scope scope of the state to retrieve
 * @return the state map returned by the component state DAO
 * @throws ResourceNotFoundException if no remote process group with that id is found
 */
@Override
public StateMap getState(String remoteProcessGroupId, Scope scope) {
    return componentStateDAO.getState(locateRemoteProcessGroup(remoteProcessGroupId), scope);
}
/**
 * Sets the flow controller this DAO uses to resolve process groups and remote process groups.
 *
 * @param flowController the flow controller to use
 */
public void setFlowController(final FlowController flowController) {
    this.flowController = flowController;
}
/**
 * Sets the DAO this class delegates to when retrieving component state.
 *
 * @param componentStateDAO the component state DAO to use
 */
public void setComponentStateDAO(final ComponentStateDAO componentStateDAO) {
    this.componentStateDAO = componentStateDAO;
}
}