blob: 082da1cb0f47b8f5e799556dd1870699ba1b979f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.PurgeDetails;
import org.apache.nifi.admin.service.AccountNotFoundException;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.authorization.Authority;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.apache.nifi.user.AccountStatus;
import org.apache.nifi.user.NiFiUser;
import org.apache.nifi.user.NiFiUserGroup;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;
import org.apache.nifi.web.api.dto.BulletinDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerConfigurationDTO;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersDTO;
import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PreviousValueDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.UserDTO;
import org.apache.nifi.web.api.dto.UserGroupDTO;
import org.apache.nifi.web.api.dto.action.ActionDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.NodePortStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ConnectionDAO;
import org.apache.nifi.web.dao.FunnelDAO;
import org.apache.nifi.web.dao.LabelDAO;
import org.apache.nifi.web.dao.PortDAO;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.dao.ProcessorDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.apache.nifi.web.dao.SnippetDAO;
import org.apache.nifi.web.dao.TemplateDAO;
import org.apache.nifi.web.util.SnippetUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO;
import org.apache.nifi.web.dao.ControllerServiceDAO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.AccessDeniedException;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
*/
public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
// nifi core components
private ControllerFacade controllerFacade;
private SnippetUtils snippetUtils;
// optimistic locking manager
private OptimisticLockingManager optimisticLockingManager;
// data access objects
private ProcessorDAO processorDAO;
private ProcessGroupDAO processGroupDAO;
private RemoteProcessGroupDAO remoteProcessGroupDAO;
private LabelDAO labelDAO;
private FunnelDAO funnelDAO;
private SnippetDAO snippetDAO;
private PortDAO inputPortDAO;
private PortDAO outputPortDAO;
private ConnectionDAO connectionDAO;
private ControllerServiceDAO controllerServiceDAO;
private ReportingTaskDAO reportingTaskDAO;
private TemplateDAO templateDAO;
// administrative services
private AuditService auditService;
private UserService userService;
// cluster manager
private WebClusterManager clusterManager;
// properties
private NiFiProperties properties;
private DtoFactory dtoFactory;
// -----------------------------------------
// Verification Operations
// -----------------------------------------
@Override
public void verifyCreateConnection(String groupId, ConnectionDTO connectionDTO) {
connectionDAO.verifyCreate(groupId, connectionDTO);
}
@Override
public void verifyUpdateConnection(String groupId, ConnectionDTO connectionDTO) {
// if connection does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (connectionDAO.hasConnection(groupId, connectionDTO.getId())) {
connectionDAO.verifyUpdate(groupId, connectionDTO);
} else {
connectionDAO.verifyCreate(groupId, connectionDTO);
}
}
@Override
public void verifyDeleteConnection(String groupId, String connectionId) {
connectionDAO.verifyDelete(groupId, connectionId);
}
@Override
public void verifyDeleteFunnel(String groupId, String funnelId) {
funnelDAO.verifyDelete(groupId, funnelId);
}
@Override
public void verifyUpdateInputPort(String groupId, PortDTO inputPortDTO) {
// if connection does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (inputPortDAO.hasPort(groupId, inputPortDTO.getId())) {
inputPortDAO.verifyUpdate(groupId, inputPortDTO);
}
}
@Override
public void verifyDeleteInputPort(String groupId, String inputPortId) {
inputPortDAO.verifyDelete(groupId, inputPortId);
}
@Override
public void verifyUpdateOutputPort(String groupId, PortDTO outputPortDTO) {
// if connection does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (outputPortDAO.hasPort(groupId, outputPortDTO.getId())) {
outputPortDAO.verifyUpdate(groupId, outputPortDTO);
}
}
@Override
public void verifyDeleteOutputPort(String groupId, String outputPortId) {
outputPortDAO.verifyDelete(groupId, outputPortId);
}
@Override
public void verifyUpdateProcessor(ProcessorDTO processorDTO) {
final String groupId = controllerFacade.findProcessGroupIdForProcessor(processorDTO.getId());
// if processor does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (groupId != null) {
verifyUpdateProcessor(groupId, processorDTO);
}
}
@Override
public void verifyUpdateProcessor(String groupId, ProcessorDTO processorDTO) {
// if processor does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (processorDAO.hasProcessor(groupId, processorDTO.getId())) {
processorDAO.verifyUpdate(groupId, processorDTO);
}
}
@Override
public void verifyDeleteProcessor(String groupId, String processorId) {
processorDAO.verifyDelete(groupId, processorId);
}
@Override
public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
// if group does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
processGroupDAO.verifyUpdate(processGroupDTO);
}
}
@Override
public void verifyDeleteProcessGroup(String groupId) {
processGroupDAO.verifyDelete(groupId);
}
@Override
public void verifyUpdateRemoteProcessGroup(String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
// if remote group does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (remoteProcessGroupDAO.hasRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId())) {
remoteProcessGroupDAO.verifyUpdate(groupId, remoteProcessGroupDTO);
}
}
@Override
public void verifyUpdateRemoteProcessGroupInputPort(String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
remoteProcessGroupDAO.verifyUpdateInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
}
@Override
public void verifyUpdateRemoteProcessGroupOutputPort(String groupId, String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
remoteProcessGroupDAO.verifyUpdateOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
}
@Override
public void verifyDeleteRemoteProcessGroup(String groupId, String remoteProcessGroupId) {
remoteProcessGroupDAO.verifyDelete(groupId, remoteProcessGroupId);
}
@Override
public void verifyUpdateControllerService(ControllerServiceDTO controllerServiceDTO) {
// if service does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId())) {
controllerServiceDAO.verifyUpdate(controllerServiceDTO);
}
}
@Override
public void verifyUpdateControllerServiceReferencingComponents(String controllerServiceId, ScheduledState scheduledState, ControllerServiceState controllerServiceState) {
controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
}
@Override
public void verifyDeleteControllerService(String controllerServiceId) {
controllerServiceDAO.verifyDelete(controllerServiceId);
}
@Override
public void verifyUpdateReportingTask(ReportingTaskDTO reportingTaskDTO) {
// if tasks does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId())) {
reportingTaskDAO.verifyUpdate(reportingTaskDTO);
}
}
@Override
public void verifyDeleteReportingTask(String reportingTaskId) {
reportingTaskDAO.verifyDelete(reportingTaskId);
}
// -----------------------------------------
// Write Operations
// -----------------------------------------
@Override
public ConfigurationSnapshot<ConnectionDTO> updateConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
// if connection does not exist, then create new connection
if (connectionDAO.hasConnection(groupId, connectionDTO.getId()) == false) {
return createConnection(revision, groupId, connectionDTO);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
@Override
public ConnectionDTO execute() {
final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO);
controllerFacade.save();
return dtoFactory.createConnectionDto(connection);
}
});
}
@Override
public ConfigurationSnapshot<ProcessorDTO> updateProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
// if processor does not exist, then create new processor
if (processorDAO.hasProcessor(groupId, processorDTO.getId()) == false) {
return createProcessor(revision, groupId, processorDTO);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
@Override
public ProcessorDTO execute() {
// update the processor
ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createProcessorDto(processor);
}
});
}
@Override
public ConfigurationSnapshot<LabelDTO> updateLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
// if label does not exist, then create new label
if (labelDAO.hasLabel(groupId, labelDTO.getId()) == false) {
return createLabel(revision, groupId, labelDTO);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
@Override
public LabelDTO execute() {
// update the existing label
final Label label = labelDAO.updateLabel(groupId, labelDTO);
// save updated controller
controllerFacade.save();
return dtoFactory.createLabelDto(label);
}
});
}
@Override
public ConfigurationSnapshot<FunnelDTO> updateFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
// if label does not exist, then create new label
if (funnelDAO.hasFunnel(groupId, funnelDTO.getId()) == false) {
return createFunnel(revision, groupId, funnelDTO);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
@Override
public FunnelDTO execute() {
// update the existing label
final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO);
// save updated controller
controllerFacade.save();
return dtoFactory.createFunnelDto(funnel);
}
});
}
@Override
public void verifyUpdateSnippet(SnippetDTO snippetDto) {
// if snippet does not exist, then the update request is likely creating it
// so we don't verify since it will fail
if (snippetDAO.hasSnippet(snippetDto.getId())) {
snippetDAO.verifyUpdate(snippetDto);
}
}
@Override
public ConfigurationSnapshot<SnippetDTO> updateSnippet(final Revision revision, final SnippetDTO snippetDto) {
// if label does not exist, then create new label
if (snippetDAO.hasSnippet(snippetDto.getId()) == false) {
return createSnippet(revision, snippetDto);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
@Override
public SnippetDTO execute() {
// update the snippet
final Snippet snippet = snippetDAO.updateSnippet(snippetDto);
// build the snippet dto
final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet);
responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false));
// save updated controller if applicable
if (snippetDto.getParentGroupId() != null && snippet.isLinked()) {
controllerFacade.save();
}
return responseSnippetDto;
}
});
}
@Override
public ConfigurationSnapshot<PortDTO> updateInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
// if input port does not exist, then create new input port
if (inputPortDAO.hasPort(groupId, inputPortDTO.getId()) == false) {
return createInputPort(revision, groupId, inputPortDTO);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
@Override
public PortDTO execute() {
final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO);
// save updated controller
controllerFacade.save();
return dtoFactory.createPortDto(inputPort);
}
});
}
@Override
public ConfigurationSnapshot<PortDTO> updateOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
// if output port does not exist, then create new output port
if (outputPortDAO.hasPort(groupId, outputPortDTO.getId()) == false) {
return createOutputPort(revision, groupId, outputPortDTO);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
@Override
public PortDTO execute() {
final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO);
// save updated controller
controllerFacade.save();
return dtoFactory.createPortDto(outputPort);
}
});
}
@Override
public ConfigurationSnapshot<RemoteProcessGroupDTO> updateRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
// if controller reference does not exist, then create new controller reference
if (remoteProcessGroupDAO.hasRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId()) == false) {
return createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
@Override
public RemoteProcessGroupDTO execute() {
RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO);
// save updated controller
controllerFacade.save();
return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
}
});
}
@Override
public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
@Override
public RemoteProcessGroupPortDTO execute() {
// update the remote port
RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
// save updated controller
controllerFacade.save();
return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
}
});
}
@Override
public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort(final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupPortDTO>() {
@Override
public RemoteProcessGroupPortDTO execute() {
// update the remote port
RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO);
// save updated controller
controllerFacade.save();
return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
}
});
}
@Override
public ConfigurationSnapshot<ProcessGroupDTO> updateProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) {
// if process group does not exist, then create new process group
if (processGroupDAO.hasProcessGroup(processGroupDTO.getId()) == false) {
if (parentGroupId == null) {
throw new IllegalArgumentException("Unable to create the specified process group since the parent group was not specified.");
} else {
return createProcessGroup(parentGroupId, revision, processGroupDTO);
}
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
@Override
public ProcessGroupDTO execute() {
// update the process group
ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO);
// save updated controller
controllerFacade.save();
return dtoFactory.createProcessGroupDto(processGroup);
}
});
}
@Override
public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerConfigurationDTO>() {
@Override
public ControllerConfigurationDTO execute() {
// update the controller configuration through the proxy
if (controllerConfigurationDTO.getName() != null) {
controllerFacade.setName(controllerConfigurationDTO.getName());
}
if (controllerConfigurationDTO.getComments() != null) {
controllerFacade.setComments(controllerConfigurationDTO.getComments());
}
if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
}
if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
}
// create the controller configuration dto
ControllerConfigurationDTO controllerConfig = getControllerConfiguration();
// save the flow
controllerFacade.save();
return controllerConfig;
}
});
}
@Override
public NodeDTO updateNode(NodeDTO nodeDTO) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user == null) {
throw new WebApplicationException(new Throwable("Unable to access details for current user."));
}
final String userDn = user.getDn();
if (Node.Status.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
clusterManager.requestReconnection(nodeDTO.getNodeId(), userDn);
} else if (Node.Status.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
clusterManager.requestDisconnection(nodeDTO.getNodeId(), userDn);
} else {
// handle primary
final Boolean primary = nodeDTO.isPrimary();
if (primary != null && primary) {
clusterManager.setPrimaryNode(nodeDTO.getNodeId(), userDn);
}
}
final String nodeId = nodeDTO.getNodeId();
return dtoFactory.createNodeDTO(clusterManager.getNode(nodeId), clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId));
}
@Override
public CounterDTO updateCounter(String counterId) {
return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId));
}
@Override
public ConfigurationSnapshot<Void> deleteConnection(final Revision revision, final String groupId, final String connectionId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>(){
@Override
public Void execute() {
connectionDAO.deleteConnection(groupId, connectionId);
// save the flow
controllerFacade.save();
return null;
}
});
}
@Override
public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
// delete the processor and synchronize the connection state
processorDAO.deleteProcessor(groupId, processorId);
// save the flow
controllerFacade.save();
return null;
}
});
}
@Override
public ConfigurationSnapshot<Void> deleteLabel(final Revision revision, final String groupId, final String labelId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
// delete the label
labelDAO.deleteLabel(groupId, labelId);
// save the flow
controllerFacade.save();
return null;
}
});
}
@Override
public ConfigurationSnapshot<Void> deleteFunnel(final Revision revision, final String groupId, final String funnelId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
// delete the label
funnelDAO.deleteFunnel(groupId, funnelId);
// save the flow
controllerFacade.save();
return null;
}
});
}
@Override
public void verifyDeleteSnippet(String id) {
snippetDAO.verifyDelete(id);
}
@Override
public ConfigurationSnapshot<Void> deleteSnippet(final Revision revision, final String snippetId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
// determine if this snippet was linked to the data flow
Snippet snippet = snippetDAO.getSnippet(snippetId);
boolean linked = snippet.isLinked();
// delete the snippet
snippetDAO.deleteSnippet(snippetId);
// save the flow if necessary
if (linked) {
controllerFacade.save();
}
return null;
}
});
}
@Override
public ConfigurationSnapshot<Void> deleteInputPort(final Revision revision, final String groupId, final String inputPortId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
inputPortDAO.deletePort(groupId, inputPortId);
// save the flow
controllerFacade.save();
return null;
}
});
}
@Override
public ConfigurationSnapshot<Void> deleteOutputPort(final Revision revision, final String groupId, final String outputPortId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
outputPortDAO.deletePort(groupId, outputPortId);
// save the flow
controllerFacade.save();
return null;
}
});
}
@Override
public ConfigurationSnapshot<Void> deleteProcessGroup(final Revision revision, final String groupId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
processGroupDAO.deleteProcessGroup(groupId);
// save the flow
controllerFacade.save();
return null;
}
});
}
@Override
public ConfigurationSnapshot<Void> deleteRemoteProcessGroup(final Revision revision, final String groupId, final String remoteProcessGroupId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId);
// save the flow
controllerFacade.save();
return null;
}
});
}
@Override
public void deleteTemplate(String id) {
// create the template
templateDAO.deleteTemplate(id);
}
@Override
public ConfigurationSnapshot<ConnectionDTO> createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ConnectionDTO>() {
@Override
public ConnectionDTO execute() {
// ensure id is set
if (StringUtils.isBlank(connectionDTO.getId())) {
connectionDTO.setId(UUID.randomUUID().toString());
}
final Connection connection = connectionDAO.createConnection(groupId, connectionDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createConnectionDto(connection);
}
});
}
@Override
public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
@Override
public ProcessorDTO execute() {
// ensure id is set
if (StringUtils.isBlank(processorDTO.getId())) {
processorDTO.setId(UUID.randomUUID().toString());
}
// create the processor
final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createProcessorDto(processor);
}
});
}
@Override
public ConfigurationSnapshot<LabelDTO> createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<LabelDTO>() {
@Override
public LabelDTO execute() {
// ensure id is set
if (StringUtils.isBlank(labelDTO.getId())) {
labelDTO.setId(UUID.randomUUID().toString());
}
// add the label
final Label label = labelDAO.createLabel(groupId, labelDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createLabelDto(label);
}
});
}
@Override
public ConfigurationSnapshot<FunnelDTO> createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FunnelDTO>() {
@Override
public FunnelDTO execute() {
// ensure id is set
if (StringUtils.isBlank(funnelDTO.getId())) {
funnelDTO.setId(UUID.randomUUID().toString());
}
// add the label
final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createFunnelDto(funnel);
}
});
}
private void validateSnippetContents(final FlowSnippetDTO flowSnippet, final String groupId) {
// validate any processors
if (flowSnippet.getProcessors() != null) {
for (final ProcessorDTO processorDTO : flowSnippet.getProcessors()) {
final ProcessorNode processorNode = processorDAO.getProcessor(groupId, processorDTO.getId());
final Collection<ValidationResult> validationErrors = processorNode.getValidationErrors();
if (validationErrors != null && !validationErrors.isEmpty()) {
final List<String> errors = new ArrayList<>();
for (final ValidationResult validationResult : validationErrors) {
errors.add(validationResult.toString());
}
processorDTO.setValidationErrors(errors);
}
}
}
if (flowSnippet.getInputPorts() != null) {
for (final PortDTO portDTO : flowSnippet.getInputPorts()) {
final Port port = inputPortDAO.getPort(groupId, portDTO.getId());
final Collection<ValidationResult> validationErrors = port.getValidationErrors();
if (validationErrors != null && !validationErrors.isEmpty()) {
final List<String> errors = new ArrayList<>();
for (final ValidationResult validationResult : validationErrors) {
errors.add(validationResult.toString());
}
portDTO.setValidationErrors(errors);
}
}
}
if (flowSnippet.getOutputPorts() != null) {
for (final PortDTO portDTO : flowSnippet.getOutputPorts()) {
final Port port = outputPortDAO.getPort(groupId, portDTO.getId());
final Collection<ValidationResult> validationErrors = port.getValidationErrors();
if (validationErrors != null && !validationErrors.isEmpty()) {
final List<String> errors = new ArrayList<>();
for (final ValidationResult validationResult : validationErrors) {
errors.add(validationResult.toString());
}
portDTO.setValidationErrors(errors);
}
}
}
// get any remote process group issues
if (flowSnippet.getRemoteProcessGroups() != null) {
for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flowSnippet.getRemoteProcessGroups()) {
final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(groupId, remoteProcessGroupDTO.getId());
if (remoteProcessGroup.getAuthorizationIssue() != null) {
remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue()));
}
}
}
}
@Override
public ConfigurationSnapshot<FlowSnippetDTO> copySnippet(final Revision revision, final String groupId, final String snippetId, final Double originX, final Double originY) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
@Override
public FlowSnippetDTO execute() {
String id = snippetId;
// ensure id is set
if (StringUtils.isBlank(id)) {
id = UUID.randomUUID().toString();
}
// create the new snippet
FlowSnippetDTO flowSnippet = snippetDAO.copySnippet(groupId, id, originX, originY);
// validate the new snippet
validateSnippetContents(flowSnippet, groupId);
// save the flow
controllerFacade.save();
return flowSnippet;
}
});
}
@Override
public ConfigurationSnapshot<SnippetDTO> createSnippet(final Revision revision, final SnippetDTO snippetDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() {
@Override
public SnippetDTO execute() {
// ensure id is set
if (StringUtils.isBlank(snippetDTO.getId())) {
snippetDTO.setId(UUID.randomUUID().toString());
}
// add the snippet
final Snippet snippet = snippetDAO.createSnippet(snippetDTO);
final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet);
responseSnippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
return responseSnippetDTO;
}
});
}
@Override
public ConfigurationSnapshot<PortDTO> createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
@Override
public PortDTO execute() {
// ensure id is set
if (StringUtils.isBlank(inputPortDTO.getId())) {
inputPortDTO.setId(UUID.randomUUID().toString());
}
final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createPortDto(inputPort);
}
});
}
@Override
public ConfigurationSnapshot<PortDTO> createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<PortDTO>() {
@Override
public PortDTO execute() {
// ensure id is set
if (StringUtils.isBlank(outputPortDTO.getId())) {
outputPortDTO.setId(UUID.randomUUID().toString());
}
final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createPortDto(outputPort);
}
});
}
@Override
public ConfigurationSnapshot<ProcessGroupDTO> createProcessGroup(final String parentGroupId, final Revision revision, final ProcessGroupDTO processGroupDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessGroupDTO>() {
@Override
public ProcessGroupDTO execute() {
// ensure id is set
if (StringUtils.isBlank(processGroupDTO.getId())) {
processGroupDTO.setId(UUID.randomUUID().toString());
}
final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createProcessGroupDto(processGroup);
}
});
}
@Override
public ConfigurationSnapshot<RemoteProcessGroupDTO> createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<RemoteProcessGroupDTO>() {
@Override
public RemoteProcessGroupDTO execute() {
// ensure id is set
if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) {
remoteProcessGroupDTO.setId(UUID.randomUUID().toString());
}
final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup);
}
});
}
@Override
public TemplateDTO createTemplate(String name, String description, String snippetId) {
// get the specified snippet
Snippet snippet = snippetDAO.getSnippet(snippetId);
// create the template
TemplateDTO templateDTO = new TemplateDTO();
templateDTO.setName(name);
templateDTO.setDescription(description);
templateDTO.setTimestamp(new Date());
templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true));
// set the id based on the specified seed
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
}
// create the template
Template template = templateDAO.createTemplate(templateDTO);
return dtoFactory.createTemplateDTO(template);
}
@Override
public TemplateDTO importTemplate(TemplateDTO templateDTO) {
// ensure id is set
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
}
// mark the timestamp
templateDTO.setTimestamp(new Date());
// import the template
final Template template = templateDAO.importTemplate(templateDTO);
// return the template dto
return dtoFactory.createTemplateDTO(template);
}
@Override
public ConfigurationSnapshot<FlowSnippetDTO> createTemplateInstance(final Revision revision, final String groupId, final Double originX, final Double originY, final String templateId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowSnippetDTO>() {
@Override
public FlowSnippetDTO execute() {
// instantiate the template - there is no need to make another copy of the flow snippet since the actual template
// was copied and this dto is only used to instantiate it's components (which as already completed)
FlowSnippetDTO flowSnippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
// validate the new snippet
validateSnippetContents(flowSnippet, groupId);
// save the flow
controllerFacade.save();
return flowSnippet;
}
});
}
@Override
public ConfigurationSnapshot<Void> createArchive(final Revision revision) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
// create the archive
controllerFacade.createArchive();
return null;
}
});
}
@Override
public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
@Override
public ProcessorDTO execute() {
// create the processor config
final ProcessorConfigDTO config = new ProcessorConfigDTO();
config.setAnnotationData(annotationData);
// create the processor dto
final ProcessorDTO processorDTO = new ProcessorDTO();
processorDTO.setId(processorId);
processorDTO.setConfig(config);
// get the parent group id for the specified processor
String groupId = controllerFacade.findProcessGroupIdForProcessor(processorId);
// ensure the parent group id was found
if (groupId == null) {
throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", processorId));
}
// update the processor configuration
ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO);
// save the flow
controllerFacade.save();
return dtoFactory.createProcessorDto(processor);
}
});
}
@Override
public ConfigurationSnapshot<ControllerServiceDTO> createControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
@Override
public ControllerServiceDTO execute() {
// ensure id is set
if (StringUtils.isBlank(controllerServiceDTO.getId())) {
controllerServiceDTO.setId(UUID.randomUUID().toString());
}
// create the controller service
final ControllerServiceNode controllerService = controllerServiceDAO.createControllerService(controllerServiceDTO);
// save the update
if (properties.isClusterManager()) {
clusterManager.saveControllerServices();
} else {
controllerFacade.save();
}
return dtoFactory.createControllerServiceDto(controllerService);
}
});
}
@Override
public ConfigurationSnapshot<ControllerServiceDTO> updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
// if controller service does not exist, then create new controller service
if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId()) == false) {
return createControllerService(revision, controllerServiceDTO);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() {
@Override
public ControllerServiceDTO execute() {
final ControllerServiceNode controllerService = controllerServiceDAO.updateControllerService(controllerServiceDTO);
// save the update
if (properties.isClusterManager()) {
clusterManager.saveControllerServices();
} else {
controllerFacade.save();
}
return dtoFactory.createControllerServiceDto(controllerService);
}
});
}
@Override
public ConfigurationSnapshot<Set<ControllerServiceReferencingComponentDTO>> updateControllerServiceReferencingComponents(final Revision revision, final String controllerServiceId, final org.apache.nifi.controller.ScheduledState scheduledState, final org.apache.nifi.controller.service.ControllerServiceState controllerServiceState) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Set<ControllerServiceReferencingComponentDTO>>() {
@Override
public Set<ControllerServiceReferencingComponentDTO> execute() {
final ControllerServiceReference reference = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
return dtoFactory.createControllerServiceReferencingComponentsDto(reference);
}
});
}
@Override
public ConfigurationSnapshot<Void> deleteControllerService(final Revision revision, final String controllerServiceId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
// delete the label
controllerServiceDAO.deleteControllerService(controllerServiceId);
// save the update
if (properties.isClusterManager()) {
clusterManager.saveControllerServices();
} else {
controllerFacade.save();
}
return null;
}
});
}
@Override
public ConfigurationSnapshot<ReportingTaskDTO> createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ReportingTaskDTO>() {
@Override
public ReportingTaskDTO execute() {
// ensure id is set
if (StringUtils.isBlank(reportingTaskDTO.getId())) {
reportingTaskDTO.setId(UUID.randomUUID().toString());
}
// create the reporting
final ReportingTaskNode reportingTask = reportingTaskDAO.createReportingTask(reportingTaskDTO);
// save the update
if (properties.isClusterManager()) {
clusterManager.saveReportingTasks();
} else {
controllerFacade.save();
}
return dtoFactory.createReportingTaskDto(reportingTask);
}
});
}
@Override
public ConfigurationSnapshot<ReportingTaskDTO> updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
// if reporting task does not exist, then create new reporting task
if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId()) == false) {
return createReportingTask(revision, reportingTaskDTO);
}
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ReportingTaskDTO>() {
@Override
public ReportingTaskDTO execute() {
final ReportingTaskNode reportingTask = reportingTaskDAO.updateReportingTask(reportingTaskDTO);
// save the update
if (properties.isClusterManager()) {
clusterManager.saveReportingTasks();
} else {
controllerFacade.save();
}
return dtoFactory.createReportingTaskDto(reportingTask);
}
});
}
@Override
public ConfigurationSnapshot<Void> deleteReportingTask(final Revision revision, final String reportingTaskId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
public Void execute() {
// delete the label
reportingTaskDAO.deleteReportingTask(reportingTaskId);
// save the update
if (properties.isClusterManager()) {
clusterManager.saveReportingTasks();
} else {
controllerFacade.save();
}
return null;
}
});
}
@Override
public void deleteActions(Date endDate) {
// get the user from the request
NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user == null) {
throw new WebApplicationException(new Throwable("Unable to access details for current user."));
}
// create the purge details
PurgeDetails details = new PurgeDetails();
details.setEndDate(endDate);
// create a purge action to record that records are being removed
Action purgeAction = new Action();
purgeAction.setUserDn(user.getDn());
purgeAction.setUserName(user.getUserName());
purgeAction.setOperation(Operation.Purge);
purgeAction.setTimestamp(new Date());
purgeAction.setSourceId("Flow Controller");
purgeAction.setSourceName("History");
purgeAction.setSourceType(Component.Controller);
purgeAction.setActionDetails(details);
// purge corresponding actions
auditService.purgeActions(endDate, purgeAction);
}
@Override
public void invalidateUser(String userId) {
try {
userService.invalidateUserAccount(userId);
} catch (final AccountNotFoundException anfe) {
// ignore
}
}
@Override
public void invalidateUserGroup(String userGroup, Set<String> userIds) {
// invalidates any user currently associated with this group
if (userGroup != null) {
userService.invalidateUserGroupAccount(userGroup);
}
// invalidates any user that will be associated with this group
if (userIds != null) {
for (final String userId : userIds) {
invalidateUser(userId);
}
}
}
@Override
public UserDTO updateUser(UserDTO userDto) {
NiFiUser user;
// attempt to parse the user id
final String id = userDto.getId();
// determine the authorities that have been specified in the request
Set<Authority> authorities = null;
if (userDto.getAuthorities() != null) {
authorities = Authority.convertRawAuthorities(userDto.getAuthorities());
}
// if the account status isn't specified or isn't changing
final AccountStatus accountStatus = AccountStatus.valueOfStatus(userDto.getStatus());
if (accountStatus == null || AccountStatus.ACTIVE.equals(accountStatus)) {
// ensure that authorities have been specified (may be empty, but not null)
if (authorities == null) {
throw new IllegalArgumentException("Authorities must be specified when updating an account.");
}
// update the user account
user = userService.update(id, authorities);
} else if (AccountStatus.DISABLED.equals(accountStatus)) {
// disable the account
user = userService.disable(id);
} else {
throw new IllegalArgumentException("Accounts cannot be marked pending.");
}
return dtoFactory.createUserDTO(user);
}
@Override
public void deleteUser(String userId) {
userService.deleteUser(userId);
}
@Override
public UserGroupDTO updateUserGroup(final UserGroupDTO userGroupDTO) {
NiFiUserGroup userGroup;
// convert the authorities
Set<Authority> authorities = null;
if (userGroupDTO.getAuthorities() != null) {
authorities = Authority.convertRawAuthorities(userGroupDTO.getAuthorities());
}
final AccountStatus accountStatus = AccountStatus.valueOfStatus(userGroupDTO.getStatus());
if (accountStatus == null || AccountStatus.ACTIVE.equals(accountStatus)) {
// update the user group
userGroup = userService.updateGroup(userGroupDTO.getGroup(), userGroupDTO.getUserIds(), authorities);
} else if (AccountStatus.DISABLED.equals(accountStatus)) {
// disable the accounts
userGroup = userService.disableGroup(userGroupDTO.getGroup());
} else {
throw new IllegalArgumentException("Accounts cannot be marked pending.");
}
// generate the user group dto
return dtoFactory.createUserGroupDTO(userGroup);
}
@Override
public void removeUserFromGroup(String userId) {
userService.ungroupUser(userId);
}
@Override
public void removeUserGroup(String userGroup) {
userService.ungroup(userGroup);
}
@Override
public ProvenanceDTO submitProvenance(ProvenanceDTO query) {
return controllerFacade.submitProvenance(query);
}
@Override
public void deleteProvenance(String queryId) {
controllerFacade.deleteProvenanceQuery(queryId);
}
@Override
public LineageDTO submitLineage(LineageDTO lineage) {
return controllerFacade.submitLineage(lineage);
}
@Override
public void deleteLineage(String lineageId) {
controllerFacade.deleteLineage(lineageId);
}
@Override
public ProvenanceEventDTO submitReplay(Long eventId) {
return controllerFacade.submitReplay(eventId);
}
// -----------------------------------------
// Read Operations
// -----------------------------------------
@Override
public RevisionDTO getRevision() {
return dtoFactory.createRevisionDTO(optimisticLockingManager.getLastModification());
}
@Override
public SearchResultsDTO searchController(String query) {
return controllerFacade.search(query);
}
@Override
public DownloadableContent getContent(Long eventId, String uri, ContentDirection contentDirection) {
return controllerFacade.getContent(eventId, uri, contentDirection);
}
@Override
public ProvenanceDTO getProvenance(String queryId) {
return controllerFacade.getProvenanceQuery(queryId);
}
@Override
public LineageDTO getLineage(String lineageId) {
return controllerFacade.getLineage(lineageId);
}
@Override
public ProvenanceOptionsDTO getProvenanceSearchOptions() {
return controllerFacade.getProvenanceSearchOptions();
}
@Override
public ProvenanceEventDTO getProvenanceEvent(final Long id) {
return controllerFacade.getProvenanceEvent(id);
}
@Override
public ProcessGroupStatusDTO getProcessGroupStatus(String groupId) {
ProcessGroupStatusDTO statusReport;
if (properties.isClusterManager()) {
final ProcessGroupStatus mergedProcessGroupStatus = clusterManager.getProcessGroupStatus(groupId);
if (mergedProcessGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to find status for process group %s.", groupId));
}
statusReport = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), mergedProcessGroupStatus);
} else {
statusReport = controllerFacade.getProcessGroupStatus(groupId);
}
return statusReport;
}
@Override
public ControllerStatusDTO getControllerStatus() {
final ControllerStatusDTO controllerStatus;
if (properties.isClusterManager()) {
final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED);
if (connectedNodes.isEmpty()) {
throw new NoConnectedNodesException();
}
int activeThreadCount = 0;
long totalFlowFileObjectCount = 0;
long totalFlowFileByteCount = 0;
for (final Node node : connectedNodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
activeThreadCount += nodeHeartbeatPayload.getActiveThreadCount();
totalFlowFileObjectCount += nodeHeartbeatPayload.getTotalFlowFileCount();
totalFlowFileByteCount += nodeHeartbeatPayload.getTotalFlowFileBytes();
}
controllerStatus = new ControllerStatusDTO();
controllerStatus.setActiveThreadCount(activeThreadCount);
controllerStatus.setQueued(FormatUtils.formatCount(totalFlowFileObjectCount) + " / " + FormatUtils.formatDataSize(totalFlowFileByteCount));
final int numNodes = clusterManager.getNodeIds().size();
controllerStatus.setConnectedNodes(connectedNodes.size() + " / " + numNodes);
// get the bulletins for the controller
final BulletinRepository bulletinRepository = clusterManager.getBulletinRepository();
final List<Bulletin> results = bulletinRepository.findBulletinsForController();
final List<BulletinDTO> bulletinDtos = new ArrayList<>(results.size());
for (final Bulletin bulletin : results) {
bulletinDtos.add(dtoFactory.createBulletinDto(bulletin));
}
controllerStatus.setBulletins(bulletinDtos);
// get the component counts by extracting them from the roots' group status
final ProcessGroupStatus status = clusterManager.getProcessGroupStatus("root");
if (status != null) {
final ProcessGroupCounts counts = extractProcessGroupCounts(status);
controllerStatus.setRunningCount(counts.getRunningCount());
controllerStatus.setStoppedCount(counts.getStoppedCount());
controllerStatus.setInvalidCount(counts.getInvalidCount());
controllerStatus.setDisabledCount(counts.getDisabledCount());
controllerStatus.setActiveRemotePortCount(counts.getActiveRemotePortCount());
controllerStatus.setInactiveRemotePortCount(counts.getInactiveRemotePortCount());
}
} else {
// get the controller status
controllerStatus = controllerFacade.getControllerStatus();
}
// determine if there are any pending user accounts - only include if appropriate
if (NiFiUserUtils.getAuthorities().contains(Authority.ROLE_ADMIN.toString())) {
controllerStatus.setHasPendingAccounts(userService.hasPendingUserAccount());
}
return controllerStatus;
}
@Override
public CountersDTO getCounters() {
if (properties.isClusterManager()) {
final Map<String, CounterDTO> mergedCountersMap = new HashMap<>();
final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED);
if (connectedNodes.isEmpty()) {
throw new NoConnectedNodesException();
}
for (final Node node : connectedNodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final List<Counter> nodeCounters = node.getHeartbeatPayload().getCounters();
if (nodeCounters == null) {
continue;
}
// for each node, add its counter values to the aggregate values
for (final Counter nodeCounter : nodeCounters) {
final CounterDTO mergedCounter = mergedCountersMap.get(nodeCounter.getIdentifier());
// either create a new aggregate counter or update the aggregate counter
if (mergedCounter == null) {
// add new counter
mergedCountersMap.put(nodeCounter.getIdentifier(), dtoFactory.createCounterDto(nodeCounter));
} else {
// update aggregate counter
mergedCounter.setValueCount(mergedCounter.getValueCount() + nodeCounter.getValue());
mergedCounter.setValue(FormatUtils.formatCount(mergedCounter.getValueCount()));
}
}
}
final CountersDTO mergedCounters = new CountersDTO();
mergedCounters.setGenerated(new Date());
mergedCounters.setCounters(mergedCountersMap.values());
return mergedCounters;
} else {
List<Counter> counters = controllerFacade.getCounters();
Set<CounterDTO> counterDTOs = new LinkedHashSet<>(counters.size());
for (Counter counter : counters) {
counterDTOs.add(dtoFactory.createCounterDto(counter));
}
return dtoFactory.createCountersDto(counterDTOs);
}
}
@Override
public Set<ConnectionDTO> getConnections(String groupId) {
Set<ConnectionDTO> connectionDtos = new LinkedHashSet<>();
for (Connection connection : connectionDAO.getConnections(groupId)) {
connectionDtos.add(dtoFactory.createConnectionDto(connection));
}
return connectionDtos;
}
@Override
public ConnectionDTO getConnection(String groupId, String connectionId) {
return dtoFactory.createConnectionDto(connectionDAO.getConnection(groupId, connectionId));
}
@Override
public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) {
return controllerFacade.getConnectionStatusHistory(groupId, connectionId);
}
@Override
public Set<ProcessorDTO> getProcessors(String groupId) {
Set<ProcessorDTO> processorDtos = new LinkedHashSet<>();
for (ProcessorNode processor : processorDAO.getProcessors(groupId)) {
processorDtos.add(dtoFactory.createProcessorDto(processor));
}
return processorDtos;
}
@Override
public TemplateDTO exportTemplate(String id) {
Template template = templateDAO.getTemplate(id);
TemplateDTO templateDetails = template.getDetails();
TemplateDTO templateDTO = dtoFactory.createTemplateDTO(template);
templateDTO.setSnippet(dtoFactory.copySnippetContents(templateDetails.getSnippet()));
return templateDTO;
}
@Override
public TemplateDTO getTemplate(String id) {
return dtoFactory.createTemplateDTO(templateDAO.getTemplate(id));
}
@Override
public Set<TemplateDTO> getTemplates() {
Set<TemplateDTO> templateDtos = new LinkedHashSet<>();
for (Template template : templateDAO.getTemplates()) {
templateDtos.add(dtoFactory.createTemplateDTO(template));
}
return templateDtos;
}
@Override
public Set<DocumentedTypeDTO> getWorkQueuePrioritizerTypes() {
return controllerFacade.getFlowFileComparatorTypes();
}
@Override
public Set<DocumentedTypeDTO> getProcessorTypes() {
return controllerFacade.getFlowFileProcessorTypes();
}
@Override
public Set<DocumentedTypeDTO> getControllerServiceTypes(final String serviceType) {
return controllerFacade.getControllerServiceTypes(serviceType);
}
@Override
public Set<DocumentedTypeDTO> getReportingTaskTypes() {
return controllerFacade.getReportingTaskTypes();
}
@Override
public ProcessorDTO getProcessor(String groupId, String id) {
final ProcessorNode processor = processorDAO.getProcessor(groupId, id);
final ProcessorDTO processorDto = dtoFactory.createProcessorDto(processor);
return processorDto;
}
@Override
public PropertyDescriptorDTO getProcessorPropertyDescriptor(String groupId, String id, String property) {
final ProcessorNode processor = processorDAO.getProcessor(groupId, id);
PropertyDescriptor descriptor = processor.getPropertyDescriptor(property);
// return an invalid descriptor if the processor doesn't suppor this property
if (descriptor == null) {
descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
}
return dtoFactory.createPropertyDescriptorDto(descriptor);
}
@Override
public StatusHistoryDTO getProcessorStatusHistory(String groupId, String id) {
return controllerFacade.getProcessorStatusHistory(groupId, id);
}
@Override
public BulletinBoardDTO getBulletinBoard(BulletinQueryDTO query) {
// build the query
final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder()
.groupIdMatches(query.getGroupId())
.sourceIdMatches(query.getSourceId())
.nameMatches(query.getName())
.messageMatches(query.getMessage())
.after(query.getAfter())
.limit(query.getLimit());
// get the bulletin repository
final BulletinRepository bulletinRepository;
if (properties.isClusterManager()) {
bulletinRepository = clusterManager.getBulletinRepository();
} else {
bulletinRepository = controllerFacade.getBulletinRepository();
}
// perform the query
final List<Bulletin> results = bulletinRepository.findBulletins(queryBuilder.build());
// perform the query and generate the results - iterating in reverse order since we are
// getting the most recent results by ordering by timestamp desc above. this gets the
// exact results we want but in reverse order
final List<BulletinDTO> bulletins = new ArrayList<>();
for (final ListIterator<Bulletin> bulletinIter = results.listIterator(results.size()); bulletinIter.hasPrevious();) {
bulletins.add(dtoFactory.createBulletinDto(bulletinIter.previous()));
}
// create the bulletin board
BulletinBoardDTO bulletinBoard = new BulletinBoardDTO();
bulletinBoard.setBulletins(bulletins);
bulletinBoard.setGenerated(new Date());
return bulletinBoard;
}
@Override
public SystemDiagnosticsDTO getSystemDiagnostics() {
final SystemDiagnosticsDTO dto;
if (properties.isClusterManager()) {
final SystemDiagnostics clusterDiagnostics = clusterManager.getSystemDiagnostics();
if (clusterDiagnostics == null) {
throw new IllegalStateException("Nodes are connected but no systems diagnostics have been reported.");
}
dto = dtoFactory.createSystemDiagnosticsDto(clusterDiagnostics);
} else {
final SystemDiagnostics sysDiagnostics = controllerFacade.getSystemDiagnostics();
dto = dtoFactory.createSystemDiagnosticsDto(sysDiagnostics);
}
return dto;
}
/**
* Ensures the specified user has permission to access the specified port.
*
* @param user
* @param port
* @return
*/
private boolean isUserAuthorized(final NiFiUser user, final RootGroupPort port) {
final boolean isSiteToSiteSecure = Boolean.TRUE.equals(properties.isSiteToSiteSecure());
// if site to site is not secure, allow all users
if (!isSiteToSiteSecure) {
return true;
}
final Set<String> allowedUsers = port.getUserAccessControl();
if (allowedUsers.contains(user.getDn())) {
return true;
}
final String userGroup = user.getUserGroup();
if (userGroup == null) {
return false;
}
final Set<String> allowedGroups = port.getGroupAccessControl();
return allowedGroups.contains(userGroup);
}
@Override
public ControllerDTO getController() {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user == null) {
throw new WebApplicationException(new Throwable("Unable to access details for current user."));
}
// at this point we know that the user must have ROLE_NIFI because it's required
// get to the endpoint that calls this method but we'll check again anyways
final Set<Authority> authorities = user.getAuthorities();
if (!authorities.contains(Authority.ROLE_NIFI)) {
throw new AccessDeniedException("User must have the NiFi role in order to access these details.");
}
// serialize the input ports this NiFi has access to
final Set<PortDTO> inputPorts = new LinkedHashSet<>();
for (RootGroupPort inputPort : controllerFacade.getInputPorts()) {
if (isUserAuthorized(user, inputPort)) {
final PortDTO dto = new PortDTO();
dto.setId(inputPort.getIdentifier());
dto.setName(inputPort.getName());
dto.setComments(inputPort.getComments());
dto.setState(inputPort.getScheduledState().toString());
inputPorts.add(dto);
}
}
// serialize the output ports this NiFi has access to
final Set<PortDTO> outputPorts = new LinkedHashSet<>();
for (RootGroupPort outputPort : controllerFacade.getOutputPorts()) {
if (isUserAuthorized(user, outputPort)) {
final PortDTO dto = new PortDTO();
dto.setId(outputPort.getIdentifier());
dto.setName(outputPort.getName());
dto.setComments(outputPort.getComments());
dto.setState(outputPort.getScheduledState().toString());
outputPorts.add(dto);
}
}
// get the root group
final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
final ProcessGroupCounts counts = rootGroup.getCounts();
// create the controller dto
final ControllerDTO controllerDTO = new ControllerDTO();
controllerDTO.setId(controllerFacade.getRootGroupId());
controllerDTO.setInstanceId(controllerFacade.getInstanceId());
controllerDTO.setName(controllerFacade.getName());
controllerDTO.setComments(controllerFacade.getComments());
controllerDTO.setInputPorts(inputPorts);
controllerDTO.setOutputPorts(outputPorts);
controllerDTO.setInputPortCount(inputPorts.size());
controllerDTO.setOutputPortCount(outputPorts.size());
controllerDTO.setRunningCount(counts.getRunningCount());
controllerDTO.setStoppedCount(counts.getStoppedCount());
controllerDTO.setInvalidCount(counts.getInvalidCount());
controllerDTO.setDisabledCount(counts.getDisabledCount());
// determine the site to site configuration
if (isClustered()) {
controllerDTO.setRemoteSiteListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningPort());
controllerDTO.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure());
} else {
controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
}
return controllerDTO;
}
@Override
public String getInstanceId() {
return controllerFacade.getInstanceId();
}
@Override
public ControllerConfigurationDTO getControllerConfiguration() {
ControllerConfigurationDTO controllerConfig = new ControllerConfigurationDTO();
controllerConfig.setName(controllerFacade.getName());
controllerConfig.setComments(controllerFacade.getComments());
controllerConfig.setMaxTimerDrivenThreadCount(controllerFacade.getMaxTimerDrivenThreadCount());
controllerConfig.setMaxEventDrivenThreadCount(controllerFacade.getMaxEventDrivenThreadCount());
// get the refresh interval
final long refreshInterval = FormatUtils.getTimeDuration(properties.getAutoRefreshInterval(), TimeUnit.SECONDS);
controllerConfig.setAutoRefreshIntervalSeconds(refreshInterval);
// get the content viewer url
controllerConfig.setContentViewerUrl(properties.getProperty(NiFiProperties.CONTENT_VIEWER_URL));
final Date now = new Date();
controllerConfig.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime()));
controllerConfig.setCurrentTime(now);
// determine the site to site configuration
if (isClustered()) {
controllerConfig.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure());
} else {
controllerConfig.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
}
return controllerConfig;
}
@Override
public Set<LabelDTO> getLabels(String groupId) {
Set<LabelDTO> labelDtos = new LinkedHashSet<>();
for (Label label : labelDAO.getLabels(groupId)) {
labelDtos.add(dtoFactory.createLabelDto(label));
}
return labelDtos;
}
@Override
public LabelDTO getLabel(String groupId, String labelId) {
return dtoFactory.createLabelDto(labelDAO.getLabel(groupId, labelId));
}
@Override
public Set<FunnelDTO> getFunnels(String groupId) {
Set<FunnelDTO> funnelDtos = new LinkedHashSet<>();
for (Funnel funnel : funnelDAO.getFunnels(groupId)) {
funnelDtos.add(dtoFactory.createFunnelDto(funnel));
}
return funnelDtos;
}
@Override
public FunnelDTO getFunnel(String groupId, String funnelId) {
return dtoFactory.createFunnelDto(funnelDAO.getFunnel(groupId, funnelId));
}
@Override
public SnippetDTO getSnippet(String snippetId) {
final Snippet snippet = snippetDAO.getSnippet(snippetId);
final SnippetDTO snippetDTO = dtoFactory.createSnippetDto(snippet);
snippetDTO.setContents(snippetUtils.populateFlowSnippet(snippet, false));
return snippetDTO;
}
@Override
public Set<PortDTO> getInputPorts(String groupId) {
Set<PortDTO> portDtos = new LinkedHashSet<>();
for (Port port : inputPortDAO.getPorts(groupId)) {
portDtos.add(dtoFactory.createPortDto(port));
}
return portDtos;
}
@Override
public Set<PortDTO> getOutputPorts(String groupId) {
Set<PortDTO> portDtos = new LinkedHashSet<>();
for (Port port : outputPortDAO.getPorts(groupId)) {
portDtos.add(dtoFactory.createPortDto(port));
}
return portDtos;
}
@Override
public Set<ProcessGroupDTO> getProcessGroups(String parentGroupId) {
Set<ProcessGroupDTO> processGroupDtos = new LinkedHashSet<>();
for (ProcessGroup groups : processGroupDAO.getProcessGroups(parentGroupId)) {
processGroupDtos.add(dtoFactory.createProcessGroupDto(groups));
}
return processGroupDtos;
}
@Override
public Set<RemoteProcessGroupDTO> getRemoteProcessGroups(String groupId) {
Set<RemoteProcessGroupDTO> remoteProcessGroupDtos = new LinkedHashSet<>();
for (RemoteProcessGroup remoteProcessGroup : remoteProcessGroupDAO.getRemoteProcessGroups(groupId)) {
remoteProcessGroupDtos.add(dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup));
}
return remoteProcessGroupDtos;
}
@Override
public PortDTO getInputPort(String groupId, String inputPortId) {
return dtoFactory.createPortDto(inputPortDAO.getPort(groupId, inputPortId));
}
@Override
public PortDTO getOutputPort(String groupId, String outputPortId) {
return dtoFactory.createPortDto(outputPortDAO.getPort(groupId, outputPortId));
}
@Override
public RemoteProcessGroupDTO getRemoteProcessGroup(String groupId, String remoteProcessGroupId) {
return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroupDAO.getRemoteProcessGroup(groupId, remoteProcessGroupId));
}
@Override
public StatusHistoryDTO getRemoteProcessGroupStatusHistory(String groupId, String id) {
return controllerFacade.getRemoteProcessGroupStatusHistory(groupId, id);
}
@Override
public ConfigurationSnapshot<ProcessGroupDTO> getProcessGroup(String groupId, final boolean recurse) {
ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
Revision revision = optimisticLockingManager.getLastModification().getRevision();
ConfigurationSnapshot<ProcessGroupDTO> response = new ConfigurationSnapshot<>(revision.getVersion(), dtoFactory.createProcessGroupDto(processGroup, recurse));
return response;
}
@Override
public Set<ControllerServiceDTO> getControllerServices() {
final Set<ControllerServiceDTO> controllerServiceDtos = new LinkedHashSet<>();
for (ControllerServiceNode controllerService : controllerServiceDAO.getControllerServices()) {
controllerServiceDtos.add(dtoFactory.createControllerServiceDto(controllerService));
}
return controllerServiceDtos;
}
@Override
public ControllerServiceDTO getControllerService(String controllerServiceId) {
return dtoFactory.createControllerServiceDto(controllerServiceDAO.getControllerService(controllerServiceId));
}
@Override
public PropertyDescriptorDTO getControllerServicePropertyDescriptor(String id, String property) {
final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(id);
PropertyDescriptor descriptor = controllerService.getControllerServiceImplementation().getPropertyDescriptor(property);
// return an invalid descriptor if the controller service doesn't support this property
if (descriptor == null) {
descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
}
return dtoFactory.createPropertyDescriptorDto(descriptor);
}
@Override
public Set<ControllerServiceReferencingComponentDTO> getControllerServiceReferencingComponents(String controllerServiceId) {
final ControllerServiceNode service = controllerServiceDAO.getControllerService(controllerServiceId);
return dtoFactory.createControllerServiceReferencingComponentsDto(service.getReferences());
}
@Override
public Set<ReportingTaskDTO> getReportingTasks() {
final Set<ReportingTaskDTO> reportingTaskDtos = new LinkedHashSet<>();
for (ReportingTaskNode reportingTask : reportingTaskDAO.getReportingTasks()) {
reportingTaskDtos.add(dtoFactory.createReportingTaskDto(reportingTask));
}
return reportingTaskDtos;
}
@Override
public ReportingTaskDTO getReportingTask(String reportingTaskId) {
return dtoFactory.createReportingTaskDto(reportingTaskDAO.getReportingTask(reportingTaskId));
}
@Override
public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(String id, String property) {
final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(id);
PropertyDescriptor descriptor = reportingTask.getReportingTask().getPropertyDescriptor(property);
// return an invalid descriptor if the reporting task doesn't support this property
if (descriptor == null) {
descriptor = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
}
return dtoFactory.createPropertyDescriptorDto(descriptor);
}
@Override
public StatusHistoryDTO getProcessGroupStatusHistory(String groupId) {
return controllerFacade.getProcessGroupStatusHistory(groupId);
}
@Override
public HistoryDTO getActions(HistoryQueryDTO historyQueryDto) {
// extract the query criteria
HistoryQuery historyQuery = new HistoryQuery();
historyQuery.setStartDate(historyQueryDto.getStartDate());
historyQuery.setEndDate(historyQueryDto.getEndDate());
historyQuery.setSourceId(historyQueryDto.getSourceId());
historyQuery.setUserName(historyQueryDto.getUserName());
historyQuery.setOffset(historyQueryDto.getOffset());
historyQuery.setCount(historyQueryDto.getCount());
historyQuery.setSortColumn(historyQueryDto.getSortColumn());
historyQuery.setSortOrder(historyQueryDto.getSortOrder());
// perform the query
History history = auditService.getActions(historyQuery);
// create the response
return dtoFactory.createHistoryDto(history);
}
@Override
public ActionDTO getAction(Integer actionId) {
// get the action
Action action = auditService.getAction(actionId);
// ensure the action was found
if (action == null) {
throw new ResourceNotFoundException(String.format("Unable to find action with id '%s'.", actionId));
}
// return the action
return dtoFactory.createActionDto(action);
}
@Override
public ComponentHistoryDTO getComponentHistory(String componentId) {
final Map<String, PropertyHistoryDTO> propertyHistoryDtos = new LinkedHashMap<>();
final Map<String, List<PreviousValue>> propertyHistory = auditService.getPreviousValues(componentId);
for (final Map.Entry<String, List<PreviousValue>> entry : propertyHistory.entrySet()) {
final List<PreviousValueDTO> previousValueDtos = new ArrayList<>();
for (final PreviousValue previousValue : entry.getValue()) {
final PreviousValueDTO dto = new PreviousValueDTO();
dto.setPreviousValue(previousValue.getPreviousValue());
dto.setTimestamp(previousValue.getTimestamp());
dto.setUserName(previousValue.getUserName());
previousValueDtos.add(dto);
}
if (!previousValueDtos.isEmpty()) {
final PropertyHistoryDTO propertyHistoryDto = new PropertyHistoryDTO();
propertyHistoryDto.setPreviousValues(previousValueDtos);
propertyHistoryDtos.put(entry.getKey(), propertyHistoryDto);
}
}
final ComponentHistoryDTO history = new ComponentHistoryDTO();
history.setComponentId(componentId);
history.setPropertyHistory(propertyHistoryDtos);
return history;
}
@Override
public UserDTO getUser(String userId) {
// get the user
NiFiUser user = userService.getUserById(userId);
// ensure the user was found
if (user == null) {
throw new ResourceNotFoundException(String.format("Unable to find user with id '%s'.", userId));
}
return dtoFactory.createUserDTO(user);
}
@Override
public Collection<UserDTO> getUsers(Boolean grouped) {
// get the users
final Collection<NiFiUser> users = userService.getUsers();
final Collection<UserDTO> userDTOs = new HashSet<>();
if (grouped) {
final Map<String, UserDTO> groupedUserDTOs = new HashMap<>();
// group the users
for (final NiFiUser user : users) {
if (StringUtils.isNotBlank(user.getUserGroup())) {
if (groupedUserDTOs.containsKey(user.getUserGroup())) {
final UserDTO groupedUser = groupedUserDTOs.get(user.getUserGroup());
groupedUser.setId(groupedUser.getId() + "," + String.valueOf(user.getId()));
groupedUser.setUserName(groupedUser.getUserName() + ", " + user.getUserName());
groupedUser.setDn(groupedUser.getDn() + ", " + user.getDn());
groupedUser.setCreation(getOldestDate(groupedUser.getCreation(), user.getCreation()));
groupedUser.setLastAccessed(getNewestDate(groupedUser.getLastAccessed(), user.getLastAccessed()));
groupedUser.setLastVerified(getNewestDate(groupedUser.getLastVerified(), user.getLastVerified()));
// only retain the justification if al users have the same justification
if (groupedUser.getJustification() != null) {
if (!groupedUser.getStatus().equals(user.getJustification())) {
groupedUser.setJustification(null);
}
}
// only retain the status if all users have the same status
if (groupedUser.getStatus() != null) {
if (!groupedUser.getStatus().equals(user.getStatus().toString())) {
groupedUser.setStatus(null);
}
}
// only retain the authorities if all users have the same authorities
if (groupedUser.getAuthorities() != null) {
final Set<String> groupAuthorities = new HashSet<>(groupedUser.getAuthorities());
final Set<String> userAuthorities = Authority.convertAuthorities(user.getAuthorities());
if (!CollectionUtils.isEqualCollection(groupAuthorities, userAuthorities)) {
groupedUser.setAuthorities(null);
}
}
} else {
groupedUserDTOs.put(user.getUserGroup(), dtoFactory.createUserDTO(user));
}
} else {
userDTOs.add(dtoFactory.createUserDTO(user));
}
}
// add the grouped users
userDTOs.addAll(groupedUserDTOs.values());
} else {
// convert each into a DTOs
for (final NiFiUser user : users) {
userDTOs.add(dtoFactory.createUserDTO(user));
}
}
return userDTOs;
}
@Override
public boolean isClustered() {
return controllerFacade.isClustered();
}
@Override
public String getNodeId() {
final NodeIdentifier nodeId = controllerFacade.getNodeId();
if (nodeId != null) {
return nodeId.getId();
} else {
return null;
}
}
@Override
public ClusterDTO getCluster() {
// create cluster summary dto
final ClusterDTO clusterDto = new ClusterDTO();
// set current time
clusterDto.setGenerated(new Date());
// create node dtos
final Collection<NodeDTO> nodeDtos = new ArrayList<>();
clusterDto.setNodes(nodeDtos);
for (final Node node : clusterManager.getNodes()) {
// create and add node dto
final String nodeId = node.getNodeId().getId();
nodeDtos.add(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
}
return clusterDto;
}
@Override
public NodeDTO getNode(String nodeId) {
final Node node = clusterManager.getNode(nodeId);
if (node == null) {
throw new UnknownNodeException("Node does not exist.");
} else {
return dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId));
}
}
@Override
public void deleteNode(String nodeId) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user == null) {
throw new WebApplicationException(new Throwable("Unable to access details for current user."));
}
final String userDn = user.getDn();
clusterManager.deleteNode(nodeId, userDn);
}
private ProcessorStatus findNodeProcessorStatus(final ProcessGroupStatus groupStatus, final String processorId) {
ProcessorStatus processorStatus = null;
for (final ProcessorStatus status : groupStatus.getProcessorStatus()) {
if (processorId.equals(status.getId())) {
processorStatus = status;
break;
}
}
if (processorStatus == null) {
for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
processorStatus = findNodeProcessorStatus(status, processorId);
if (processorStatus != null) {
break;
}
}
}
return processorStatus;
}
// TODO Refactor!!!
@Override
public ClusterProcessorStatusDTO getClusterProcessorStatus(String processorId) {
final ClusterProcessorStatusDTO clusterProcessorStatusDto = new ClusterProcessorStatusDTO();
clusterProcessorStatusDto.setNodeProcessorStatus(new ArrayList<NodeProcessorStatusDTO>());
// set the current time
clusterProcessorStatusDto.setStatsLastRefreshed(new Date());
final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
boolean firstNode = true;
for (final Node node : nodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
continue;
}
// attempt to find the processor stats for this node
final ProcessorStatus processorStatus = findNodeProcessorStatus(nodeStats, processorId);
// sanity check that we have status for this processor
if (processorStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to find status for processor id '%s'.", processorId));
}
if (firstNode) {
clusterProcessorStatusDto.setProcessorId(processorId);
clusterProcessorStatusDto.setProcessorName(processorStatus.getName());
clusterProcessorStatusDto.setProcessorType(processorStatus.getType());
clusterProcessorStatusDto.setProcessorRunStatus(processorStatus.getRunStatus().toString());
firstNode = false;
}
// create node processor status dto
final NodeProcessorStatusDTO nodeProcessorStatusDTO = new NodeProcessorStatusDTO();
clusterProcessorStatusDto.getNodeProcessorStatus().add(nodeProcessorStatusDTO);
// populate node processor status dto
final String nodeId = node.getNodeId().getId();
nodeProcessorStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
nodeProcessorStatusDTO.setProcessorStatus(dtoFactory.createProcessorStatusDto(processorStatus));
}
return clusterProcessorStatusDto;
}
private ConnectionStatus findNodeConnectionStatus(final ProcessGroupStatus groupStatus, final String connectionId) {
ConnectionStatus connectionStatus = null;
for (final ConnectionStatus status : groupStatus.getConnectionStatus()) {
if (connectionId.equals(status.getId())) {
connectionStatus = status;
break;
}
}
if (connectionStatus == null) {
for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
connectionStatus = findNodeConnectionStatus(status, connectionId);
if (connectionStatus != null) {
break;
}
}
}
return connectionStatus;
}
@Override
public ClusterConnectionStatusDTO getClusterConnectionStatus(String connectionId) {
final ClusterConnectionStatusDTO clusterConnectionStatusDto = new ClusterConnectionStatusDTO();
clusterConnectionStatusDto.setNodeConnectionStatus(new ArrayList<NodeConnectionStatusDTO>());
// set the current time
clusterConnectionStatusDto.setStatsLastRefreshed(new Date());
final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
boolean firstNode = true;
for (final Node node : nodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
continue;
}
// find the connection status for this node
final ConnectionStatus connectionStatus = findNodeConnectionStatus(nodeStats, connectionId);
// sanity check that we have status for this connection
if (connectionStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to find status for connection id '%s'.", connectionId));
}
if (firstNode) {
clusterConnectionStatusDto.setConnectionId(connectionId);
clusterConnectionStatusDto.setConnectionName(connectionStatus.getName());
firstNode = false;
}
// create node connection status dto
final NodeConnectionStatusDTO nodeConnectionStatusDTO = new NodeConnectionStatusDTO();
clusterConnectionStatusDto.getNodeConnectionStatus().add(nodeConnectionStatusDTO);
// populate node processor status dto
final String nodeId = node.getNodeId().getId();
nodeConnectionStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
nodeConnectionStatusDTO.setConnectionStatus(dtoFactory.createConnectionStatusDto(connectionStatus));
}
return clusterConnectionStatusDto;
}
private ProcessGroupStatus findNodeProcessGroupStatus(final ProcessGroupStatus groupStatus, final String processGroupId) {
ProcessGroupStatus processGroupStatus = null;
if (processGroupId.equals(groupStatus.getId())) {
processGroupStatus = groupStatus;
}
if (processGroupStatus == null) {
for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
processGroupStatus = findNodeProcessGroupStatus(status, processGroupId);
if (processGroupStatus != null) {
break;
}
}
}
return processGroupStatus;
}
@Override
public ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processGroupId) {
final ClusterProcessGroupStatusDTO clusterProcessGroupStatusDto = new ClusterProcessGroupStatusDTO();
clusterProcessGroupStatusDto.setNodeProcessGroupStatus(new ArrayList<NodeProcessGroupStatusDTO>());
// set the current time
clusterProcessGroupStatusDto.setStatsLastRefreshed(new Date());
final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
boolean firstNode = true;
for (final Node node : nodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
continue;
}
// attempt to find the process group stats for this node
final ProcessGroupStatus processGroupStatus = findNodeProcessGroupStatus(nodeStats, processGroupId);
// sanity check that we have status for this process group
if (processGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to find status for process group id '%s'.", processGroupId));
}
if (firstNode) {
clusterProcessGroupStatusDto.setProcessGroupId(processGroupId);
clusterProcessGroupStatusDto.setProcessGroupName(processGroupStatus.getName());
firstNode = false;
}
// create node process group status dto
final NodeProcessGroupStatusDTO nodeProcessGroupStatusDTO = new NodeProcessGroupStatusDTO();
clusterProcessGroupStatusDto.getNodeProcessGroupStatus().add(nodeProcessGroupStatusDTO);
// populate node process group status dto
final String nodeId = node.getNodeId().getId();
nodeProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
nodeProcessGroupStatusDTO.setProcessGroupStatus(dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), processGroupStatus));
}
return clusterProcessGroupStatusDto;
}
private PortStatus findNodeInputPortStatus(final ProcessGroupStatus groupStatus, final String inputPortId) {
PortStatus portStatus = null;
for (final PortStatus status : groupStatus.getInputPortStatus()) {
if (inputPortId.equals(status.getId())) {
portStatus = status;
break;
}
}
if (portStatus == null) {
for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
portStatus = findNodeInputPortStatus(status, inputPortId);
if (portStatus != null) {
break;
}
}
}
return portStatus;
}
@Override
public ClusterPortStatusDTO getClusterInputPortStatus(String inputPortId) {
final ClusterPortStatusDTO clusterInputPortStatusDto = new ClusterPortStatusDTO();
clusterInputPortStatusDto.setNodePortStatus(new ArrayList<NodePortStatusDTO>());
// set the current time
clusterInputPortStatusDto.setStatsLastRefreshed(new Date());
final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
boolean firstNode = true;
for (final Node node : nodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
continue;
}
// find the input status for this node
final PortStatus inputPortStatus = findNodeInputPortStatus(nodeStats, inputPortId);
// sanity check that we have status for this input port
if (inputPortStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to find status for input port id '%s'.", inputPortId));
}
if (firstNode) {
clusterInputPortStatusDto.setPortId(inputPortId);
clusterInputPortStatusDto.setPortName(inputPortStatus.getName());
firstNode = false;
}
// create node port status dto
final NodePortStatusDTO nodeInputPortStatusDTO = new NodePortStatusDTO();
clusterInputPortStatusDto.getNodePortStatus().add(nodeInputPortStatusDTO);
// populate node input port status dto
final String nodeId = node.getNodeId().getId();
nodeInputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
nodeInputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(inputPortStatus));
}
return clusterInputPortStatusDto;
}
private PortStatus findNodeOutputPortStatus(final ProcessGroupStatus groupStatus, final String outputPortId) {
PortStatus portStatus = null;
for (final PortStatus status : groupStatus.getOutputPortStatus()) {
if (outputPortId.equals(status.getId())) {
portStatus = status;
break;
}
}
if (portStatus == null) {
for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
portStatus = findNodeOutputPortStatus(status, outputPortId);
if (portStatus != null) {
break;
}
}
}
return portStatus;
}
@Override
public ClusterPortStatusDTO getClusterOutputPortStatus(String outputPortId) {
final ClusterPortStatusDTO clusterOutputPortStatusDto = new ClusterPortStatusDTO();
clusterOutputPortStatusDto.setNodePortStatus(new ArrayList<NodePortStatusDTO>());
// set the current time
clusterOutputPortStatusDto.setStatsLastRefreshed(new Date());
final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
boolean firstNode = true;
for (final Node node : nodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
continue;
}
// find the output status for this node
final PortStatus outputPortStatus = findNodeOutputPortStatus(nodeStats, outputPortId);
// sanity check that we have status for this output port
if (outputPortStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to find status for output port id '%s'.", outputPortId));
}
if (firstNode) {
clusterOutputPortStatusDto.setPortId(outputPortId);
clusterOutputPortStatusDto.setPortName(outputPortStatus.getName());
firstNode = false;
}
// create node port status dto
final NodePortStatusDTO nodeOutputPortStatusDTO = new NodePortStatusDTO();
clusterOutputPortStatusDto.getNodePortStatus().add(nodeOutputPortStatusDTO);
// populate node output port status dto
final String nodeId = node.getNodeId().getId();
nodeOutputPortStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
nodeOutputPortStatusDTO.setPortStatus(dtoFactory.createPortStatusDto(outputPortStatus));
}
return clusterOutputPortStatusDto;
}
private RemoteProcessGroupStatus findNodeRemoteProcessGroupStatus(final ProcessGroupStatus groupStatus, final String remoteProcessGroupId) {
RemoteProcessGroupStatus remoteProcessGroupStatus = null;
for (final RemoteProcessGroupStatus status : groupStatus.getRemoteProcessGroupStatus()) {
if (remoteProcessGroupId.equals(status.getId())) {
remoteProcessGroupStatus = status;
break;
}
}
if (remoteProcessGroupStatus == null) {
for (final ProcessGroupStatus status : groupStatus.getProcessGroupStatus()) {
remoteProcessGroupStatus = findNodeRemoteProcessGroupStatus(status, remoteProcessGroupId);
if (remoteProcessGroupStatus != null) {
break;
}
}
}
return remoteProcessGroupStatus;
}
@Override
public ClusterRemoteProcessGroupStatusDTO getClusterRemoteProcessGroupStatus(String remoteProcessGroupId) {
final ClusterRemoteProcessGroupStatusDTO clusterRemoteProcessGroupStatusDto = new ClusterRemoteProcessGroupStatusDTO();
clusterRemoteProcessGroupStatusDto.setNodeRemoteProcessGroupStatus(new ArrayList<NodeRemoteProcessGroupStatusDTO>());
// set the current time
clusterRemoteProcessGroupStatusDto.setStatsLastRefreshed(new Date());
final Set<Node> nodes = clusterManager.getNodes(Node.Status.CONNECTED);
boolean firstNode = true;
for (final Node node : nodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final ProcessGroupStatus nodeStats = nodeHeartbeatPayload.getProcessGroupStatus();
if (nodeStats == null || nodeStats.getProcessorStatus() == null) {
continue;
}
// find the remote process group for this node
final RemoteProcessGroupStatus remoteProcessGroupStatus = findNodeRemoteProcessGroupStatus(nodeStats, remoteProcessGroupId);
// sanity check that we have status for this remote process group
if (remoteProcessGroupStatus == null) {
throw new ResourceNotFoundException(String.format("Unable to find status for remote process group id '%s'.", remoteProcessGroupId));
}
if (firstNode) {
clusterRemoteProcessGroupStatusDto.setRemoteProcessGroupId(remoteProcessGroupId);
clusterRemoteProcessGroupStatusDto.setRemoteProcessGroupName(remoteProcessGroupStatus.getName());
firstNode = false;
}
// create node remote process group status dto
final NodeRemoteProcessGroupStatusDTO nodeRemoteProcessGroupStatusDTO = new NodeRemoteProcessGroupStatusDTO();
clusterRemoteProcessGroupStatusDto.getNodeRemoteProcessGroupStatus().add(nodeRemoteProcessGroupStatusDTO);
// populate node remote process group status dto
final String nodeId = node.getNodeId().getId();
nodeRemoteProcessGroupStatusDTO.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
nodeRemoteProcessGroupStatusDTO.setRemoteProcessGroupStatus(dtoFactory.createRemoteProcessGroupStatusDto(remoteProcessGroupStatus));
}
return clusterRemoteProcessGroupStatusDto;
}
@Override
public ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId) {
return clusterManager.getProcessorStatusHistory(processorId);
}
@Override
public ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId) {
return clusterManager.getConnectionStatusHistory(connectionId);
}
@Override
public ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId) {
return clusterManager.getProcessGroupStatusHistory(processGroupId);
}
@Override
public ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId) {
return clusterManager.getRemoteProcessGroupStatusHistory(remoteProcessGroupId);
}
@Override
public NodeStatusDTO getNodeStatus(String nodeId) {
// find the node in question
final Node node = clusterManager.getNode(nodeId);
// verify node state
if (node == null) {
throw new UnknownNodeException("Node does not exist.");
} else if (Node.Status.CONNECTED != node.getStatus()) {
throw new IllegalClusterStateException(
String.format("Node '%s:%s' is not connected to the cluster.",
node.getNodeId().getApiAddress(), node.getNodeId().getApiPort()));
}
// get the node's last heartbeat
final NodeStatusDTO nodeStatus = new NodeStatusDTO();
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
return nodeStatus;
}
// get the node status
final ProcessGroupStatus nodeProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus();
if (nodeProcessGroupStatus == null) {
return nodeStatus;
}
final ProcessGroupStatusDTO nodeProcessGroupStatusDto = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), nodeProcessGroupStatus);
nodeStatus.setControllerStatus(nodeProcessGroupStatusDto);
nodeStatus.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
return nodeStatus;
}
@Override
public NodeSystemDiagnosticsDTO getNodeSystemDiagnostics(String nodeId) {
// find the node in question
final Node node = clusterManager.getNode(nodeId);
// verify node state
if (node == null) {
throw new UnknownNodeException("Node does not exist.");
} else if (Node.Status.CONNECTED != node.getStatus()) {
throw new IllegalClusterStateException(
String.format("Node '%s:%s' is not connected to the cluster.",
node.getNodeId().getApiAddress(), node.getNodeId().getApiPort()));
}
// get the node's last heartbeat
final NodeSystemDiagnosticsDTO nodeStatus = new NodeSystemDiagnosticsDTO();
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
return nodeStatus;
}
// get the node status
final SystemDiagnostics nodeSystemDiagnostics = nodeHeartbeatPayload.getSystemDiagnostics();
if (nodeSystemDiagnostics == null) {
return nodeStatus;
}
// populate the dto
nodeStatus.setControllerStatus(dtoFactory.createSystemDiagnosticsDto(nodeSystemDiagnostics));
nodeStatus.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
return nodeStatus;
}
@Override
public ClusterStatusDTO getClusterStatus() {
// create cluster status dto
final ClusterStatusDTO clusterStatusDto = new ClusterStatusDTO();
// populate node status dtos
final Collection<NodeStatusDTO> nodeStatusDtos = new ArrayList<>();
clusterStatusDto.setNodeStatus(nodeStatusDtos);
for (final Node node : clusterManager.getNodes()) {
if (Node.Status.CONNECTED != node.getStatus()) {
continue;
}
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final ProcessGroupStatus nodeProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus();
if (nodeProcessGroupStatus == null) {
continue;
}
final ProcessGroupStatusDTO nodeProcessGroupStatusDto = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), nodeProcessGroupStatus);
// create node status dto
final NodeStatusDTO nodeStatusDto = new NodeStatusDTO();
nodeStatusDtos.add(nodeStatusDto);
// populate the status
nodeStatusDto.setControllerStatus(nodeProcessGroupStatusDto);
// create and add node dto
final String nodeId = node.getNodeId().getId();
nodeStatusDto.setNode(dtoFactory.createNodeDTO(node, clusterManager.getNodeEvents(nodeId), isPrimaryNode(nodeId)));
}
return clusterStatusDto;
}
@Override
public ProcessorDTO getProcessor(String id) {
ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
String groupId = controllerFacade.findProcessGroupIdForProcessor(id);
// ensure the parent group id was found
if (groupId == null) {
throw new ResourceNotFoundException(String.format("Unable to locate Processor with id '%s'.", id));
}
// get the processor
return getProcessor(groupId, id);
} finally {
if (currentContextClassLoader != null) {
Thread.currentThread().setContextClassLoader(currentContextClassLoader);
}
}
}
/* setters */
public void setProperties(NiFiProperties properties) {
this.properties = properties;
}
public void setControllerFacade(ControllerFacade controllerFacade) {
this.controllerFacade = controllerFacade;
}
public void setRemoteProcessGroupDAO(RemoteProcessGroupDAO remoteProcessGroupDAO) {
this.remoteProcessGroupDAO = remoteProcessGroupDAO;
}
public void setLabelDAO(LabelDAO labelDAO) {
this.labelDAO = labelDAO;
}
public void setFunnelDAO(FunnelDAO funnelDAO) {
this.funnelDAO = funnelDAO;
}
public void setSnippetDAO(SnippetDAO snippetDAO) {
this.snippetDAO = snippetDAO;
}
public void setProcessorDAO(ProcessorDAO processorDAO) {
this.processorDAO = processorDAO;
}
public void setConnectionDAO(ConnectionDAO connectionDAO) {
this.connectionDAO = connectionDAO;
}
public void setAuditService(AuditService auditService) {
this.auditService = auditService;
}
public void setUserService(UserService userService) {
this.userService = userService;
}
public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) {
this.optimisticLockingManager = optimisticLockingManager;
}
public void setClusterManager(WebClusterManager clusterManager) {
this.clusterManager = clusterManager;
}
public void setDtoFactory(DtoFactory dtoFactory) {
this.dtoFactory = dtoFactory;
}
public void setInputPortDAO(PortDAO inputPortDAO) {
this.inputPortDAO = inputPortDAO;
}
public void setOutputPortDAO(PortDAO outputPortDAO) {
this.outputPortDAO = outputPortDAO;
}
public void setProcessGroupDAO(ProcessGroupDAO processGroupDAO) {
this.processGroupDAO = processGroupDAO;
}
public void setControllerServiceDAO(ControllerServiceDAO controllerServiceDAO) {
this.controllerServiceDAO = controllerServiceDAO;
}
public void setReportingTaskDAO(ReportingTaskDAO reportingTaskDAO) {
this.reportingTaskDAO = reportingTaskDAO;
}
public void setTemplateDAO(TemplateDAO templateDAO) {
this.templateDAO = templateDAO;
}
public void setSnippetUtils(SnippetUtils snippetUtils) {
this.snippetUtils = snippetUtils;
}
private boolean isPrimaryNode(String nodeId) {
final Node primaryNode = clusterManager.getPrimaryNode();
return (primaryNode != null && primaryNode.getNodeId().getId().equals(nodeId));
}
/**
* Utility method to get the oldest of the two specified dates.
*
* @param date1
* @param date2
* @return
*/
private Date getOldestDate(final Date date1, final Date date2) {
if (date1 == null && date2 == null) {
return null;
} else if (date1 == null) {
return date2;
} else if (date2 == null) {
return date1;
}
if (date1.before(date2)) {
return date1;
} else if (date1.after(date2)) {
return date2;
} else {
return date1;
}
}
/**
* Utility method to get the newest of the two specified dates.
*
* @param date1
* @param date2
* @return
*/
private Date getNewestDate(final Date date1, final Date date2) {
if (date1 == null && date2 == null) {
return null;
} else if (date1 == null) {
return date2;
} else if (date2 == null) {
return date1;
}
if (date1.before(date2)) {
return date2;
} else if (date1.after(date2)) {
return date1;
} else {
return date1;
}
}
/**
* Utility method for extracting component counts from the specified group
* status.
*
* @param groupStatus
* @return
*/
private ProcessGroupCounts extractProcessGroupCounts(ProcessGroupStatus groupStatus) {
int running = 0;
int stopped = 0;
int invalid = 0;
int disabled = 0;
int activeRemotePorts = 0;
int inactiveRemotePorts = 0;
for (final ProcessorStatus processorStatus : groupStatus.getProcessorStatus()) {
switch (processorStatus.getRunStatus()) {
case Disabled:
disabled++;
break;
case Running:
running++;
break;
case Invalid:
invalid++;
break;
default:
stopped++;
break;
}
}
for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
switch (portStatus.getRunStatus()) {
case Disabled:
disabled++;
break;
case Running:
running++;
break;
case Invalid:
invalid++;
break;
default:
stopped++;
break;
}
}
for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
switch (portStatus.getRunStatus()) {
case Disabled:
disabled++;
break;
case Running:
running++;
break;
case Invalid:
invalid++;
break;
default:
stopped++;
break;
}
}
for (final RemoteProcessGroupStatus remoteStatus : groupStatus.getRemoteProcessGroupStatus()) {
if (remoteStatus.getActiveRemotePortCount() != null) {
activeRemotePorts += remoteStatus.getActiveRemotePortCount();
}
if (remoteStatus.getInactiveRemotePortCount() != null) {
inactiveRemotePorts += remoteStatus.getInactiveRemotePortCount();
}
if (CollectionUtils.isNotEmpty(remoteStatus.getAuthorizationIssues())) {
invalid++;
}
}
for (final ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
final ProcessGroupCounts childCounts = extractProcessGroupCounts(childGroupStatus);
running += childCounts.getRunningCount();
stopped += childCounts.getStoppedCount();
invalid += childCounts.getInvalidCount();
disabled += childCounts.getDisabledCount();
activeRemotePorts += childCounts.getActiveRemotePortCount();
inactiveRemotePorts += childCounts.getInactiveRemotePortCount();
}
return new ProcessGroupCounts(0, 0, running, stopped, invalid, disabled, activeRemotePorts, inactiveRemotePorts);
}
}