/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.web;

import com.google.common.collect.Sets;
import io.prometheus.client.CollectorRegistry;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.FlowChangePurgeDetails;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AccessPolicy;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizationRequest;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.Group;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.User;
import org.apache.nifi.authorization.UserContextKeys;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.EnforcePolicyPermissionsThroughBaseResource;
import org.apache.nifi.authorization.resource.OperationAuthorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.validation.ValidationState;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterContextLookup;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.StandardParameterContext;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.authorization.Permissions;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.RestBasedFlowRegistry;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConfigurableComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowComparator;
import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BucketDTO;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;
import org.apache.nifi.web.api.dto.BulletinDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.ComponentDTO;
import org.apache.nifi.web.api.dto.ComponentDifferenceDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.ComponentReferenceDTO;
import org.apache.nifi.web.api.dto.ComponentRestrictionPermissionDTO;
import org.apache.nifi.web.api.dto.ComponentStateDTO;
import org.apache.nifi.web.api.dto.ComponentValidationResultDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerConfigurationDTO;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersDTO;
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.EntityFactory;
import org.apache.nifi.web.api.dto.FlowConfigurationDTO;
import org.apache.nifi.web.api.dto.FlowFileDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ParameterContextDTO;
import org.apache.nifi.web.api.dto.ParameterDTO;
import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PreviousValueDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
import org.apache.nifi.web.api.dto.RegistryDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.RequiredPermissionDTO;
import org.apache.nifi.web.api.dto.ResourceDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.UserDTO;
import org.apache.nifi.web.api.dto.UserGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.VersionedFlowDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.PortStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.entity.AccessPolicyEntity;
import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.BucketEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity;
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FunnelEntity;
import org.apache.nifi.web.api.entity.LabelEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ParameterEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.PortStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.RegistryClientEntity;
import org.apache.nifi.web.api.entity.RegistryEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.apache.nifi.web.api.entity.SnippetEntity;
import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import org.apache.nifi.web.api.entity.TemplateEntity;
import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.api.entity.VariableRegistryEntity;
import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.AccessPolicyDAO;
import org.apache.nifi.web.dao.ConnectionDAO;
import org.apache.nifi.web.dao.ControllerServiceDAO;
import org.apache.nifi.web.dao.FunnelDAO;
import org.apache.nifi.web.dao.LabelDAO;
import org.apache.nifi.web.dao.ParameterContextDAO;
import org.apache.nifi.web.dao.PortDAO;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.dao.ProcessorDAO;
import org.apache.nifi.web.dao.RegistryDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.apache.nifi.web.dao.SnippetDAO;
import org.apache.nifi.web.dao.TemplateDAO;
import org.apache.nifi.web.dao.UserDAO;
import org.apache.nifi.web.dao.UserGroupDAO;
import org.apache.nifi.web.revision.DeleteRevisionTask;
import org.apache.nifi.web.revision.ExpiredRevisionClaimException;
import org.apache.nifi.web.revision.RevisionClaim;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.nifi.web.revision.RevisionUpdate;
import org.apache.nifi.web.revision.StandardRevisionClaim;
import org.apache.nifi.web.revision.StandardRevisionUpdate;
import org.apache.nifi.web.revision.UpdateRevisionTask;
import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Implementation of NiFiServiceFacade that performs revision checking.
 */
public class StandardNiFiServiceFacade implements NiFiServiceFacade {
    private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class);
    // NOTE(review): not referenced in the visible part of this file; presumably a wait/poll
    // interval (milliseconds) used while component validation completes - confirm against usages
    private static final int VALIDATION_WAIT_MILLIS = 50;

    // None of the collaborators below are initialized at declaration.
    // NOTE(review): presumably they are injected through setters defined later in this class - confirm.

    // nifi core components
    private ControllerFacade controllerFacade;
    private SnippetUtils snippetUtils;

    // revision manager
    private RevisionManager revisionManager;
    private BulletinRepository bulletinRepository;

    // data access objects
    private ProcessorDAO processorDAO;
    private ProcessGroupDAO processGroupDAO;
    private RemoteProcessGroupDAO remoteProcessGroupDAO;
    private LabelDAO labelDAO;
    private FunnelDAO funnelDAO;
    private SnippetDAO snippetDAO;
    // input and output ports share the PortDAO type but use separate instances
    private PortDAO inputPortDAO;
    private PortDAO outputPortDAO;
    private ConnectionDAO connectionDAO;
    private ControllerServiceDAO controllerServiceDAO;
    private ReportingTaskDAO reportingTaskDAO;
    private TemplateDAO templateDAO;
    private UserDAO userDAO;
    private UserGroupDAO userGroupDAO;
    private AccessPolicyDAO accessPolicyDAO;
    private RegistryDAO registryDAO;
    private ParameterContextDAO parameterContextDAO;
    private ClusterCoordinator clusterCoordinator;
    private HeartbeatMonitor heartbeatMonitor;
    private LeaderElectionManager leaderElectionManager;

    // administrative services
    private AuditService auditService;

    // flow registry
    private FlowRegistryClient flowRegistryClient;

    // properties
    private NiFiProperties properties;
    private DtoFactory dtoFactory;
    private EntityFactory entityFactory;

    private Authorizer authorizer;

    private AuthorizableLookup authorizableLookup;

    // Prometheus Metrics objects
    // each registry wrapper is created eagerly, once per facade instance
    private NiFiMetricsRegistry nifiMetricsRegistry = new NiFiMetricsRegistry();
    private JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
    private ConnectionAnalyticsMetricsRegistry connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
    private BulletinMetricsRegistry bulletinMetricsRegistry = new BulletinMetricsRegistry();

    // The underlying CollectorRegistry of each of the four metrics registries above, in declaration order.
    // NOTE(review): named like a constant but this is a public, per-instance (non-static) field.
    public final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(
            nifiMetricsRegistry.getRegistry(),
            jvmMetricsRegistry.getRegistry(),
            connectionAnalyticsMetricsRegistry.getRegistry(),
            bulletinMetricsRegistry.getRegistry()
    );


    // -----------------------------------------
    // Synchronization methods
    // -----------------------------------------
    /**
     * Runs the supplied authorization callback, handing it this facade's {@code authorizableLookup}.
     *
     * @param authorizeAccess callback that performs the actual authorization checks
     */
    @Override
    public void authorizeAccess(final AuthorizeAccess authorizeAccess) {
        authorizeAccess.authorize(authorizableLookup);
    }

    /**
     * Checks that the supplied revision matches the revision currently held by the
     * revision manager for the same component.
     *
     * @param revision the revision presented by the caller
     * @param user the requesting user (not consulted by this implementation)
     * @throws InvalidRevisionException if the supplied revision differs from the current one
     */
    @Override
    public void verifyRevision(final Revision revision, final NiFiUser user) {
        final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
        final boolean upToDate = revision.equals(currentRevision);

        if (!upToDate) {
            throw new InvalidRevisionException(revision + " is not the most up-to-date revision. This component appears to have been modified");
        }
    }

    /**
     * Verifies every revision in the given set by delegating each one to
     * {@code verifyRevision}; the first failing revision aborts the check.
     *
     * @param revisions the revisions to verify
     * @param user the requesting user, passed through to each individual check
     */
    @Override
    public void verifyRevisions(final Set<Revision> revisions, final NiFiUser user) {
        revisions.forEach(candidate -> verifyRevision(candidate, user));
    }

    /**
     * Collects the current revisions of a selection of components in a process group.
     *
     * @param groupId identifier of the process group to look up
     * @param getComponents function that picks the component ids of interest out of the group
     * @return the revision manager's current revision for each selected component id
     */
    @Override
    public Set<Revision> getRevisionsFromGroup(final String groupId, final Function<ProcessGroup, Set<String>> getComponents) {
        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);

        final Set<Revision> revisions = new HashSet<>();
        for (final String componentId : getComponents.apply(processGroup)) {
            revisions.add(revisionManager.getRevision(componentId));
        }
        return revisions;
    }

    /**
     * Collects the current revisions of every component referenced by a snippet.
     * The component ids are the keys of the snippet's processor, funnel, label,
     * connection, input port, output port, process group and remote process group maps.
     *
     * @param snippetId identifier of the snippet to inspect
     * @return the revision manager's current revision for each referenced component id
     */
    @Override
    public Set<Revision> getRevisionsFromSnippet(final String snippetId) {
        final Snippet snippet = snippetDAO.getSnippet(snippetId);

        // gather the ids from each per-type map of the snippet into a single set
        final Set<String> componentIds = Stream.<Map<String, ?>>of(
                        snippet.getProcessors(),
                        snippet.getFunnels(),
                        snippet.getLabels(),
                        snippet.getConnections(),
                        snippet.getInputPorts(),
                        snippet.getOutputPorts(),
                        snippet.getProcessGroups(),
                        snippet.getRemoteProcessGroups())
                .flatMap(componentsOfType -> componentsOfType.keySet().stream())
                .collect(Collectors.toCollection(HashSet::new));

        final Set<Revision> revisions = new HashSet<>();
        for (final String componentId : componentIds) {
            revisions.add(revisionManager.getRevision(componentId));
        }
        return revisions;
    }

    // -----------------------------------------
    // Verification Operations
    // -----------------------------------------

    /**
     * Delegates to {@code connectionDAO.verifyList} for the given connection.
     *
     * @param connectionId identifier of the connection whose queue would be listed
     */
    @Override
    public void verifyListQueue(final String connectionId) {
        connectionDAO.verifyList(connectionId);
    }

    /**
     * Delegates to {@code connectionDAO.verifyCreate} for the given group and connection definition.
     *
     * @param groupId identifier of the process group the connection would be created in
     * @param connectionDTO the proposed connection
     */
    @Override
    public void verifyCreateConnection(final String groupId, final ConnectionDTO connectionDTO) {
        connectionDAO.verifyCreate(groupId, connectionDTO);
    }

    /**
     * Verifies a connection update request. An existing connection is verified as an
     * update; an unknown connection is verified as a creation within the parent group
     * named by the DTO.
     *
     * @param connectionDTO the proposed connection state
     */
    @Override
    public void verifyUpdateConnection(final ConnectionDTO connectionDTO) {
        final boolean connectionExists = connectionDAO.hasConnection(connectionDTO.getId());

        if (!connectionExists) {
            // an update for a connection that does not exist yet is likely creating it,
            // so it is checked as a create rather than as an update
            connectionDAO.verifyCreate(connectionDTO.getParentGroupId(), connectionDTO);
            return;
        }

        connectionDAO.verifyUpdate(connectionDTO);
    }

    /**
     * Delegates to {@code connectionDAO.verifyDelete} for the given connection.
     *
     * @param connectionId identifier of the connection that would be deleted
     */
    @Override
    public void verifyDeleteConnection(final String connectionId) {
        connectionDAO.verifyDelete(connectionId);
    }

    /**
     * Delegates to {@code funnelDAO.verifyDelete} for the given funnel.
     *
     * @param funnelId identifier of the funnel that would be deleted
     */
    @Override
    public void verifyDeleteFunnel(final String funnelId) {
        funnelDAO.verifyDelete(funnelId);
    }

    /**
     * Verifies an input port update request. Verification only happens when the port
     * already exists; a request for an unknown port is not checked here.
     *
     * @param inputPortDTO the proposed input port state
     */
    @Override
    public void verifyUpdateInputPort(final PortDTO inputPortDTO) {
        final boolean portExists = inputPortDAO.hasPort(inputPortDTO.getId());

        if (!portExists) {
            // an update for an input port that does not exist yet is likely creating it;
            // verifying it as an update would fail, so nothing is checked
            return;
        }

        inputPortDAO.verifyUpdate(inputPortDTO);
    }

    /**
     * Delegates to {@code inputPortDAO.verifyDelete} for the given input port.
     *
     * @param inputPortId identifier of the input port that would be deleted
     */
    @Override
    public void verifyDeleteInputPort(final String inputPortId) {
        inputPortDAO.verifyDelete(inputPortId);
    }

    /**
     * Verifies an output port update request. Verification only happens when the port
     * already exists; a request for an unknown port is not checked here.
     *
     * @param outputPortDTO the proposed output port state
     */
    @Override
    public void verifyUpdateOutputPort(final PortDTO outputPortDTO) {
        final boolean portExists = outputPortDAO.hasPort(outputPortDTO.getId());

        if (!portExists) {
            // an update for an output port that does not exist yet is likely creating it;
            // verifying it as an update would fail, so nothing is checked
            return;
        }

        outputPortDAO.verifyUpdate(outputPortDTO);
    }

    /**
     * Delegates to {@code outputPortDAO.verifyDelete} for the given output port.
     *
     * @param outputPortId identifier of the output port that would be deleted
     */
    @Override
    public void verifyDeleteOutputPort(final String outputPortId) {
        outputPortDAO.verifyDelete(outputPortId);
    }

    /**
     * Delegates to {@code processorDAO.verifyCreate} for the proposed processor.
     *
     * @param processorDTO the proposed processor
     */
    @Override
    public void verifyCreateProcessor(ProcessorDTO processorDTO) {
        processorDAO.verifyCreate(processorDTO);
    }

    /**
     * Verifies a processor update request. An existing processor is verified as an
     * update; an unknown processor is verified as a creation.
     *
     * @param processorDTO the proposed processor state
     */
    @Override
    public void verifyUpdateProcessor(final ProcessorDTO processorDTO) {
        final boolean processorExists = processorDAO.hasProcessor(processorDTO.getId());

        if (!processorExists) {
            // an update for a processor that does not exist yet is likely creating it,
            // so it is checked as a create rather than as an update
            verifyCreateProcessor(processorDTO);
            return;
        }

        processorDAO.verifyUpdate(processorDTO);
    }

    /**
     * Delegates to {@code processorDAO.verifyConfigVerification} for the given processor.
     *
     * @param processorId identifier of the processor whose configuration would be verified
     */
    @Override
    public void verifyCanVerifyProcessorConfig(final String processorId) {
        processorDAO.verifyConfigVerification(processorId);
    }

    /**
     * Delegates to {@code controllerServiceDAO.verifyConfigVerification} for the given controller service.
     *
     * @param controllerServiceId identifier of the controller service whose configuration would be verified
     */
    @Override
    public void verifyCanVerifyControllerServiceConfig(final String controllerServiceId) {
        controllerServiceDAO.verifyConfigVerification(controllerServiceId);
    }

    /**
     * Delegates to {@code reportingTaskDAO.verifyConfigVerification} for the given reporting task.
     *
     * @param reportingTaskId identifier of the reporting task whose configuration would be verified
     */
    @Override
    public void verifyCanVerifyReportingTaskConfig(final String reportingTaskId) {
        reportingTaskDAO.verifyConfigVerification(reportingTaskId);
    }

    /**
     * Delegates to {@code processorDAO.verifyDelete} for the given processor.
     *
     * @param processorId identifier of the processor that would be deleted
     */
    @Override
    public void verifyDeleteProcessor(final String processorId) {
        processorDAO.verifyDelete(processorId);
    }

    /**
     * Delegates to {@code processGroupDAO.verifyScheduleComponents} with the arguments unchanged.
     *
     * @param groupId identifier of the process group containing the components
     * @param state the scheduled state the components would be moved to
     * @param componentIds identifiers of the components to schedule
     */
    @Override
    public void verifyScheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) {
        processGroupDAO.verifyScheduleComponents(groupId, state, componentIds);
    }

    /**
     * Delegates to {@code processGroupDAO.verifyEnableComponents} with the arguments unchanged.
     *
     * @param processGroupId identifier of the process group containing the components
     * @param state the scheduled state the components would be moved to
     * @param componentIds identifiers of the components to enable or disable
     */
    @Override
    public void verifyEnableComponents(String processGroupId, ScheduledState state, Set<String> componentIds) {
        processGroupDAO.verifyEnableComponents(processGroupId, state, componentIds);
    }

    /**
     * Delegates to {@code processGroupDAO.verifyActivateControllerServices} with the
     * target state and service ids.
     *
     * @param groupId identifier of the process group; accepted but not passed on to the DAO by this implementation
     * @param state the controller service state the services would be moved to
     * @param serviceIds identifiers of the controller services to activate or deactivate
     */
    @Override
    public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) {
        processGroupDAO.verifyActivateControllerServices(state, serviceIds);
    }

    /**
     * Delegates to {@code processGroupDAO.verifyDelete} for the given process group.
     *
     * @param groupId identifier of the process group that would be deleted
     */
    @Override
    public void verifyDeleteProcessGroup(final String groupId) {
        processGroupDAO.verifyDelete(groupId);
    }

    /**
     * Verifies that the transmission state of every remote process group found under
     * the given process group could be changed. For each remote process group a
     * minimal DTO carrying only its id and the requested transmitting flag is built
     * and passed to {@code verifyUpdateRemoteProcessGroup}.
     *
     * @param processGroupId identifier of the process group to search
     * @param shouldTransmit the transmitting flag that would be applied
     */
    @Override
    public void verifyUpdateRemoteProcessGroups(String processGroupId, boolean shouldTransmit) {
        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);

        for (final RemoteProcessGroup remoteProcessGroup : processGroup.findAllRemoteProcessGroups()) {
            final RemoteProcessGroupDTO verificationDto = new RemoteProcessGroupDTO();
            verificationDto.setId(remoteProcessGroup.getIdentifier());
            verificationDto.setTransmitting(shouldTransmit);

            verifyUpdateRemoteProcessGroup(verificationDto);
        }
    }

    /**
     * Verifies a remote process group update request. Verification only happens when
     * the remote process group already exists; a request for an unknown one is not
     * checked here.
     *
     * @param remoteProcessGroupDTO the proposed remote process group state
     */
    @Override
    public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) {
        final boolean remoteGroupExists = remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupDTO.getId());

        if (!remoteGroupExists) {
            // an update for a remote group that does not exist yet is likely creating it;
            // verifying it as an update would fail, so nothing is checked
            return;
        }

        remoteProcessGroupDAO.verifyUpdate(remoteProcessGroupDTO);
    }

    /**
     * Delegates to {@code remoteProcessGroupDAO.verifyUpdateInputPort} with the arguments unchanged.
     *
     * @param remoteProcessGroupId identifier of the remote process group owning the port
     * @param remoteProcessGroupPortDTO the proposed input port state
     */
    @Override
    public void verifyUpdateRemoteProcessGroupInputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
        remoteProcessGroupDAO.verifyUpdateInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
    }

    /**
     * Delegates to {@code remoteProcessGroupDAO.verifyUpdateOutputPort} with the arguments unchanged.
     *
     * @param remoteProcessGroupId identifier of the remote process group owning the port
     * @param remoteProcessGroupPortDTO the proposed output port state
     */
    @Override
    public void verifyUpdateRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
        remoteProcessGroupDAO.verifyUpdateOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO);
    }

    /**
     * Delegates to {@code remoteProcessGroupDAO.verifyDelete} for the given remote process group.
     *
     * @param remoteProcessGroupId identifier of the remote process group that would be deleted
     */
    @Override
    public void verifyDeleteRemoteProcessGroup(final String remoteProcessGroupId) {
        remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
    }

    /**
     * Delegates to {@code controllerServiceDAO.verifyCreate} for the proposed controller service.
     *
     * @param controllerServiceDTO the proposed controller service
     */
    @Override
    public void verifyCreateControllerService(ControllerServiceDTO controllerServiceDTO) {
        controllerServiceDAO.verifyCreate(controllerServiceDTO);
    }

    /**
     * Verifies a controller service update request. An existing service is verified
     * as an update; an unknown service is verified as a creation.
     *
     * @param controllerServiceDTO the proposed controller service state
     */
    @Override
    public void verifyUpdateControllerService(final ControllerServiceDTO controllerServiceDTO) {
        final boolean serviceExists = controllerServiceDAO.hasControllerService(controllerServiceDTO.getId());

        if (!serviceExists) {
            // an update for a service that does not exist yet is likely creating it,
            // so it is checked as a create rather than as an update
            verifyCreateControllerService(controllerServiceDTO);
            return;
        }

        controllerServiceDAO.verifyUpdate(controllerServiceDTO);
    }

    /**
     * Delegates to {@code controllerServiceDAO.verifyUpdateReferencingComponents} with the arguments unchanged.
     *
     * @param controllerServiceId identifier of the controller service whose referencing components would change
     * @param scheduledState the scheduled state passed through to the DAO
     * @param controllerServiceState the controller service state passed through to the DAO
     */
    @Override
    public void verifyUpdateControllerServiceReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
        controllerServiceDAO.verifyUpdateReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
    }

    /**
     * Delegates to {@code controllerServiceDAO.verifyDelete} for the given controller service.
     *
     * @param controllerServiceId identifier of the controller service that would be deleted
     */
    @Override
    public void verifyDeleteControllerService(final String controllerServiceId) {
        controllerServiceDAO.verifyDelete(controllerServiceId);
    }

    /**
     * Delegates to {@code reportingTaskDAO.verifyCreate} for the proposed reporting task.
     *
     * @param reportingTaskDTO the proposed reporting task
     */
    @Override
    public void verifyCreateReportingTask(ReportingTaskDTO reportingTaskDTO) {
        reportingTaskDAO.verifyCreate(reportingTaskDTO);
    }

    /**
     * Verifies a reporting task update request. An existing task is verified as an
     * update; an unknown task is verified as a creation.
     *
     * @param reportingTaskDTO the proposed reporting task state
     */
    @Override
    public void verifyUpdateReportingTask(final ReportingTaskDTO reportingTaskDTO) {
        final boolean taskExists = reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId());

        if (!taskExists) {
            // an update for a task that does not exist yet is likely creating it,
            // so it is checked as a create rather than as an update
            verifyCreateReportingTask(reportingTaskDTO);
            return;
        }

        reportingTaskDAO.verifyUpdate(reportingTaskDTO);
    }

    /**
     * Delegates to {@code reportingTaskDAO.verifyDelete} for the given reporting task.
     *
     * @param reportingTaskId identifier of the reporting task that would be deleted
     */
    @Override
    public void verifyDeleteReportingTask(final String reportingTaskId) {
        reportingTaskDAO.verifyDelete(reportingTaskId);
    }

    // -----------------------------------------
    // Write Operations
    // -----------------------------------------

    /**
     * Updates an access policy and returns the resulting entity.
     * <p>
     * The DAO update and the conversion of the updated policy into a DTO are both
     * handed to {@code updateComponent} together with the revision and the policy's
     * authorizable. The DTO conversion resolves the policy's user ids and group ids
     * into tenant entities and its resource into a component reference.
     * NOTE(review): revision handling is presumed to happen inside
     * {@code updateComponent}, which is not visible in this part of the file.
     *
     * @param revision the revision presented by the caller for the policy
     * @param accessPolicyDTO the proposed access policy state
     * @return entity wrapping the updated policy DTO, its last-modification revision
     *         and the permissions computed for the policy's authorizable
     */
    @Override
    public AccessPolicyEntity updateAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) {
        final Authorizable policyAuthorizable = authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId());

        final RevisionUpdate<AccessPolicyDTO> update = updateComponent(
                revision,
                policyAuthorizable,
                () -> accessPolicyDAO.updateAccessPolicy(accessPolicyDTO),
                updatedPolicy -> {
                    final Set<TenantEntity> userEntities = updatedPolicy.getUsers().stream()
                            .map(mapUserIdToTenantEntity(false))
                            .collect(Collectors.toSet());
                    final Set<TenantEntity> groupEntities = updatedPolicy.getGroups().stream()
                            .map(mapUserGroupIdToTenantEntity(false))
                            .collect(Collectors.toSet());
                    final ComponentReferenceEntity referencedComponent = createComponentReferenceEntity(updatedPolicy.getResource());

                    return dtoFactory.createAccessPolicyDto(updatedPolicy, groupEntities, userEntities, referencedComponent);
                });

        // permissions are computed after the update has been applied
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(policyAuthorizable);
        return entityFactory.createAccessPolicyEntity(
                update.getComponent(),
                dtoFactory.createRevisionDTO(update.getLastModification()),
                permissions);
    }

    /**
     * Applies the requested user changes under the supplied revision and builds the entity
     * describing the updated user.
     *
     * @param revision the revision claimed for the update
     * @param userDTO the requested user configuration
     * @return the entity for the updated user with its new revision and the tenant permissions
     */
    @Override
    public UserEntity updateUser(final Revision revision, final UserDTO userDTO) {
        final Authorizable tenantAuthorizable = authorizableLookup.getTenant();

        // group memberships and policies are looked up before the update is performed
        final Set<Group> memberGroups = userGroupDAO.getUserGroupsForUser(userDTO.getId());
        final Set<AccessPolicy> userPolicies = userGroupDAO.getAccessPoliciesForUser(userDTO.getId());

        final RevisionUpdate<UserDTO> update = updateComponent(
                revision,
                tenantAuthorizable,
                () -> userDAO.updateUser(userDTO),
                updatedUser -> {
                    final Set<TenantEntity> groupEntities = memberGroups.stream()
                            .map(Group::getIdentifier)
                            .map(mapUserGroupIdToTenantEntity(false))
                            .collect(Collectors.toSet());
                    final Set<AccessPolicySummaryEntity> policySummaries = userPolicies.stream()
                            .map(policy -> createAccessPolicySummaryEntity(policy))
                            .collect(Collectors.toSet());
                    return dtoFactory.createUserDto(updatedUser, groupEntities, policySummaries);
                });

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(tenantAuthorizable);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createUserEntity(update.getComponent(), revisionDto, permissions);
    }

    /**
     * Applies the requested user group changes under the supplied revision and builds the entity
     * describing the updated group.
     *
     * @param revision the revision claimed for the update
     * @param userGroupDTO the requested user group configuration
     * @return the entity for the updated group with its new revision and the tenant permissions
     */
    @Override
    public UserGroupEntity updateUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) {
        final Authorizable tenantAuthorizable = authorizableLookup.getTenant();

        // the group's policies are looked up before the update is performed
        final Set<AccessPolicy> groupPolicies = userGroupDAO.getAccessPoliciesForUserGroup(userGroupDTO.getId());

        final RevisionUpdate<UserGroupDTO> update = updateComponent(
                revision,
                tenantAuthorizable,
                () -> userGroupDAO.updateUserGroup(userGroupDTO),
                updatedGroup -> {
                    final Set<TenantEntity> memberEntities = updatedGroup.getUsers().stream()
                            .map(mapUserIdToTenantEntity(false))
                            .collect(Collectors.toSet());
                    final Set<AccessPolicySummaryEntity> policySummaries = groupPolicies.stream()
                            .map(policy -> createAccessPolicySummaryEntity(policy))
                            .collect(Collectors.toSet());
                    return dtoFactory.createUserGroupDto(updatedGroup, memberEntities, policySummaries);
                });

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(tenantAuthorizable);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createUserGroupEntity(update.getComponent(), revisionDto, permissions);
    }

    /**
     * Applies the requested connection changes under the supplied revision and builds the entity
     * describing the updated connection.
     *
     * @param revision the revision claimed for the update
     * @param connectionDTO the requested connection configuration
     * @return the entity for the updated connection with its new revision, permissions and status
     */
    @Override
    public ConnectionEntity updateConnection(final Revision revision, final ConnectionDTO connectionDTO) {
        final Connection connection = connectionDAO.getConnection(connectionDTO.getId());

        final RevisionUpdate<ConnectionDTO> update = updateComponent(
                revision,
                connection,
                () -> connectionDAO.updateConnection(connectionDTO),
                dtoFactory::createConnectionDto);

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
        final ConnectionStatusDTO status = dtoFactory.createConnectionStatusDto(
                controllerFacade.getConnectionStatus(connection.getIdentifier()));
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createConnectionEntity(update.getComponent(), revisionDto, permissions, status);
    }

    /**
     * Applies the requested processor changes under the supplied revision and builds the entity
     * describing the updated processor.
     *
     * @param revision the revision claimed for the update
     * @param processorDTO the requested processor configuration
     * @return the entity for the updated processor with its new revision, permissions, status and bulletins
     */
    @Override
    public ProcessorEntity updateProcessor(final Revision revision, final ProcessorDTO processorDTO) {
        // look up the processor and run the update through the revision-aware helper
        final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId());
        final RevisionUpdate<ProcessorDTO> update = updateComponent(
                revision,
                processor,
                () -> processorDAO.updateProcessor(processorDTO),
                updatedProcessor -> {
                    // request the validation status (with a timeout) before the DTO is created
                    awaitValidationCompletion(updatedProcessor);
                    return dtoFactory.createProcessorDto(updatedProcessor);
                });

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));
        final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(
                controllerFacade.getProcessorStatus(processor.getIdentifier()));
        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(
                bulletinRepository.findBulletinsForSource(processor.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()))
                .collect(Collectors.toList());

        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createProcessorEntity(update.getComponent(), revisionDto, permissions, operatePermissions, status, bulletinEntities);
    }

    /**
     * Verifies the given processor configuration by delegating to the processor DAO.
     *
     * @param processorId the id of the processor whose configuration is verified
     * @param processorConfig the configuration handed to the DAO for verification
     * @param attributes the attribute map handed to the DAO for verification
     * @return the verification results produced by the DAO
     */
    @Override
    public List<ConfigVerificationResultDTO> verifyProcessorConfiguration(final String processorId, final ProcessorConfigDTO processorConfig, final Map<String, String> attributes) {
        return processorDAO.verifyProcessorConfiguration(processorId, processorConfig, attributes);
    }

    /**
     * Requests the validation status of the given component with a timeout of
     * {@code VALIDATION_WAIT_MILLIS} milliseconds and discards the returned status.
     * NOTE(review): presumably this blocks until validation finishes or the timeout elapses — confirm
     * against {@code ComponentNode.getValidationStatus}.
     *
     * @param component the component whose validation status is requested
     */
    private void awaitValidationCompletion(final ComponentNode component) {
        component.getValidationStatus(VALIDATION_WAIT_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Applies the requested label changes under the supplied revision and builds the entity
     * describing the updated label.
     *
     * @param revision the revision claimed for the update
     * @param labelDTO the requested label configuration
     * @return the entity for the updated label with its new revision and permissions
     */
    @Override
    public LabelEntity updateLabel(final Revision revision, final LabelDTO labelDTO) {
        final Label label = labelDAO.getLabel(labelDTO.getId());

        final RevisionUpdate<LabelDTO> update = updateComponent(
                revision,
                label,
                () -> labelDAO.updateLabel(labelDTO),
                dtoFactory::createLabelDto);

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createLabelEntity(update.getComponent(), revisionDto, permissions);
    }

    /**
     * Applies the requested funnel changes under the supplied revision and builds the entity
     * describing the updated funnel.
     *
     * @param revision the revision claimed for the update
     * @param funnelDTO the requested funnel configuration
     * @return the entity for the updated funnel with its new revision and permissions
     */
    @Override
    public FunnelEntity updateFunnel(final Revision revision, final FunnelDTO funnelDTO) {
        final Funnel funnel = funnelDAO.getFunnel(funnelDTO.getId());

        final RevisionUpdate<FunnelDTO> update = updateComponent(
                revision,
                funnel,
                () -> funnelDAO.updateFunnel(funnelDTO),
                dtoFactory::createFunnelDto);

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(funnel);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createFunnelEntity(update.getComponent(), revisionDto, permissions);
    }


    /**
     * Updates a component under the given revision. The supplier performs the actual change through
     * the appropriate DAO, the flow is saved, and the function turns the updated component into its DTO.
     *
     * @param revision     the current revision of the component
     * @param authorizable the component being updated; only used to describe the component when the revision claim has expired
     * @param daoUpdate    a Supplier that updates the component via the appropriate DAO
     * @param dtoCreation  a Function that converts the updated component into a DTO
     * @param <D>          the DTO Type of the updated component
     * @param <C>          the Component Type of the updated component
     * @return A RevisionUpdate that represents the new configuration
     * @throws InvalidRevisionException if the revision claim has expired
     */
    private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
        final NiFiUser user = NiFiUserUtils.getNiFiUser();

        try {
            return revisionManager.updateRevision(new StandardRevisionClaim(revision), user, () -> {
                // perform the update through the DAO
                final C updatedComponent = daoUpdate.get();

                // persist the flow before the DTO is built
                controllerFacade.save();

                final D updatedDto = dtoCreation.apply(updatedComponent);

                // the new revision is derived from the manager's current revision for the component
                final Revision nextRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
                return new StandardRevisionUpdate<>(updatedDto, new FlowModification(nextRevision, user.getIdentity()));
            });
        } catch (final ExpiredRevisionClaimException erce) {
            throw new InvalidRevisionException("Failed to update component " + authorizable, erce);
        }
    }


    /**
     * Verifies an update of the given snippet. Verification is skipped when the DAO does not
     * know the snippet.
     *
     * @param snippetDto the snippet update to verify
     * @param affectedComponentIds not used by this implementation
     */
    @Override
    public void verifyUpdateSnippet(final SnippetDTO snippetDto, final Set<String> affectedComponentIds) {
        // an update request for an unknown snippet is most likely creating it;
        // verifying it as an update would fail, so nothing is checked in that case
        if (!snippetDAO.hasSnippet(snippetDto.getId())) {
            return;
        }

        snippetDAO.verifyUpdateSnippetComponent(snippetDto);
    }

    /**
     * Updates the components of a snippet under the given revisions, drops the snippet and
     * returns an entity describing it.
     *
     * @param revisions the revisions claimed for the update
     * @param snippetDto the requested snippet update
     * @return the entity for the updated snippet
     * @throws InvalidRevisionException if the revision claim has expired
     */
    @Override
    public SnippetEntity updateSnippet(final Set<Revision> revisions, final SnippetDTO snippetDto) {
        final NiFiUser user = NiFiUserUtils.getNiFiUser();
        final RevisionClaim claim = new StandardRevisionClaim(revisions);

        final RevisionUpdate<SnippetDTO> update;
        try {
            update = revisionManager.updateRevision(claim, user, () -> {
                // update the snippet's components, then drop the snippet itself
                final Snippet updatedSnippet = snippetDAO.updateSnippetComponents(snippetDto);
                snippetDAO.dropSnippet(updatedSnippet.getId());

                // persist the flow
                controllerFacade.save();

                // increment every claimed revision, starting from the manager's current revision
                final Set<Revision> incrementedRevisions = revisions.stream()
                        .map(claimed -> revisionManager.getRevision(claimed.getComponentId()).incrementRevision(claimed.getClientId()))
                        .collect(Collectors.toSet());

                final SnippetDTO updatedDto = dtoFactory.createSnippetDto(updatedSnippet);
                return new StandardRevisionUpdate<>(updatedDto, null, incrementedRevisions);
            });
        } catch (final ExpiredRevisionClaimException e) {
            throw new InvalidRevisionException("Failed to update Snippet", e);
        }

        return entityFactory.createSnippetEntity(update.getComponent());
    }

    /**
     * Applies the requested input port changes under the supplied revision and builds the entity
     * describing the updated port.
     *
     * @param revision the revision claimed for the update
     * @param inputPortDTO the requested port configuration
     * @return the entity for the updated port with its new revision, permissions, status and bulletins
     */
    @Override
    public PortEntity updateInputPort(final Revision revision, final PortDTO inputPortDTO) {
        final Port inputPort = inputPortDAO.getPort(inputPortDTO.getId());

        final RevisionUpdate<PortDTO> update = updateComponent(
                revision,
                inputPort,
                () -> inputPortDAO.updatePort(inputPortDTO),
                dtoFactory::createPortDto);

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPort);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(inputPort));
        final PortStatusDTO status = dtoFactory.createPortStatusDto(
                controllerFacade.getInputPortStatus(inputPort.getIdentifier()));
        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(
                bulletinRepository.findBulletinsForSource(inputPort.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()))
                .collect(Collectors.toList());

        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createPortEntity(update.getComponent(), revisionDto, permissions, operatePermissions, status, bulletinEntities);
    }

    /**
     * Applies the requested output port changes under the supplied revision and builds the entity
     * describing the updated port.
     *
     * @param revision the revision claimed for the update
     * @param outputPortDTO the requested port configuration
     * @return the entity for the updated port with its new revision, permissions, status and bulletins
     */
    @Override
    public PortEntity updateOutputPort(final Revision revision, final PortDTO outputPortDTO) {
        final Port outputPort = outputPortDAO.getPort(outputPortDTO.getId());

        final RevisionUpdate<PortDTO> update = updateComponent(
                revision,
                outputPort,
                () -> outputPortDAO.updatePort(outputPortDTO),
                dtoFactory::createPortDto);

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPort);
        // the operate permissions are built with the current user passed explicitly
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPort), NiFiUserUtils.getNiFiUser());
        final PortStatusDTO status = dtoFactory.createPortStatusDto(
                controllerFacade.getOutputPortStatus(outputPort.getIdentifier()));
        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(
                bulletinRepository.findBulletinsForSource(outputPort.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()))
                .collect(Collectors.toList());

        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createPortEntity(update.getComponent(), revisionDto, permissions, operatePermissions, status, bulletinEntities);
    }

    /**
     * Applies the requested remote process group changes under the supplied revision and builds
     * the entity describing the updated group.
     *
     * @param revision the revision claimed for the update
     * @param remoteProcessGroupDTO the requested remote process group configuration
     * @return the entity for the updated group with its new revision, permissions, status and bulletins
     */
    @Override
    public RemoteProcessGroupEntity updateRemoteProcessGroup(final Revision revision, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
        final RemoteProcessGroup remoteGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());

        final RevisionUpdate<RemoteProcessGroupDTO> update = updateComponent(
                revision,
                remoteGroup,
                () -> remoteProcessGroupDAO.updateRemoteProcessGroup(remoteProcessGroupDTO),
                dtoFactory::createRemoteProcessGroupDto);

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteGroup);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteGroup));
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(
                remoteGroup,
                controllerFacade.getRemoteProcessGroupStatus(remoteGroup.getIdentifier()));
        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(
                bulletinRepository.findBulletinsForSource(remoteGroup.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()))
                .collect(Collectors.toList());

        return entityFactory.createRemoteProcessGroupEntity(update.getComponent(), revisionDto, permissions, operatePermissions, status, bulletinEntities);
    }

    /**
     * Applies the requested changes to an input port of a remote process group under the supplied
     * revision and builds the entity describing the updated port.
     *
     * @param revision the revision claimed for the update
     * @param remoteProcessGroupId the id of the remote process group, passed to the DAO update
     * @param remoteProcessGroupPortDTO the requested port configuration; its group id is used to look up the remote process group
     * @return the entity for the updated port with its new revision and the permissions of the remote process group
     */
    @Override
    public RemoteProcessGroupPortEntity updateRemoteProcessGroupInputPort(
            final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {

        final RemoteProcessGroup remoteGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId());

        final RevisionUpdate<RemoteProcessGroupPortDTO> update = updateComponent(
                revision,
                remoteGroup,
                () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO),
                dtoFactory::createRemoteProcessGroupPortDto);

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteGroup);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteGroup));
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createRemoteProcessGroupPortEntity(update.getComponent(), revisionDto, permissions, operatePermissions);
    }

    /**
     * Applies the requested changes to an output port of a remote process group under the supplied
     * revision and builds the entity describing the updated port.
     *
     * @param revision the revision claimed for the update
     * @param remoteProcessGroupId the id of the remote process group, passed to the DAO update
     * @param remoteProcessGroupPortDTO the requested port configuration; its group id is used to look up the remote process group
     * @return the entity for the updated port with its new revision and the permissions of the remote process group
     */
    @Override
    public RemoteProcessGroupPortEntity updateRemoteProcessGroupOutputPort(
            final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {

        final RemoteProcessGroup remoteGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId());

        final RevisionUpdate<RemoteProcessGroupPortDTO> update = updateComponent(
                revision,
                remoteGroup,
                () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO),
                dtoFactory::createRemoteProcessGroupPortDto);

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(remoteGroup);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(remoteGroup));
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createRemoteProcessGroupPortEntity(update.getComponent(), revisionDto, permissions, operatePermissions);
    }

    /**
     * Determines which running processors and active controller services are affected by the
     * variables whose values differ between the given registry update and the process group's
     * current variable registry.
     *
     * @param variableRegistryDto the proposed variable registry
     * @return DTOs for the affected components that are currently running (processors) or active (controller services)
     * @throws ResourceNotFoundException if the process group lookup returns null
     * @throws RuntimeException if an affected component is neither a processor nor a controller service
     */
    @Override
    public Set<AffectedComponentDTO> getActiveComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
        if (group == null) {
            throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
        }

        // Collectors.toMap rejects null values, and a variable's value may be null, so the map is filled via forEach
        final Map<String, String> proposedValues = new HashMap<>();
        variableRegistryDto.getVariables().stream()
            .map(VariableEntity::getVariable)
            .forEach(variable -> proposedValues.put(variable.getName(), variable.getValue()));

        final Set<AffectedComponentDTO> activeAffected = new HashSet<>();

        for (final String variableName : getUpdatedVariables(group, proposedValues)) {
            for (final ComponentNode component : group.getComponentsAffectedByVariable(variableName)) {
                if (component instanceof ProcessorNode) {
                    final ProcessorNode processor = (ProcessorNode) component;
                    if (processor.isRunning()) {
                        activeAffected.add(dtoFactory.createAffectedComponentDto(processor));
                    }
                    continue;
                }

                if (component instanceof ControllerServiceNode) {
                    final ControllerServiceNode service = (ControllerServiceNode) component;
                    if (service.isActive()) {
                        activeAffected.add(dtoFactory.createAffectedComponentDto(service));
                    }
                    continue;
                }

                throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable");
            }
        }

        return activeAffected;
    }

    /**
     * Determines which components are affected by the variables whose values differ between the
     * given registry update and the process group's current variable registry.
     *
     * @param variableRegistryDto the proposed variable registry
     * @return entities for all affected components
     * @throws ResourceNotFoundException if the process group lookup returns null
     */
    @Override
    public Set<AffectedComponentEntity> getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
        if (group == null) {
            throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
        }

        // Collectors.toMap rejects null values, and a variable's value may be null, so the map is filled via forEach
        final Map<String, String> proposedValues = new HashMap<>();
        variableRegistryDto.getVariables().stream()
                .map(VariableEntity::getVariable)
                .forEach(variable -> proposedValues.put(variable.getName(), variable.getValue()));

        final Set<AffectedComponentEntity> affected = new HashSet<>();
        for (final String variableName : getUpdatedVariables(group, proposedValues)) {
            final Set<ComponentNode> dependents = group.getComponentsAffectedByVariable(variableName);
            affected.addAll(dtoFactory.createAffectedComponentEntities(dependents, revisionManager));
        }

        return affected;
    }

    /**
     * Collects the names of the variables whose proposed value differs from the value currently
     * held by the group's variable registry. Two null values count as equal.
     *
     * @param group the process group whose variable registry supplies the current values
     * @param newVariableValues the proposed values keyed by variable name; values may be null
     * @return the names of the variables whose value would change
     */
    private Set<String> getUpdatedVariables(final ProcessGroup group, final Map<String, String> newVariableValues) {
        final ComponentVariableRegistry registry = group.getVariableRegistry();

        return newVariableValues.entrySet().stream()
                .filter(proposed -> !Objects.equals(proposed.getValue(), registry.getVariableValue(proposed.getKey())))
                .map(Map.Entry::getKey)
                .collect(Collectors.toCollection(HashSet::new));
    }


    /**
     * Applies the requested variable registry changes to a process group under the supplied
     * revision and builds the entity describing the updated registry.
     *
     * @param revision the revision claimed for the update
     * @param variableRegistryDto the requested variable registry; its process group id selects the group
     * @return the entity for the updated registry with its new revision and the permissions on the process group
     */
    @Override
    public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());

        final RevisionUpdate<VariableRegistryDTO> update = updateComponent(
            revision,
            group,
            () -> processGroupDAO.updateVariableRegistry(variableRegistryDto),
            updatedGroup -> dtoFactory.createVariableRegistryDto(updatedGroup, revisionManager));

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(group);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createVariableRegistryEntity(update.getComponent(), revisionDto, permissions);
    }

    /**
     * Verifies that the described parameter context can be created by delegating to the parameter context DAO.
     * NOTE(review): unlike the neighbouring verify methods this one carries no {@code @Override} — confirm
     * whether it is declared on the service facade interface.
     *
     * @param parameterContextDto the parameter context configuration to check
     */
    public void verifyCreateParameterContext(final ParameterContextDTO parameterContextDto) {
        parameterContextDAO.verifyCreate(parameterContextDto);
    }

    /**
     * Verifies an update of the given parameter context by delegating to the parameter context DAO.
     *
     * @param parameterContext the requested parameter context configuration
     * @param verifyComponentStates flag passed through to the DAO; presumably controls whether the states of
     *                              referencing components are checked — confirm against the DAO
     */
    @Override
    public void verifyUpdateParameterContext(final ParameterContextDTO parameterContext, final boolean verifyComponentStates) {
        parameterContextDAO.verifyUpdate(parameterContext, verifyComponentStates);
    }

    /**
     * Applies the requested parameter context changes under the supplied revision and builds the
     * entity describing the updated context.
     *
     * @param revision the revision claimed for the update
     * @param parameterContextDto the requested parameter context configuration
     * @return the entity for the updated context with its new revision and permissions
     */
    @Override
    public ParameterContextEntity updateParameterContext(final Revision revision, final ParameterContextDTO parameterContextDto) {
        // look up the context and run the update through the revision-aware helper
        final ParameterContext existingContext = parameterContextDAO.getParameterContext(parameterContextDto.getId());

        final RevisionUpdate<ParameterContextDTO> update = updateComponent(
            revision,
            existingContext,
            () -> parameterContextDAO.updateParameterContext(parameterContextDto),
            updatedContext -> dtoFactory.createParameterContextDto(updatedContext, revisionManager, false, parameterContextDAO));

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(existingContext);
        final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createParameterContextEntity(update.getComponent(), updatedRevision, permissions);
    }

    /**
     * Looks up a parameter context by id and wraps it in an entity for the given user.
     *
     * @param parameterContextId the id of the parameter context
     * @param includeInheritedParameters flag passed on to the DTO creation
     * @param user the user for whom permissions are computed
     * @return the entity describing the parameter context
     */
    @Override
    public ParameterContextEntity getParameterContext(final String parameterContextId, final boolean includeInheritedParameters, final NiFiUser user) {
        return createParameterContextEntity(
                parameterContextDAO.getParameterContext(parameterContextId),
                includeInheritedParameters,
                user,
                parameterContextDAO);
    }

    /**
     * Builds an entity for every parameter context known to the DAO, computing permissions for
     * the current user and passing {@code false} for the inherited-parameters flag.
     *
     * @return the entities for all parameter contexts
     */
    @Override
    public Set<ParameterContextEntity> getParameterContexts() {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();

        return parameterContextDAO.getParameterContexts().stream()
            .map(parameterContext -> createParameterContextEntity(parameterContext, false, currentUser, parameterContextDAO))
            .collect(Collectors.toSet());
    }

    /**
     * Finds a parameter context by name and checks that the given user may read it.
     *
     * @param parameterContextName the name to search for
     * @param user the user whose READ access is checked
     * @return a parameter context with the given name, or null if none has that name
     * @throws AccessDeniedException if a matching context exists but the user is not authorized to read it
     */
    @Override
    public ParameterContext getParameterContextByName(final String parameterContextName, final NiFiUser user) {
        final ParameterContext match = parameterContextDAO.getParameterContexts().stream()
            .filter(candidate -> candidate.getName().equals(parameterContextName))
            .findAny()
            .orElse(null);

        if (match == null) {
            return null;
        }

        if (match.isAuthorized(authorizer, RequestAction.READ, user)) {
            return match;
        }

        // ParameterContext.authorize() is deliberately not used: its error message would name the ID of the
        // Parameter Context, revealing which ID carries the requested name. The message below mentions only the
        // name the caller already supplied, so no name-to-ID mapping is disclosed.
        throw new AccessDeniedException("Unable to read Parameter Context with name '" + parameterContextName + "'.");
    }

    /**
     * Wraps a parameter context in an entity carrying its DTO, current revision and the
     * permissions of the given user.
     *
     * @param parameterContext the parameter context to describe
     * @param includeInheritedParameters flag passed on to the DTO creation
     * @param user the user for whom permissions are computed
     * @param parameterContextLookup the lookup passed on to the DTO creation
     * @return the entity describing the parameter context
     */
    private ParameterContextEntity createParameterContextEntity(final ParameterContext parameterContext, final boolean includeInheritedParameters, final NiFiUser user,
                                                                final ParameterContextLookup parameterContextLookup) {
        final PermissionsDTO userPermissions = dtoFactory.createPermissionsDto(parameterContext, user);
        final RevisionDTO currentRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(parameterContext.getIdentifier()));
        final ParameterContextDTO contextDto = dtoFactory.createParameterContextDto(parameterContext, revisionManager, includeInheritedParameters, parameterContextLookup);
        return entityFactory.createParameterContextEntity(contextDto, currentRevision, userPermissions);
    }

    /**
     * Validates the processors and controller services that reference parameters in the process
     * groups bound to the given parameter context, using the parameters proposed in the DTO.
     *
     * <p>The proposed parameters are placed into a temporary parameter context that carries the
     * identifier and name of the existing one. Parameter DTOs that describe a deletion are converted
     * to {@code null} by {@link #createParameter(ParameterDTO)}; they are skipped when the temporary
     * context is populated, so a deleted parameter is simply absent from it instead of causing a
     * {@link NullPointerException}.</p>
     *
     * @param parameterContextDto the proposed parameter context; its id selects the existing context
     * @param nifiUser the user for whom permissions on each validated component are computed
     * @return one validation result per processor or controller service that references a parameter
     */
    @Override
    public List<ComponentValidationResultEntity> validateComponents(final ParameterContextDTO parameterContextDto, final NiFiUser nifiUser) {
        final ParameterContext parameterContext = parameterContextDAO.getParameterContext(parameterContextDto.getId());
        final Set<ProcessGroup> boundProcessGroups = parameterContext.getParameterReferenceManager().getProcessGroupsBound(parameterContext);

        final ParameterContext updatedParameterContext = new StandardParameterContext(parameterContext.getIdentifier(), parameterContext.getName(),
                ParameterReferenceManager.EMPTY, null);
        final Map<String, Parameter> parameters = new HashMap<>();
        parameterContextDto.getParameters().stream()
            .map(ParameterEntity::getParameter)
            .map(this::createParameter)
            // createParameter returns null for a deletion; such entries must not be dereferenced
            .filter(Objects::nonNull)
            .forEach(param -> parameters.put(param.getDescriptor().getName(), param));
        updatedParameterContext.setParameters(parameters);

        final List<ComponentValidationResultEntity> validationResults = new ArrayList<>();
        for (final ProcessGroup processGroup : boundProcessGroups) {
            for (final ProcessorNode processorNode : processGroup.getProcessors()) {
                // only components that reference a parameter are re-validated
                if (!processorNode.isReferencingParameter()) {
                    continue;
                }

                final ComponentValidationResultEntity componentValidationResultEntity = validateComponent(processorNode, updatedParameterContext, nifiUser);
                validationResults.add(componentValidationResultEntity);
            }

            for (final ControllerServiceNode serviceNode : processGroup.getControllerServices(false)) {
                if (!serviceNode.isReferencingParameter()) {
                    continue;
                }

                final ComponentValidationResultEntity componentValidationResultEntity = validateComponent(serviceNode, updatedParameterContext, nifiUser);
                validationResults.add(componentValidationResultEntity);
            }
        }

        return validationResults;
    }

    /**
     * Validates a single component against the given parameter context and wraps the outcome in
     * an entity carrying the component's current revision and the user's permissions.
     *
     * @param componentNode the component to validate; its own properties and annotation data are used
     * @param parameterContext the parameter context to validate against
     * @param user the user for whom permissions are computed
     * @return the entity describing the validation result
     */
    private ComponentValidationResultEntity validateComponent(final ComponentNode componentNode, final ParameterContext parameterContext, final NiFiUser user) {
        final ValidationState validationState = componentNode.performValidation(componentNode.getProperties(), componentNode.getAnnotationData(), parameterContext);
        final ComponentValidationResultDTO validationDto = dtoFactory.createComponentValidationResultDto(componentNode, validationState);

        final Revision currentRevision = revisionManager.getRevision(componentNode.getIdentifier());
        final PermissionsDTO userPermissions = dtoFactory.createPermissionsDto(componentNode, user);
        final RevisionDTO currentRevisionDto = dtoFactory.createRevisionDTO(currentRevision);

        return entityFactory.createComponentValidationResultEntity(validationDto, currentRevisionDto, userPermissions);
    }

    /**
     * Converts a parameter DTO into a Parameter.
     *
     * @param dto the parameter DTO to convert
     * @return the Parameter, or null when the DTO has no description, sensitivity flag or value,
     *         which indicates a deletion
     */
    private Parameter createParameter(final ParameterDTO dto) {
        // a DTO with none of description, sensitivity flag and value describes a deletion,
        // which is represented as a null Parameter
        final boolean deletion = dto.getDescription() == null && dto.getSensitive() == null && dto.getValue() == null;
        if (deletion) {
            return null;
        }

        // a missing sensitivity flag is treated as not sensitive
        final ParameterDescriptor parameterDescriptor = new ParameterDescriptor.Builder()
            .name(dto.getName())
            .description(dto.getDescription())
            .sensitive(Boolean.TRUE.equals(dto.getSensitive()))
            .build();

        return new Parameter(parameterDescriptor, dto.getValue());
    }

    /**
     * Creates a parameter context under the supplied revision and builds the entity describing it.
     *
     * @param revision the revision claimed for the new component
     * @param parameterContextDto the requested parameter context configuration
     * @return the entity for the created context with its incremented revision and permissions
     */
    @Override
    public ParameterContextEntity createParameterContext(final Revision revision, final ParameterContextDTO parameterContextDto) {
        final NiFiUser user = NiFiUserUtils.getNiFiUser();

        // claim the revision of the component being created... revision already verified (version == 0)
        final RevisionClaim creationClaim = new StandardRevisionClaim(revision);

        // perform the creation through the revision manager
        final RevisionUpdate<ParameterContextDTO> update = revisionManager.updateRevision(creationClaim, user, () -> {
            // create the parameter context
            final ParameterContext created = parameterContextDAO.createParameterContext(parameterContextDto);

            // persist the flow
            controllerFacade.save();

            final ParameterContextDTO createdDto = dtoFactory.createParameterContextDto(created, revisionManager, false, parameterContextDAO);
            final Revision incremented = revision.incrementRevision(revision.getClientId());
            return new StandardRevisionUpdate<>(createdDto, new FlowModification(incremented, user.getIdentity()));
        });

        // permissions are computed on the context as looked up again from the DAO
        final ParameterContext storedContext = parameterContextDAO.getParameterContext(parameterContextDto.getId());
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(storedContext);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());

        return entityFactory.createParameterContextEntity(update.getComponent(), revisionDto, permissions);
    }

    /**
     * Delegates the delete verification for the given Parameter Context to the Parameter Context DAO.
     *
     * @param parameterContextId the id of the Parameter Context to verify
     */
    @Override
    public void verifyDeleteParameterContext(final String parameterContextId) {
        parameterContextDAO.verifyDelete(parameterContextId);
    }

    /**
     * Deletes the Parameter Context with the given id under the supplied revision.
     *
     * @param revision the revision of the Parameter Context being deleted
     * @param parameterContextId the id of the Parameter Context to delete
     * @return an entity holding the DTO captured before deletion and the permissions; it carries no revision
     */
    @Override
    public ParameterContextEntity deleteParameterContext(final Revision revision, final String parameterContextId) {
        final ParameterContext parameterContext = parameterContextDAO.getParameterContext(parameterContextId);
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(parameterContext);

        // capture the resource and the DTO before the context is removed
        final Resource contextResource = parameterContext.getResource();
        final ParameterContextDTO dtoBeforeDelete = dtoFactory.createParameterContextDto(parameterContext, revisionManager, false, parameterContextDAO);

        final ParameterContextDTO snapshot = deleteComponent(
            revision,
            contextResource,
            () -> parameterContextDAO.deleteParameterContext(parameterContextId),
            true,
            dtoBeforeDelete);

        return entityFactory.createParameterContextEntity(snapshot, null, permissions);
    }


    /**
     * Returns the processors of the given Process Group that reference a Parameter.
     *
     * @param groupId the id of the Process Group to inspect
     * @return the affected-component entities for the matching processors
     */
    @Override
    public Set<AffectedComponentEntity> getProcessorsReferencingParameter(final String groupId) {
        return getComponentsReferencingParameter(groupId, processGroup -> processGroup.getProcessors());
    }

    /**
     * Returns the controller services of the given Process Group that reference a Parameter.
     *
     * @param groupId the id of the Process Group to inspect
     * @return the affected-component entities for the matching controller services
     */
    @Override
    public Set<AffectedComponentEntity> getControllerServicesReferencingParameter(String groupId) {
        // 'false' is forwarded unchanged to ProcessGroup#getControllerServices
        return getComponentsReferencingParameter(groupId, processGroup -> processGroup.getControllerServices(false));
    }

    /**
     * Collects the components of a Process Group that reference a Parameter.
     *
     * @param groupId the id of the Process Group to inspect
     * @param componentFunction selects which components of the group are considered
     * @return the affected-component entities for every selected component that references a Parameter
     */
    private Set<AffectedComponentEntity> getComponentsReferencingParameter(final String groupId, final Function<ProcessGroup, Collection<? extends ComponentNode>> componentFunction) {
        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);

        final Set<ComponentNode> referencing = new HashSet<>();
        for (final ComponentNode candidate : componentFunction.apply(processGroup)) {
            if (candidate.isReferencingParameter()) {
                referencing.add(candidate);
            }
        }

        return dtoFactory.createAffectedComponentEntities(referencing, revisionManager);
    }

    /**
     * Builds a fresh entity for the component described by the given affected-component entity.
     *
     * @param affectedComponent the entity to refresh
     * @return a newly created entity for the referenced component; the argument itself is returned when it has
     *         no component DTO or when its reference type is not one of the types handled here
     */
    @Override
    public AffectedComponentEntity getUpdatedAffectedComponentEntity(final AffectedComponentEntity affectedComponent) {
        final AffectedComponentDTO dto = affectedComponent.getComponent();
        if (dto == null) {
            return affectedComponent;
        }

        final String groupId = affectedComponent.getProcessGroup().getId();
        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);

        final String componentType = dto.getReferenceType();
        final String componentId = dto.getId();

        if (AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(componentType)) {
            return dtoFactory.createAffectedComponentEntity(processGroup.getControllerService(componentId), revisionManager);
        }

        if (AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR.equals(componentType)) {
            return dtoFactory.createAffectedComponentEntity(processGroup.getProcessor(componentId), revisionManager);
        }

        if (AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT.equals(componentType)) {
            final PortEntity inputPortEntity = createInputPortEntity(processGroup.getInputPort(componentId));
            return dtoFactory.createAffectedComponentEntity(inputPortEntity, AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT);
        }

        if (AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT.equals(componentType)) {
            final PortEntity outputPortEntity = createOutputPortEntity(processGroup.getOutputPort(componentId));
            return dtoFactory.createAffectedComponentEntity(outputPortEntity, AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
        }

        // Remote input and output ports are resolved identically; only the reported type differs.
        final boolean remoteInput = AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT.equals(componentType);
        if (remoteInput || AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT.equals(componentType)) {
            final RemoteGroupPort remotePort = processGroup.findRemoteGroupPort(componentId);
            final RemoteProcessGroupEntity remoteGroupEntity = createRemoteGroupEntity(remotePort.getRemoteProcessGroup(), NiFiUserUtils.getNiFiUser());
            final RemoteProcessGroupPortDTO remotePortDto = dtoFactory.createRemoteProcessGroupPortDto(remotePort);
            final String remoteType = remoteInput
                ? AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT
                : AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT;
            return dtoFactory.createAffectedComponentEntity(remotePortDto, remoteType, remoteGroupEntity);
        }

        return affectedComponent;
    }

    /**
     * Determines the components affected by the given Parameter Context update, delegating to the
     * two-argument overload with {@code includeInactive} set to {@code true}.
     *
     * @param parameterContextDto the proposed Parameter Context
     * @return the affected components as computed by the overload
     */
    @Override
    public Set<AffectedComponentEntity> getComponentsAffectedByParameterContextUpdate(final ParameterContextDTO parameterContextDto) {
        return getComponentsAffectedByParameterContextUpdate(parameterContextDto, true);
    }

    /**
     * Determines which processors and controller services are affected by the proposed Parameter Context
     * update. As a side effect, the referencing components of every Parameter in the DTO are reset and then
     * repopulated with the components found here.
     *
     * @param parameterContextDto the proposed Parameter Context
     * @param includeInactive when {@code false}, only running processors and active controller services are considered
     * @return the entities for all affected components
     */
    private Set<AffectedComponentEntity> getComponentsAffectedByParameterContextUpdate(final ParameterContextDTO parameterContextDto, final boolean includeInactive) {
        final ProcessGroup rootGroup = processGroupDAO.getProcessGroup("root");
        final ParameterContext parameterContext = parameterContextDAO.getParameterContext(parameterContextDto.getId());

        // A group is relevant when it is bound to this Parameter Context or to one that inherits from it.
        final List<ProcessGroup> boundGroups = rootGroup.findAllProcessGroups(group -> {
            if (group.getParameterContext() == null) {
                return false;
            }

            return group.getParameterContext().getIdentifier().equals(parameterContextDto.getId())
                || group.getParameterContext().inheritsFrom(parameterContext.getIdentifier());
        });

        final Set<String> updatedParameterNames = getUpdatedParameterNames(parameterContextDto);

        // Reset the referencing components of each Parameter. This field is read-only for clients and is populated below.
        for (final ParameterEntity entity : parameterContextDto.getParameters()) {
            entity.getParameter().setReferencingComponents(new HashSet<>());
        }

        final Set<ComponentNode> affectedComponents = new HashSet<>();
        for (final ProcessGroup group : boundGroups) {
            for (final ProcessorNode processor : group.getProcessors()) {
                if (!includeInactive && !processor.isRunning()) {
                    continue;
                }

                final Set<String> referencedParams = processor.getReferencedParameterNames();
                if (referencedParams.stream().noneMatch(updatedParameterNames::contains)) {
                    continue;
                }

                affectedComponents.add(processor);
                final AffectedComponentEntity processorEntity = dtoFactory.createAffectedComponentEntity(processor, revisionManager);

                // register the processor with every Parameter in the DTO that it references
                for (final String referencedParam : referencedParams) {
                    for (final ParameterEntity entity : parameterContextDto.getParameters()) {
                        final ParameterDTO parameterDto = entity.getParameter();
                        if (referencedParam.equals(parameterDto.getName())) {
                            parameterDto.getReferencingComponents().add(processorEntity);
                        }
                    }
                }
            }

            for (final ControllerServiceNode service : group.getControllerServices(false)) {
                if (!includeInactive && !service.isActive()) {
                    continue;
                }

                final Set<String> referencedParams = service.getReferencedParameterNames();
                final Set<String> updatedReferencedParams = referencedParams.stream()
                    .filter(updatedParameterNames::contains)
                    .collect(Collectors.toSet());

                // gather the Parameter DTOs that the service references
                final List<ParameterDTO> referencedParameterDtos = new ArrayList<>();
                for (final String referencedParam : referencedParams) {
                    for (final ParameterEntity entity : parameterContextDto.getParameters()) {
                        final ParameterDTO parameterDto = entity.getParameter();
                        if (referencedParam.equals(parameterDto.getName())) {
                            referencedParameterDtos.add(parameterDto);
                        }
                    }
                }

                if (updatedReferencedParams.isEmpty()) {
                    continue;
                }

                addReferencingComponents(service, affectedComponents, referencedParameterDtos, includeInactive);
            }
        }

        return dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager);
    }

    /**
     * Records a controller service, and recursively every component referencing it, as affected.
     * Both the overall set of affected components and the referencing components of each supplied
     * Parameter DTO are updated.
     *
     * @param service the controller service to record
     * @param affectedComponents the overall set of affected components; added to by this method
     * @param affectedParameterDtos the Parameter DTOs whose referencing components are added to
     * @param includeInactive when {@code false}, referencing components that are not active are skipped
     */
    private void addReferencingComponents(final ControllerServiceNode service, final Set<ComponentNode> affectedComponents, final List<ParameterDTO> affectedParameterDtos,
                                          final boolean includeInactive) {

        // the service itself goes into the overall set...
        affectedComponents.add(service);

        // ...and into the referencing components of each Parameter DTO
        final AffectedComponentEntity serviceEntity = dtoFactory.createAffectedComponentEntity(service, revisionManager);
        for (final ParameterDTO parameterDto : affectedParameterDtos) {
            parameterDto.getReferencingComponents().add(serviceEntity);
        }

        for (final ComponentNode referencing : service.getReferences().getReferencingComponents()) {
            if (!includeInactive && !isActive(referencing)) {
                continue;
            }

            // same bookkeeping for each component that references the service
            affectedComponents.add(referencing);

            final AffectedComponentEntity referencingEntity = dtoFactory.createAffectedComponentEntity(referencing, revisionManager);
            for (final ParameterDTO parameterDto : affectedParameterDtos) {
                parameterDto.getReferencingComponents().add(referencingEntity);
            }

            // a referencing controller service may itself be referenced by further components
            if (referencing instanceof ControllerServiceNode) {
                addReferencingComponents((ControllerServiceNode) referencing, affectedComponents, affectedParameterDtos, includeInactive);
            }
        }
    }

    /**
     * Tells whether the given component counts as active.
     *
     * @param componentNode the component to inspect
     * @return {@code isActive()} for a controller service, {@code isRunning()} for a processor, {@code false} otherwise
     */
    private boolean isActive(final ComponentNode componentNode) {
        final boolean active;
        if (componentNode instanceof ControllerServiceNode) {
            active = ((ControllerServiceNode) componentNode).isActive();
        } else if (componentNode instanceof ProcessorNode) {
            active = ((ProcessorNode) componentNode).isRunning();
        } else {
            active = false;
        }

        return active;
    }

    /**
     * Computes the names of the Parameters in the DTO that differ from the stored Parameter Context.
     * A Parameter counts as updated when it is not present in the stored context, when its value differs,
     * or when the DTO carries a non-null description that differs from the stored one.
     *
     * @param parameterContextDto the proposed Parameter Context
     * @return the names of all updated Parameters
     */
    private Set<String> getUpdatedParameterNames(final ParameterContextDTO parameterContextDto) {
        final ParameterContext storedContext = parameterContextDAO.getParameterContext(parameterContextDto.getId());

        final Set<String> updatedNames = new HashSet<>();
        for (final ParameterEntity entity : parameterContextDto.getParameters()) {
            final ParameterDTO proposed = entity.getParameter();
            final String proposedValue = proposed.getValue();
            final String proposedName = proposed.getName();

            final boolean updated = storedContext.getParameter(proposedName)
                .map(stored -> !Objects.equals(proposedValue, stored.getValue())
                    || (proposed.getDescription() != null && !proposed.getDescription().equals(stored.getDescriptor().getDescription())))
                .orElse(true); // a Parameter missing from the stored context is always reported as updated

            if (updated) {
                updatedNames.add(proposedName);
            }
        }

        return updatedNames;
    }


    /**
     * Updates a Process Group under the supplied revision and assembles the response entity from the
     * updated DTO, revision, permissions, concise status and bulletins.
     *
     * @param revision the revision of the Process Group being updated
     * @param processGroupDTO the requested changes
     * @return the entity describing the updated Process Group
     */
    @Override
    public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) {
        final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId());
        final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision,
                processGroupNode,
                () -> processGroupDAO.updateProcessGroup(processGroupDTO),
                updatedGroup -> dtoFactory.createProcessGroupDto(updatedGroup));

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
        final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());

        final String groupIdentifier = processGroupNode.getIdentifier();
        final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(groupIdentifier));

        // wrap each bulletin, passing along whether the user may read the group
        final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = new ArrayList<>();
        for (final BulletinDTO bulletin : bulletins) {
            bulletinEntities.add(entityFactory.createBulletinEntity(bulletin, permissions.getCanRead()));
        }

        return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
    }

    /**
     * Delegates the update verification to the Process Group DAO, but only when the DAO reports a
     * Process Group with the DTO's id.
     *
     * @param processGroupDTO the requested changes
     */
    @Override
    public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
        final boolean groupKnown = processGroupDAO.hasProcessGroup(processGroupDTO.getId());
        if (!groupKnown) {
            return;
        }

        processGroupDAO.verifyUpdate(processGroupDTO);
    }

    /**
     * Applies the given state to the listed components of a Process Group via the DAO's
     * {@code enableComponents}, increments the revision of each listed component and saves the flow.
     *
     * @param processGroupId the id of the Process Group
     * @param state the state to apply
     * @param componentRevisions the revisions of the components to change, keyed by component id
     * @return an entity echoing the Process Group id and the name of the requested state
     */
    @Override
    public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions) {
        final NiFiUser user = NiFiUserUtils.getNiFiUser();
        final RevisionClaim claim = new StandardRevisionClaim(componentRevisions.values());

        final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(claim, user, () -> {
            // enable/disable the components
            processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet());

            // increment the current revision of every component in the request
            final Map<String, Revision> updatedRevisions = new HashMap<>();
            for (final Revision requested : componentRevisions.values()) {
                final Revision current = revisionManager.getRevision(requested.getComponentId());
                updatedRevisions.put(requested.getComponentId(), current.incrementRevision(requested.getClientId()));
            }

            // save
            controllerFacade.save();

            // gather details for response
            final ScheduleComponentsEntity response = new ScheduleComponentsEntity();
            response.setId(processGroupId);
            response.setState(state.name());
            return new StandardRevisionUpdate<>(response, null, new HashSet<>(updatedRevisions.values()));
        });

        return updatedComponent.getComponent();
    }

    /**
     * Applies the given scheduled state to the listed components of a Process Group via the DAO's
     * {@code scheduleComponents}, increments the revision of each listed component and saves the flow.
     *
     * @param processGroupId the id of the Process Group
     * @param state the scheduled state to apply
     * @param componentRevisions the revisions of the components to change, keyed by component id
     * @return an entity echoing the Process Group id and the name of the requested state
     */
    @Override
    public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) {
        final NiFiUser user = NiFiUserUtils.getNiFiUser();
        final RevisionClaim claim = new StandardRevisionClaim(componentRevisions.values());

        final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(claim, user, () -> {
            // schedule the components
            processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());

            // increment the current revision of every component in the request
            final Map<String, Revision> updatedRevisions = new HashMap<>();
            for (final Revision requested : componentRevisions.values()) {
                final Revision current = revisionManager.getRevision(requested.getComponentId());
                updatedRevisions.put(requested.getComponentId(), current.incrementRevision(requested.getClientId()));
            }

            // save
            controllerFacade.save();

            // gather details for response
            final ScheduleComponentsEntity response = new ScheduleComponentsEntity();
            response.setId(processGroupId);
            response.setState(state.name());
            return new StandardRevisionUpdate<>(response, null, new HashSet<>(updatedRevisions.values()));
        });

        return updatedComponent.getComponent();
    }

    /**
     * Applies the given state to the listed controller services of a Process Group via the DAO's
     * {@code activateControllerServices}, increments the revision of each listed service and saves the flow.
     *
     * @param processGroupId the id of the Process Group
     * @param state the controller service state to apply
     * @param serviceRevisions the revisions of the services to change, keyed by service id
     * @return an entity echoing the Process Group id and the name of the requested state
     */
    @Override
    public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) {
        final NiFiUser user = NiFiUserUtils.getNiFiUser();
        final RevisionClaim claim = new StandardRevisionClaim(serviceRevisions.values());

        final RevisionUpdate<ActivateControllerServicesEntity> updatedComponent = revisionManager.updateRevision(claim, user, () -> {
            // apply the requested state to the services
            processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet());

            // increment the current revision of every service in the request
            final Map<String, Revision> updatedRevisions = new HashMap<>();
            for (final Revision requested : serviceRevisions.values()) {
                final Revision current = revisionManager.getRevision(requested.getComponentId());
                updatedRevisions.put(requested.getComponentId(), current.incrementRevision(requested.getClientId()));
            }

            // save
            controllerFacade.save();

            // gather details for response
            final ActivateControllerServicesEntity response = new ActivateControllerServicesEntity();
            response.setId(processGroupId);
            response.setState(state.name());
            return new StandardRevisionUpdate<>(response, null, new HashSet<>(updatedRevisions.values()));
        });

        return updatedComponent.getComponent();
    }


    /**
     * Updates the controller configuration under the supplied revision. Only the thread counts that are
     * present (non-null) in the DTO are applied.
     *
     * @param revision the revision of the controller configuration
     * @param controllerConfigurationDTO the requested configuration
     * @return the entity describing the resulting controller configuration
     */
    @Override
    public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
        final RevisionUpdate<ControllerConfigurationDTO> configurationUpdate = updateComponent(
                revision,
                controllerFacade,
                () -> {
                    // apply the timer driven thread count when one was supplied
                    if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
                        controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
                    }

                    // apply the event driven thread count when one was supplied
                    if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
                        controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
                    }

                    return controllerConfigurationDTO;
                },
                ignored -> dtoFactory.createControllerConfigurationDto(controllerFacade));

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade);
        final RevisionDTO resultingRevision = dtoFactory.createRevisionDTO(configurationUpdate.getLastModification());
        return entityFactory.createControllerConfigurationEntity(configurationUpdate.getComponent(), resultingRevision, permissions);
    }


    /**
     * Requests a connection-state change for a cluster node based on the status carried by the DTO.
     * A status of CONNECTING, OFFLOADING or DISCONNECTING (compared case-insensitively) triggers the
     * corresponding request on the cluster coordinator; any other status triggers no request.
     *
     * @param nodeDTO the node id and the requested status
     * @return the node as returned by {@code getNode} after the request was issued
     * @throws WebApplicationException if the current user cannot be determined
     * @throws UnknownNodeException if the cluster coordinator has no node with the given id
     */
    @Override
    public NodeDTO updateNode(final NodeDTO nodeDTO) {
        final NiFiUser user = NiFiUserUtils.getNiFiUser();
        if (user == null) {
            throw new WebApplicationException(new Throwable("Unable to access details for current user."));
        }
        final String userDn = user.getIdentity();

        final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(nodeDTO.getNodeId());
        if (nodeId == null) {
            throw new UnknownNodeException("No node exists with ID " + nodeDTO.getNodeId());
        }

        final String requestedStatus = nodeDTO.getStatus();
        if (NodeConnectionState.CONNECTING.name().equalsIgnoreCase(requestedStatus)) {
            clusterCoordinator.requestNodeConnect(nodeId, userDn);
        } else if (NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(requestedStatus)) {
            final String explanation = "User " + userDn + " requested that node be offloaded";
            clusterCoordinator.requestNodeOffload(nodeId, OffloadCode.OFFLOADED, explanation);
        } else if (NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(requestedStatus)) {
            final String explanation = "User " + userDn + " requested that node be disconnected from cluster";
            clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, explanation);
        }

        return getNode(nodeId);
    }

    /**
     * Resets the counter with the given id through the controller facade and returns a DTO built
     * from the value the facade hands back.
     *
     * @param counterId the id of the counter to reset
     * @return the DTO created from the result of the reset
     */
    @Override
    public CounterDTO updateCounter(final String counterId) {
        return dtoFactory.createCounterDto(controllerFacade.resetCounter(counterId));
    }

    /**
     * Delegates the clear-state verification for the given processor to the processor DAO.
     *
     * @param processorId the id of the processor to verify
     */
    @Override
    public void verifyCanClearProcessorState(final String processorId) {
        processorDAO.verifyClearState(processorId);
    }

    /**
     * Clears the state of the given processor by delegating to the processor DAO.
     *
     * @param processorId the id of the processor whose state is cleared
     */
    @Override
    public void clearProcessorState(final String processorId) {
        processorDAO.clearState(processorId);
    }

    /**
     * Delegates the clear-state verification for the given controller service to the controller service DAO.
     *
     * @param controllerServiceId the id of the controller service to verify
     */
    @Override
    public void verifyCanClearControllerServiceState(final String controllerServiceId) {
        controllerServiceDAO.verifyClearState(controllerServiceId);
    }

    /**
     * Clears the state of the given controller service by delegating to the controller service DAO.
     *
     * @param controllerServiceId the id of the controller service whose state is cleared
     */
    @Override
    public void clearControllerServiceState(final String controllerServiceId) {
        controllerServiceDAO.clearState(controllerServiceId);
    }

    /**
     * Delegates the clear-state verification for the given reporting task to the reporting task DAO.
     *
     * @param reportingTaskId the id of the reporting task to verify
     */
    @Override
    public void verifyCanClearReportingTaskState(final String reportingTaskId) {
        reportingTaskDAO.verifyClearState(reportingTaskId);
    }

    /**
     * Clears the state of the given reporting task by delegating to the reporting task DAO.
     *
     * @param reportingTaskId the id of the reporting task whose state is cleared
     */
    @Override
    public void clearReportingTaskState(final String reportingTaskId) {
        reportingTaskDAO.clearState(reportingTaskId);
    }

    /**
     * Retrieves the state stored for a Remote Process Group. The cluster-scoped state is only looked up
     * when this instance is clustered; otherwise it is passed on as {@code null}.
     *
     * @param remoteProcessGroupId the id of the Remote Process Group
     * @return the DTO built from the local and (optional) cluster state
     */
    @Override
    public ComponentStateDTO getRemoteProcessGroupState(String remoteProcessGroupId) {
        StateMap clusterState = null;
        if (isClustered()) {
            clusterState = remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.CLUSTER);
        }
        final StateMap localState = remoteProcessGroupDAO.getState(remoteProcessGroupId, Scope.LOCAL);

        // the state lookups above have already resolved this id, so the group is dereferenced without a null check
        final RemoteProcessGroup remoteGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
        return dtoFactory.createComponentStateDTO(remoteProcessGroupId, remoteGroup.getClass(), localState, clusterState);
    }

    /**
     * Deletes the connection with the given id under the supplied revision.
     *
     * @param revision the revision of the connection being deleted
     * @param connectionId the id of the connection to delete
     * @return an entity holding the DTO captured before deletion and the permissions; it carries no revision or status
     */
    @Override
    public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) {
        final Connection connection = connectionDAO.getConnection(connectionId);
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);

        // capture the resource and the DTO before the connection is removed
        final Resource connectionResource = connection.getResource();
        final ConnectionDTO dtoBeforeDelete = dtoFactory.createConnectionDto(connection);

        final ConnectionDTO snapshot = deleteComponent(
                revision,
                connectionResource,
                () -> connectionDAO.deleteConnection(connectionId),
                false, // no policies to remove
                dtoBeforeDelete);

        return entityFactory.createConnectionEntity(snapshot, null, permissions, null);
    }

    /**
     * Deletes a FlowFile drop request through the connection DAO and returns a DTO built from the
     * value the DAO hands back.
     *
     * @param connectionId the id of the connection the drop request belongs to
     * @param dropRequestId the id of the drop request to delete
     * @return the DTO created from the deleted drop request
     */
    @Override
    public DropRequestDTO deleteFlowFileDropRequest(final String connectionId, final String dropRequestId) {
        return dtoFactory.createDropRequestDTO(connectionDAO.deleteFlowFileDropRequest(connectionId, dropRequestId));
    }

    /**
     * Deletes a FlowFile listing request and reports, where available, whether the connection's source
     * and destination are running.
     *
     * @param connectionId the id of the connection the listing request belongs to
     * @param listingRequestId the id of the listing request to delete
     * @return the DTO created from the deleted listing request
     */
    @Override
    public ListingRequestDTO deleteFlowFileListingRequest(final String connectionId, final String listingRequestId) {
        final Connection connection = connectionDAO.getConnection(connectionId);
        final ListingRequestDTO deletedRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(connectionId, listingRequestId));

        // the running flags are only set for the endpoints the connection actually has
        final boolean hasSource = connection.getSource() != null;
        if (hasSource) {
            deletedRequest.setSourceRunning(connection.getSource().isRunning());
        }

        final boolean hasDestination = connection.getDestination() != null;
        if (hasDestination) {
            deletedRequest.setDestinationRunning(connection.getDestination().isRunning());
        }

        return deletedRequest;
    }

    /**
     * Deletes the processor with the given id under the supplied revision.
     *
     * @param revision the revision of the processor being deleted
     * @param processorId the id of the processor to delete
     * @return an entity holding the DTO captured before deletion plus the component and operate permissions
     */
    @Override
    public ProcessorEntity deleteProcessor(final Revision revision, final String processorId) {
        final ProcessorNode processor = processorDAO.getProcessor(processorId);
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor));

        // capture the resource and the DTO before the processor is removed
        final Resource processorResource = processor.getResource();
        final ProcessorDTO dtoBeforeDelete = dtoFactory.createProcessorDto(processor);

        final ProcessorDTO snapshot = deleteComponent(
                revision,
                processorResource,
                () -> processorDAO.deleteProcessor(processorId),
                true,
                dtoBeforeDelete);

        return entityFactory.createProcessorEntity(snapshot, null, permissions, operatePermissions, null, null);
    }

    /**
     * Terminates the given processor through the processor DAO and then returns the processor's entity.
     *
     * @param processorId the id of the processor to terminate
     * @return the entity obtained from {@code getProcessor} after the terminate call
     */
    @Override
    public ProcessorEntity terminateProcessor(final String processorId) {
        processorDAO.terminate(processorId);
        return getProcessor(processorId);
    }

    /**
     * Delegates the terminate verification for the given processor to the processor DAO.
     *
     * @param processorId the id of the processor to verify
     */
    @Override
    public void verifyTerminateProcessor(final String processorId) {
        processorDAO.verifyTerminate(processorId);
    }

    /**
     * Deletes the label with the given id under the supplied revision.
     *
     * @param revision the revision of the label being deleted
     * @param labelId the id of the label to delete
     * @return an entity holding the DTO captured before deletion and the permissions; it carries no revision
     */
    @Override
    public LabelEntity deleteLabel(final Revision revision, final String labelId) {
        final Label label = labelDAO.getLabel(labelId);
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(label);

        // capture the resource and the DTO before the label is removed
        final Resource labelResource = label.getResource();
        final LabelDTO dtoBeforeDelete = dtoFactory.createLabelDto(label);

        final LabelDTO snapshot = deleteComponent(
                revision,
                labelResource,
                () -> labelDAO.deleteLabel(labelId),
                true,
                dtoBeforeDelete);

        return entityFactory.createLabelEntity(snapshot, null, permissions);
    }

    /**
     * Deletes the user with the given id under the supplied revision.
     *
     * @param revision the revision of the user being deleted
     * @param userId the id of the user to delete
     * @return an entity holding the user DTO captured before deletion and the tenant permissions
     */
    @Override
    public UserEntity deleteUser(final Revision revision, final String userId) {
        final User user = userDAO.getUser(userId);
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());

        // group memberships and policies are only resolved when the user was found; otherwise both stay null
        Set<TenantEntity> memberGroups = null;
        Set<AccessPolicySummaryEntity> policySummaries = null;
        if (user != null) {
            memberGroups = userGroupDAO.getUserGroupsForUser(userId).stream()
                    .map(group -> group.getIdentifier())
                    .map(mapUserGroupIdToTenantEntity(false))
                    .collect(Collectors.toSet());
            policySummaries = userGroupDAO.getAccessPoliciesForUser(userId).stream()
                    .map(policy -> createAccessPolicySummaryEntity(policy))
                    .collect(Collectors.toSet());
        }

        // the user is addressed as a child of the tenant resource
        final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userId;
        final Resource userResource = new Resource() {
            @Override
            public String getIdentifier() {
                return resourceIdentifier;
            }

            @Override
            public String getName() {
                return resourceIdentifier;
            }

            @Override
            public String getSafeDescription() {
                return "User " + userId;
            }
        };

        final UserDTO snapshot = deleteComponent(
                revision,
                userResource,
                () -> userDAO.deleteUser(userId),
                false, // no user specific policies to remove
                dtoFactory.createUserDto(user, memberGroups, policySummaries));

        return entityFactory.createUserEntity(snapshot, null, permissions);
    }

    /**
     * Deletes the user group with the given id under the supplied revision.
     *
     * <p>The members and access policies of the group are resolved only when the DAO returned a group;
     * otherwise both are passed on as {@code null}, mirroring the handling in {@code deleteUser}.</p>
     *
     * @param revision the revision of the user group being deleted
     * @param userGroupId the id of the user group to delete
     * @return an entity holding the user group DTO captured before deletion and the tenant permissions
     */
    @Override
    public UserGroupEntity deleteUserGroup(final Revision revision, final String userGroupId) {
        final Group userGroup = userGroupDAO.getUserGroup(userGroupId);
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
        final Set<TenantEntity> users = userGroup != null ? userGroup.getUsers().stream()
                .map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet()) : null;
        // Guarded like 'users' above: previously userGroup was dereferenced here unconditionally, so a
        // missing group surfaced as a NullPointerException instead of reaching the delete call.
        // NOTE(review): assumes dtoFactory.createUserGroupDto tolerates a null group, as the parallel
        // deleteUser path assumes for createUserDto - confirm.
        final Set<AccessPolicySummaryEntity> policyEntities = userGroup != null ? userGroupDAO.getAccessPoliciesForUserGroup(userGroup.getIdentifier()).stream()
                .map(ap -> createAccessPolicySummaryEntity(ap)).collect(Collectors.toSet()) : null;

        // the user group is addressed as a child of the tenant resource
        final String resourceIdentifier = ResourceFactory.getTenantResource().getIdentifier() + "/" + userGroupId;
        final UserGroupDTO snapshot = deleteComponent(
                revision,
                new Resource() {
                    @Override
                    public String getIdentifier() {
                        return resourceIdentifier;
                    }

                    @Override
                    public String getName() {
                        return resourceIdentifier;
                    }

                    @Override
                    public String getSafeDescription() {
                        return "User Group " + userGroupId;
                    }
                },
                () -> userGroupDAO.deleteUserGroup(userGroupId),
                false, // no user group specific policies to remove
                dtoFactory.createUserGroupDto(userGroup, users, policyEntities));

        return entityFactory.createUserGroupEntity(snapshot, null, permissions);
    }

    /**
     * Deletes the access policy with the given id under the supplied revision.
     *
     * <p>The policy returned by the DAO is dereferenced immediately, so a {@code null} policy results in a
     * {@link NullPointerException} before anything is deleted.</p>
     *
     * @param revision the revision of the access policy being deleted
     * @param accessPolicyId the id of the access policy to delete
     * @return an entity holding the policy DTO captured before deletion and the permissions
     */
    @Override
    public AccessPolicyEntity deleteAccessPolicy(final Revision revision, final String accessPolicyId) {
        final AccessPolicy accessPolicy = accessPolicyDAO.getAccessPolicy(accessPolicyId);
        final ComponentReferenceEntity componentReference = createComponentReferenceEntity(accessPolicy.getResource());
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyId));

        // accessPolicy has already been dereferenced above (and is again by the Resource below), so the
        // former 'accessPolicy != null' fallbacks here could never be taken and have been removed.
        final Set<TenantEntity> userGroups = accessPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet());
        final Set<TenantEntity> users = accessPolicy.getUsers().stream().map(mapUserIdToTenantEntity(false)).collect(Collectors.toSet());
        final AccessPolicyDTO snapshot = deleteComponent(
                revision,
                new Resource() {
                    @Override
                    public String getIdentifier() {
                        return accessPolicy.getResource();
                    }

                    @Override
                    public String getName() {
                        return accessPolicy.getResource();
                    }

                    @Override
                    public String getSafeDescription() {
                        return "Policy " + accessPolicyId;
                    }
                },
                () -> accessPolicyDAO.deleteAccessPolicy(accessPolicyId),
                false, // no need to clean up any policies as it's already been removed above
                dtoFactory.createAccessPolicyDto(accessPolicy, userGroups, users, componentReference));

        return entityFactory.createAccessPolicyEntity(snapshot, null, permissions);
    }

    /**
     * Deletes a funnel and requests clean-up of the policies defined for its resource.
     *
     * @param revision the revision of the funnel being deleted
     * @param funnelId the identifier of the funnel to delete
     * @return an entity holding the pre-deletion snapshot of the funnel and the permissions computed for it;
     *         no revision is attached to it
     */
    @Override
    public FunnelEntity deleteFunnel(final Revision revision, final String funnelId) {
        final Funnel existingFunnel = funnelDAO.getFunnel(funnelId);
        final PermissionsDTO funnelPermissions = dtoFactory.createPermissionsDto(existingFunnel);

        // 'true' asks deleteComponent to also clean up the policies of the funnel's resource
        final FunnelDTO deletedFunnel = deleteComponent(
                revision,
                existingFunnel.getResource(),
                () -> funnelDAO.deleteFunnel(funnelId),
                true,
                dtoFactory.createFunnelDto(existingFunnel));

        return entityFactory.createFunnelEntity(deletedFunnel, null, funnelPermissions);
    }

    /**
     * Deletes a component through the revision manager.
     *
     * @param revision        the current revision of the component
     * @param resource        the resource being removed
     * @param deleteAction    the action that deletes the component via the appropriate DAO object
     * @param cleanUpPolicies whether or not the policies for this resource should be removed as well - not necessary
     *                        when there are no component specific policies or if the policies of the component are inherited
     * @param dto             the dto describing the component, built by the caller before the deletion; it is
     *                        returned unchanged once the deletion has completed
     * @param <D>             the DTO type
     * @param <C>             unused by this method
     * @return the {@code dto} that was passed in
     */
    private <D, C> D deleteComponent(final Revision revision, final Resource resource, final Runnable deleteAction, final boolean cleanUpPolicies, final D dto) {
        final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();

        return revisionManager.deleteRevision(revisionClaim, currentUser, new DeleteRevisionTask<D>() {
            @Override
            public D performTask() {
                logger.debug("Attempting to delete component {} with claim {}", resource.getIdentifier(), revisionClaim);

                // remove the component, then persist the flow
                deleteAction.run();
                controllerFacade.save();
                logger.debug("Deletion of component {} was successful", resource.getIdentifier());

                // policy clean-up only happens after the deletion has been saved
                if (cleanUpPolicies) {
                    cleanUpPolicies(resource);
                }

                return dto;
            }
        });
    }

    /**
     * Removes the access policies of a component resource and of the data, provenance data, data transfer,
     * policy and operation resources derived from it. Does nothing when the authorizer is not configurable.
     * A failure to remove one policy is logged and does not stop the remaining policies from being processed.
     *
     * @param componentResource the resource for the component
     */
    private void cleanUpPolicies(final Resource componentResource) {
        // nothing to do unless the authorizer supports configuration
        if (!accessPolicyDAO.supportsConfigurableAuthorizer()) {
            return;
        }

        final List<Resource> affectedResources = new ArrayList<>();
        affectedResources.add(componentResource);
        affectedResources.add(ResourceFactory.getDataResource(componentResource));
        affectedResources.add(ResourceFactory.getProvenanceDataResource(componentResource));
        affectedResources.add(ResourceFactory.getDataTransferResource(componentResource));
        affectedResources.add(ResourceFactory.getPolicyResource(componentResource));
        affectedResources.add(ResourceFactory.getOperationResource(componentResource));

        for (final Resource affectedResource : affectedResources) {
            for (final RequestAction requestAction : RequestAction.values()) {
                try {
                    // since the component is being deleted, also delete any relevant access policies
                    final AccessPolicy policy = accessPolicyDAO.getAccessPolicy(requestAction, affectedResource.getIdentifier());
                    if (policy == null) {
                        continue;
                    }
                    accessPolicyDAO.deleteAccessPolicy(policy.getIdentifier());
                } catch (final Exception e) {
                    logger.warn(String.format("Unable to remove access policy for %s %s after component removal.", requestAction, affectedResource.getIdentifier()), e);
                }
            }
        }
    }

    /**
     * Verifies that the components of a snippet can be deleted by delegating to the snippet DAO.
     *
     * @param snippetId            the identifier of the snippet to verify
     * @param affectedComponentIds not read by this implementation
     */
    @Override
    public void verifyDeleteSnippet(final String snippetId, final Set<String> affectedComponentIds) {
        // only the snippet id is needed for the verification
        snippetDAO.verifyDeleteSnippetComponents(snippetId);
    }

    /**
     * Deletes every component referenced by a snippet, drops the snippet itself and then cleans up the
     * policies of all resources that were part of it.
     *
     * @param revisions the revisions of the components being deleted
     * @param snippetId the identifier of the snippet whose components are deleted
     * @return an entity wrapping the dto of the snippet that was removed
     */
    @Override
    public SnippetEntity deleteSnippet(final Set<Revision> revisions, final String snippetId) {
        final Snippet snippet = snippetDAO.getSnippet(snippetId);

        // the resources have to be collected up front: once the components are gone they can no longer be looked up
        final Set<Resource> resourcesToCleanUp = new HashSet<>();
        snippet.getProcessors().keySet().forEach(processorId -> resourcesToCleanUp.add(processorDAO.getProcessor(processorId).getResource()));
        snippet.getInputPorts().keySet().forEach(portId -> resourcesToCleanUp.add(inputPortDAO.getPort(portId).getResource()));
        snippet.getOutputPorts().keySet().forEach(portId -> resourcesToCleanUp.add(outputPortDAO.getPort(portId).getResource()));
        snippet.getFunnels().keySet().forEach(funnelId -> resourcesToCleanUp.add(funnelDAO.getFunnel(funnelId).getResource()));
        snippet.getLabels().keySet().forEach(labelId -> resourcesToCleanUp.add(labelDAO.getLabel(labelId).getResource()));
        snippet.getRemoteProcessGroups().keySet().forEach(rpgId -> resourcesToCleanUp.add(remoteProcessGroupDAO.getRemoteProcessGroup(rpgId).getResource()));
        snippet.getProcessGroups().keySet().forEach(groupId -> {
            final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);

            // the group itself ...
            resourcesToCleanUp.add(group.getResource());

            // ... followed by everything it encapsulates
            group.findAllProcessors().forEach(p -> resourcesToCleanUp.add(p.getResource()));
            group.findAllInputPorts().forEach(in -> resourcesToCleanUp.add(in.getResource()));
            group.findAllOutputPorts().forEach(out -> resourcesToCleanUp.add(out.getResource()));
            group.findAllFunnels().forEach(f -> resourcesToCleanUp.add(f.getResource()));
            group.findAllLabels().forEach(l -> resourcesToCleanUp.add(l.getResource()));
            group.findAllProcessGroups().forEach(child -> resourcesToCleanUp.add(child.getResource()));
            group.findAllRemoteProcessGroups().forEach(rpg -> resourcesToCleanUp.add(rpg.getResource()));
            group.findAllTemplates().forEach(t -> resourcesToCleanUp.add(t.getResource()));
            group.findAllControllerServices().forEach(cs -> resourcesToCleanUp.add(cs.getResource()));
        });

        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);
        final SnippetDTO removedSnippet = revisionManager.deleteRevision(revisionClaim, currentUser, new DeleteRevisionTask<SnippetDTO>() {
            @Override
            public SnippetDTO performTask() {
                // remove the snippet's components, then the snippet, then persist the flow
                snippetDAO.deleteSnippetComponents(snippetId);
                snippetDAO.dropSnippet(snippetId);
                controllerFacade.save();

                // describe the snippet that was just removed
                return dtoFactory.createSnippetDto(snippet);
            }
        });

        // clean up component policies
        for (final Resource resource : resourcesToCleanUp) {
            cleanUpPolicies(resource);
        }

        return entityFactory.createSnippetEntity(removedSnippet);
    }

    /**
     * Deletes an input port and requests clean-up of the policies defined for its resource.
     *
     * @param revision    the revision of the input port being deleted
     * @param inputPortId the identifier of the input port to delete
     * @return an entity holding the pre-deletion snapshot of the port with its permissions and operate
     *         permissions; the revision, status and bulletin arguments are passed as {@code null}
     */
    @Override
    public PortEntity deleteInputPort(final Revision revision, final String inputPortId) {
        final Port inputPort = inputPortDAO.getPort(inputPortId);
        final PermissionsDTO portPermissions = dtoFactory.createPermissionsDto(inputPort);
        final PermissionsDTO portOperatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(inputPort));

        // 'true' asks deleteComponent to also clean up the policies of the port's resource
        final PortDTO deletedPort = deleteComponent(
                revision,
                inputPort.getResource(),
                () -> inputPortDAO.deletePort(inputPortId),
                true,
                dtoFactory.createPortDto(inputPort));

        return entityFactory.createPortEntity(deletedPort, null, portPermissions, portOperatePermissions, null, null);
    }

    /**
     * Deletes an output port and requests clean-up of the policies defined for its resource.
     *
     * @param revision     the revision of the output port being deleted
     * @param outputPortId the identifier of the output port to delete
     * @return an entity holding the pre-deletion snapshot of the port with its permissions and operate
     *         permissions; the revision, status and bulletin arguments are passed as {@code null}
     */
    @Override
    public PortEntity deleteOutputPort(final Revision revision, final String outputPortId) {
        final Port outputPort = outputPortDAO.getPort(outputPortId);
        final PermissionsDTO portPermissions = dtoFactory.createPermissionsDto(outputPort);
        final PermissionsDTO portOperatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPort));

        // 'true' asks deleteComponent to also clean up the policies of the port's resource
        final PortDTO deletedPort = deleteComponent(
                revision,
                outputPort.getResource(),
                () -> outputPortDAO.deletePort(outputPortId),
                true,
                dtoFactory.createPortDto(outputPort));

        return entityFactory.createPortEntity(deletedPort, null, portPermissions, portOperatePermissions, null, null);
    }

    /**
     * Creates a drop-all-FlowFiles request for a process group via the process group DAO.
     *
     * @param processGroupId the identifier of the process group
     * @param dropRequestId  the identifier to use for the drop request
     * @return the dto describing the created drop request
     */
    @Override
    public DropRequestDTO createDropAllFlowFilesInProcessGroup(final String processGroupId, final String dropRequestId) {
        return dtoFactory.createDropRequestDTO(
                processGroupDAO.createDropAllFlowFilesRequest(processGroupId, dropRequestId));
    }

    /**
     * Looks up a drop-all-FlowFiles request of a process group via the process group DAO.
     *
     * @param processGroupId the identifier of the process group
     * @param dropRequestId  the identifier of the drop request
     * @return the dto describing the drop request
     */
    @Override
    public DropRequestDTO getDropAllFlowFilesRequest(final String processGroupId, final String dropRequestId) {
        return dtoFactory.createDropRequestDTO(
                processGroupDAO.getDropAllFlowFilesRequest(processGroupId, dropRequestId));
    }

    /**
     * Deletes a drop-all-FlowFiles request of a process group via the process group DAO.
     *
     * @param processGroupId the identifier of the process group
     * @param dropRequestId  the identifier of the drop request
     * @return the dto describing the drop request returned by the DAO
     */
    @Override
    public DropRequestDTO deleteDropAllFlowFilesRequest(final String processGroupId, final String dropRequestId) {
        return dtoFactory.createDropRequestDTO(
                processGroupDAO.deleteDropAllFlowFilesRequest(processGroupId, dropRequestId));
    }

    /**
     * Deletes a process group, then cleans up the policies of the group and of every component it contained.
     *
     * @param revision the revision of the process group being deleted
     * @param groupId  the identifier of the process group to delete
     * @return an entity holding the pre-deletion snapshot of the group and the permissions computed for it;
     *         the remaining entity arguments are passed as {@code null}
     */
    @Override
    public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
        final PermissionsDTO groupPermissions = dtoFactory.createPermissionsDto(group);

        // collect the resources of everything inside the group up front so their policies can be removed afterwards
        final Set<Resource> containedResources = new HashSet<>();
        group.findAllProcessors().forEach(p -> containedResources.add(p.getResource()));
        group.findAllInputPorts().forEach(in -> containedResources.add(in.getResource()));
        group.findAllOutputPorts().forEach(out -> containedResources.add(out.getResource()));
        group.findAllFunnels().forEach(f -> containedResources.add(f.getResource()));
        group.findAllLabels().forEach(l -> containedResources.add(l.getResource()));
        group.findAllProcessGroups().forEach(child -> containedResources.add(child.getResource()));
        group.findAllRemoteProcessGroups().forEach(rpg -> containedResources.add(rpg.getResource()));
        group.findAllTemplates().forEach(t -> containedResources.add(t.getResource()));
        group.findAllControllerServices().forEach(cs -> containedResources.add(cs.getResource()));

        // 'true' asks deleteComponent to clean up the policies of the group's own resource
        final ProcessGroupDTO deletedGroup = deleteComponent(
                revision,
                group.getResource(),
                () -> processGroupDAO.deleteProcessGroup(groupId),
                true,
                dtoFactory.createProcessGroupDto(group));

        // delete all applicable component policies
        for (final Resource containedResource : containedResources) {
            cleanUpPolicies(containedResource);
        }

        return entityFactory.createProcessGroupEntity(deletedGroup, null, groupPermissions, null, null);
    }

    /**
     * Deletes a remote process group and requests clean-up of the policies defined for its resource.
     *
     * @param revision             the revision of the remote process group being deleted
     * @param remoteProcessGroupId the identifier of the remote process group to delete
     * @return an entity holding the pre-deletion snapshot of the remote process group with its permissions
     *         and operate permissions; the remaining entity arguments are passed as {@code null}
     */
    @Override
    public RemoteProcessGroupEntity deleteRemoteProcessGroup(final Revision revision, final String remoteProcessGroupId) {
        final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
        final PermissionsDTO rpgPermissions = dtoFactory.createPermissionsDto(rpg);
        final PermissionsDTO rpgOperatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(rpg));

        // 'true' asks deleteComponent to also clean up the policies of the remote process group's resource
        final RemoteProcessGroupDTO deletedRpg = deleteComponent(
                revision,
                rpg.getResource(),
                () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId),
                true,
                dtoFactory.createRemoteProcessGroupDto(rpg));

        return entityFactory.createRemoteProcessGroupEntity(deletedRpg, null, rpgPermissions, rpgOperatePermissions, null, null);
    }

    /**
     * Deletes a template and persists the flow afterwards.
     *
     * @param id the identifier of the template to delete
     */
    @Override
    public void deleteTemplate(final String id) {
        // remove the template first ...
        templateDAO.deleteTemplate(id);

        // ... then save the flow
        controllerFacade.save();
    }

    /**
     * Creates a connection in the given group and assembles the entity describing it.
     *
     * @param revision      the revision for the new connection
     * @param groupId       the identifier of the group the connection is created in
     * @param connectionDTO the dto describing the connection to create
     * @return an entity with the created connection's dto, its last-modification revision, permissions and status
     */
    @Override
    public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) {
        final RevisionUpdate<ConnectionDTO> revisionUpdate = createComponent(
                revision,
                connectionDTO,
                () -> connectionDAO.createConnection(groupId, connectionDTO),
                created -> dtoFactory.createConnectionDto(created));

        // re-read the connection to compute the caller's permissions and the current status
        final Connection createdConnection = connectionDAO.getConnection(connectionDTO.getId());
        final PermissionsDTO connectionPermissions = dtoFactory.createPermissionsDto(createdConnection);
        final ConnectionStatusDTO connectionStatus = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionDTO.getId()));
        return entityFactory.createConnectionEntity(
                revisionUpdate.getComponent(),
                dtoFactory.createRevisionDTO(revisionUpdate.getLastModification()),
                connectionPermissions,
                connectionStatus);
    }

    /**
     * Creates a FlowFile drop request for a connection via the connection DAO.
     *
     * @param connectionId  the identifier of the connection
     * @param dropRequestId the identifier to use for the drop request
     * @return the dto describing the created drop request
     */
    @Override
    public DropRequestDTO createFlowFileDropRequest(final String connectionId, final String dropRequestId) {
        return dtoFactory.createDropRequestDTO(
                connectionDAO.createFlowFileDropRequest(connectionId, dropRequestId));
    }

    /**
     * Creates a FlowFile listing request for a connection and records on it whether the connection's
     * source and destination are running.
     *
     * @param connectionId     the identifier of the connection
     * @param listingRequestId the identifier to use for the listing request
     * @return the dto describing the created listing request
     */
    @Override
    public ListingRequestDTO createFlowFileListingRequest(final String connectionId, final String listingRequestId) {
        final Connection targetConnection = connectionDAO.getConnection(connectionId);
        final ListingRequestDTO listingRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(connectionId, listingRequestId));

        // the running flags are only set for the ends of the connection that are present
        if (targetConnection.getSource() != null) {
            listingRequest.setSourceRunning(targetConnection.getSource().isRunning());
        }

        if (targetConnection.getDestination() != null) {
            listingRequest.setDestinationRunning(targetConnection.getDestination().isRunning());
        }

        return listingRequest;
    }

    /**
     * Creates a processor in the given group and assembles the entity describing it.
     *
     * @param revision     the revision for the new processor
     * @param groupId      the identifier of the group the processor is created in
     * @param processorDTO the dto describing the processor to create
     * @return an entity with the created processor's dto, its last-modification revision, permissions,
     *         operate permissions, status and bulletins
     */
    @Override
    public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
        final RevisionUpdate<ProcessorDTO> revisionUpdate = createComponent(
                revision,
                processorDTO,
                () -> processorDAO.createProcessor(groupId, processorDTO),
                created -> {
                    // wait for validation to complete before the dto is built
                    awaitValidationCompletion(created);
                    return dtoFactory.createProcessorDto(created);
                });

        final ProcessorNode createdProcessor = processorDAO.getProcessor(processorDTO.getId());
        final PermissionsDTO processorPermissions = dtoFactory.createPermissionsDto(createdProcessor);
        final PermissionsDTO processorOperatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(createdProcessor));
        final ProcessorStatusDTO processorStatus = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorDTO.getId()));

        // each bulletin entity is created with the caller's read permission on the processor
        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processorDTO.getId()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(dto -> entityFactory.createBulletinEntity(dto, processorPermissions.getCanRead()))
                .collect(Collectors.toList());

        return entityFactory.createProcessorEntity(
                revisionUpdate.getComponent(),
                dtoFactory.createRevisionDTO(revisionUpdate.getLastModification()),
                processorPermissions,
                processorOperatePermissions,
                processorStatus,
                bulletinEntities);
    }

    /**
     * Creates a label in the given group and assembles the entity describing it.
     *
     * @param revision the revision for the new label
     * @param groupId  the identifier of the group the label is created in
     * @param labelDTO the dto describing the label to create
     * @return an entity with the created label's dto, its last-modification revision and permissions
     */
    @Override
    public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) {
        final RevisionUpdate<LabelDTO> revisionUpdate = createComponent(
                revision,
                labelDTO,
                () -> labelDAO.createLabel(groupId, labelDTO),
                created -> dtoFactory.createLabelDto(created));

        // re-read the label to compute the caller's permissions on it
        final Label createdLabel = labelDAO.getLabel(labelDTO.getId());
        final PermissionsDTO labelPermissions = dtoFactory.createPermissionsDto(createdLabel);
        return entityFactory.createLabelEntity(
                revisionUpdate.getComponent(),
                dtoFactory.createRevisionDTO(revisionUpdate.getLastModification()),
                labelPermissions);
    }

    /**
     * Creates a component through the revision manager.
     *
     * @param revision     the revision claimed for the component being created
     * @param componentDto the DTO that describes the component to create; not read by this method itself
     * @param daoCreation  A Supplier that will create the NiFi Component to use
     * @param dtoCreation  a Function that will convert the NiFi Component into a corresponding DTO
     * @param <D>          the DTO Type
     * @param <C>          the NiFi Component Type
     * @return a RevisionUpdate that holds the created component's DTO and its last modification
     */
    private <D, C> RevisionUpdate<D> createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier<C> daoCreation, final Function<C, D> dtoCreation) {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();

        // request claim for component to be created... revision already verified (version == 0)
        final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);

        // update revision through revision manager
        return revisionManager.updateRevision(revisionClaim, currentUser, () -> {
            // add the component and persist the flow before the dto is built
            final C created = daoCreation.get();
            controllerFacade.save();

            final D createdDto = dtoCreation.apply(created);

            // the last modification carries the incremented revision and the identity of the current user
            final FlowModification modification = new FlowModification(revision.incrementRevision(revision.getClientId()), currentUser.getIdentity());
            return new StandardRevisionUpdate<>(createdDto, modification);
        });
    }

    /**
     * Builds a bulletin from the category, level and message of the given dto, adds it to the bulletin
     * repository and returns an entity for it.
     *
     * @param bulletinDTO the dto supplying the bulletin's category, level and message
     * @param canRead     passed through to the entity factory when the bulletin entity is created
     * @return the entity created for the new bulletin
     */
    @Override
    public BulletinEntity createBulletin(final BulletinDTO bulletinDTO, final Boolean canRead) {
        final Bulletin newBulletin = BulletinFactory.createBulletin(
                bulletinDTO.getCategory(),
                bulletinDTO.getLevel(),
                bulletinDTO.getMessage());
        bulletinRepository.addBulletin(newBulletin);

        return entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(newBulletin), canRead);
    }

    /**
     * Creates a funnel in the given group and assembles the entity describing it.
     *
     * @param revision  the revision for the new funnel
     * @param groupId   the identifier of the group the funnel is created in
     * @param funnelDTO the dto describing the funnel to create
     * @return an entity with the created funnel's dto, its last-modification revision and permissions
     */
    @Override
    public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) {
        final RevisionUpdate<FunnelDTO> revisionUpdate = createComponent(
                revision,
                funnelDTO,
                () -> funnelDAO.createFunnel(groupId, funnelDTO),
                created -> dtoFactory.createFunnelDto(created));

        // re-read the funnel to compute the caller's permissions on it
        final Funnel createdFunnel = funnelDAO.getFunnel(funnelDTO.getId());
        final PermissionsDTO funnelPermissions = dtoFactory.createPermissionsDto(createdFunnel);
        return entityFactory.createFunnelEntity(
                revisionUpdate.getComponent(),
                dtoFactory.createRevisionDTO(revisionUpdate.getLastModification()),
                funnelPermissions);
    }

    /**
     * Creates an access policy and assembles the entity describing it, including tenant entities for the
     * policy's groups and users and a reference to the component the policy applies to.
     *
     * @param revision        the revision recorded, together with the current user's identity, on the returned entity
     * @param accessPolicyDTO the dto describing the access policy to create
     * @return an entity with the created policy's dto, revision and permissions
     */
    @Override
    public AccessPolicyEntity createAccessPolicy(final Revision revision, final AccessPolicyDTO accessPolicyDTO) {
        final Authorizable tenant = authorizableLookup.getTenant();
        final String creatorIdentity = NiFiUserUtils.getNiFiUserIdentity();

        final AccessPolicy createdPolicy = accessPolicyDAO.createAccessPolicy(accessPolicyDTO);
        final ComponentReferenceEntity componentReference = createComponentReferenceEntity(createdPolicy.getResource());

        // each user becomes a tenant entity carrying its own revision and the tenant permissions
        final AccessPolicyDTO createdPolicyDto = dtoFactory.createAccessPolicyDto(
                createdPolicy,
                createdPolicy.getGroups().stream().map(mapUserGroupIdToTenantEntity(false)).collect(Collectors.toSet()),
                createdPolicy.getUsers().stream().map(userId -> {
                    final RevisionDTO userRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId));
                    return entityFactory.createTenantEntity(
                            dtoFactory.createTenantDTO(userDAO.getUser(userId)),
                            userRevision,
                            dtoFactory.createPermissionsDto(tenant));
                }).collect(Collectors.toSet()),
                componentReference);

        final PermissionsDTO policyPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(accessPolicyDTO.getId()));
        return entityFactory.createAccessPolicyEntity(
                createdPolicyDto,
                dtoFactory.createRevisionDTO(new FlowModification(revision, creatorIdentity)),
                policyPermissions);
    }

    /**
     * Creates a user and assembles the entity describing it, including the groups the user belongs to and
     * the access policies that apply to the user.
     *
     * @param revision the revision recorded, together with the current user's identity, on the returned entity
     * @param userDTO  the dto describing the user to create
     * @return an entity with the created user's dto, revision and the permissions computed for the tenant authorizable
     */
    @Override
    public UserEntity createUser(final Revision revision, final UserDTO userDTO) {
        final String creatorIdentity = NiFiUserUtils.getNiFiUserIdentity();
        final User createdUser = userDAO.createUser(userDTO);

        // groups the new user is a member of, as tenant entities
        final Set<TenantEntity> groupEntities = userGroupDAO.getUserGroupsForUser(createdUser.getIdentifier()).stream()
                .map(group -> group.getIdentifier())
                .map(mapUserGroupIdToTenantEntity(false))
                .collect(Collectors.toSet());

        // policies that apply to the new user, as summary entities
        final Set<AccessPolicySummaryEntity> policySummaries = userGroupDAO.getAccessPoliciesForUser(createdUser.getIdentifier()).stream()
                .map(policy -> createAccessPolicySummaryEntity(policy))
                .collect(Collectors.toSet());
        final UserDTO createdUserDto = dtoFactory.createUserDto(createdUser, groupEntities, policySummaries);

        final PermissionsDTO tenantPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
        return entityFactory.createUserEntity(
                createdUserDto,
                dtoFactory.createRevisionDTO(new FlowModification(revision, creatorIdentity)),
                tenantPermissions);
    }

    /**
     * Builds the component reference entity for the component identified by a resource string.
     *
     * @param resource the resource string identifying the component
     * @return the component reference entity, or {@code null} when no reference dto could be created for the
     *         component or when a {@code ResourceNotFoundException} is raised while building it
     */
    private ComponentReferenceEntity createComponentReferenceEntity(final String resource) {
        try {
            // get the component authorizable
            Authorizable authorizable = authorizableLookup.getAuthorizableFromResource(resource);

            // if this represents an authorizable whose policy permissions are enforced through the base resource,
            // get the underlying base authorizable for the component reference
            if (authorizable instanceof EnforcePolicyPermissionsThroughBaseResource) {
                authorizable = ((EnforcePolicyPermissionsThroughBaseResource) authorizable).getBaseAuthorizable();
            }

            final ComponentReferenceDTO referenceDto = dtoFactory.createComponentReferenceDto(authorizable);
            if (referenceDto == null) {
                return null;
            }

            final PermissionsDTO referencePermissions = dtoFactory.createPermissionsDto(authorizable);
            final RevisionDTO referenceRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(referenceDto.getId()));
            return entityFactory.createComponentReferenceEntity(referenceDto, referenceRevision, referencePermissions);
        } catch (final ResourceNotFoundException e) {
            // component not found for the specified resource
            return null;
        }
    }

    /**
     * Builds the summary entity for an access policy: its summary dto (with component reference), its
     * revision and the permissions computed for the policy's authorizable.
     *
     * @param ap the access policy to summarize
     * @return the summary entity for the policy
     */
    private AccessPolicySummaryEntity createAccessPolicySummaryEntity(final AccessPolicy ap) {
        final ComponentReferenceEntity referencedComponent = createComponentReferenceEntity(ap.getResource());
        final AccessPolicySummaryDTO summaryDto = dtoFactory.createAccessPolicySummaryDto(ap, referencedComponent);

        // permissions and revision are both looked up by the policy's identifier
        final PermissionsDTO summaryPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(ap.getIdentifier()));
        final RevisionDTO summaryRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(ap.getIdentifier()));

        return entityFactory.createAccessPolicySummaryEntity(summaryDto, summaryRevision, summaryPermissions);
    }

    /**
     * Creates a user group and assembles the entity describing it, including the group's users and the
     * access policies that apply to the group.
     *
     * @param revision     the revision recorded, together with the current user's identity, on the returned entity
     * @param userGroupDTO the dto describing the user group to create
     * @return an entity with the created group's dto, revision and the permissions computed for the tenant authorizable
     */
    @Override
    public UserGroupEntity createUserGroup(final Revision revision, final UserGroupDTO userGroupDTO) {
        final String creatorIdentity = NiFiUserUtils.getNiFiUserIdentity();
        final Group createdGroup = userGroupDAO.createUserGroup(userGroupDTO);

        // members of the new group, as tenant entities
        final Set<TenantEntity> memberEntities = createdGroup.getUsers().stream()
                .map(mapUserIdToTenantEntity(false))
                .collect(Collectors.toSet());

        // policies that apply to the new group, as summary entities
        final Set<AccessPolicySummaryEntity> policySummaries = userGroupDAO.getAccessPoliciesForUserGroup(createdGroup.getIdentifier()).stream()
                .map(policy -> createAccessPolicySummaryEntity(policy))
                .collect(Collectors.toSet());
        final UserGroupDTO createdGroupDto = dtoFactory.createUserGroupDto(createdGroup, memberEntities, policySummaries);

        final PermissionsDTO tenantPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());
        return entityFactory.createUserGroupEntity(
                createdGroupDto,
                dtoFactory.createRevisionDTO(new FlowModification(revision, creatorIdentity)),
                tenantPermissions);
    }

    /**
     * Copies the current validation state of the live components onto the dtos of a flow snippet:
     * validation status and errors for processors, validation errors for input and output ports, and
     * authorization issues for remote process groups. Collections that are {@code null} on the snippet are skipped.
     *
     * @param flow the snippet whose dtos are updated in place
     */
    private void validateSnippetContents(final FlowSnippetDTO flow) {
        // validate any processors
        if (flow.getProcessors() != null) {
            for (final ProcessorDTO processorDto : flow.getProcessors()) {
                final ProcessorNode node = processorDAO.getProcessor(processorDto.getId());
                processorDto.setValidationStatus(node.getValidationStatus().name());

                final List<String> messages = collectValidationErrorMessages(node.getValidationErrors());
                if (!messages.isEmpty()) {
                    processorDto.setValidationErrors(messages);
                }
            }
        }

        // validate any input ports
        if (flow.getInputPorts() != null) {
            for (final PortDTO inputPortDto : flow.getInputPorts()) {
                final Port inputPort = inputPortDAO.getPort(inputPortDto.getId());
                final List<String> messages = collectValidationErrorMessages(inputPort.getValidationErrors());
                if (!messages.isEmpty()) {
                    inputPortDto.setValidationErrors(messages);
                }
            }
        }

        // validate any output ports
        if (flow.getOutputPorts() != null) {
            for (final PortDTO outputPortDto : flow.getOutputPorts()) {
                final Port outputPort = outputPortDAO.getPort(outputPortDto.getId());
                final List<String> messages = collectValidationErrorMessages(outputPort.getValidationErrors());
                if (!messages.isEmpty()) {
                    outputPortDto.setValidationErrors(messages);
                }
            }
        }

        // get any remote process group issues
        if (flow.getRemoteProcessGroups() != null) {
            for (final RemoteProcessGroupDTO rpgDto : flow.getRemoteProcessGroups()) {
                final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(rpgDto.getId());

                if (rpg.getAuthorizationIssue() != null) {
                    rpgDto.setAuthorizationIssues(Arrays.asList(rpg.getAuthorizationIssue()));
                }
            }
        }
    }

    /**
     * Converts validation results into their string form.
     *
     * @param validationErrors the validation results to convert; may be {@code null}
     * @return the string form of each result in iteration order; empty when the input is {@code null} or empty
     */
    private static List<String> collectValidationErrorMessages(final Collection<ValidationResult> validationErrors) {
        final List<String> messages = new ArrayList<>();
        if (validationErrors != null) {
            for (final ValidationResult validationError : validationErrors) {
                messages.add(validationError.toString());
            }
        }
        return messages;
    }

    /**
     * Copies a snippet into a group, persists the flow, drops the source snippet and returns the
     * post-processed flow that resulted from the copy.
     *
     * @param groupId          the identifier of the group to copy the snippet into
     * @param snippetId        the identifier of the snippet to copy
     * @param originX          the x coordinate passed to the snippet DAO for the copy
     * @param originY          the y coordinate passed to the snippet DAO for the copy
     * @param idGenerationSeed the seed passed to the snippet DAO for identifier generation
     * @return an entity wrapping the flow dto of the copied contents
     */
    @Override
    public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) {
        // create the copy and persist the flow
        final FlowSnippetDTO copiedSnippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed);
        controllerFacade.save();

        // the source snippet is dropped once the copy has been saved
        snippetDAO.dropSnippet(snippetId);

        // post process new flow snippet
        final FlowDTO copiedFlow = postProcessNewFlowSnippet(groupId, copiedSnippet);

        final FlowEntity result = new FlowEntity();
        result.setFlow(copiedFlow);
        return result;
    }

    /**
     * Creates a snippet, persists the flow and returns the entity describing the new snippet.
     *
     * @param snippetDTO the dto describing the snippet to create
     * @return an entity wrapping the dto of the created snippet
     */
    @Override
    public SnippetEntity createSnippet(final SnippetDTO snippetDTO) {
        // add the component
        final Snippet snippet = snippetDAO.createSnippet(snippetDTO);

        // save the flow
        controllerFacade.save();

        // The dto is handed to the entity factory directly. It was previously wrapped in a revision update
        // (with no modification) only to be unwrapped again on the next line.
        return entityFactory.createSnippetEntity(dtoFactory.createSnippetDto(snippet));
    }

    /**
     * Creates an input port in the given group and assembles the entity describing it.
     *
     * @param revision     the revision for the new input port
     * @param groupId      the identifier of the group the port is created in
     * @param inputPortDTO the dto describing the input port to create
     * @return an entity with the created port's dto, its last-modification revision, permissions,
     *         operate permissions, status and bulletins
     */
    @Override
    public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) {
        final RevisionUpdate<PortDTO> revisionUpdate = createComponent(
                revision,
                inputPortDTO,
                () -> inputPortDAO.createPort(groupId, inputPortDTO),
                created -> dtoFactory.createPortDto(created));

        final Port createdPort = inputPortDAO.getPort(inputPortDTO.getId());
        final PermissionsDTO portPermissions = dtoFactory.createPermissionsDto(createdPort);
        final PermissionsDTO portOperatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(createdPort));
        final PortStatusDTO portStatus = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(createdPort.getIdentifier()));

        // each bulletin entity is created with the caller's read permission on the port
        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(createdPort.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(dto -> entityFactory.createBulletinEntity(dto, portPermissions.getCanRead()))
                .collect(Collectors.toList());

        return entityFactory.createPortEntity(
                revisionUpdate.getComponent(),
                dtoFactory.createRevisionDTO(revisionUpdate.getLastModification()),
                portPermissions,
                portOperatePermissions,
                portStatus,
                bulletinEntities);
    }

    @Override
    public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) {
        // create the port under the supplied revision and capture the resulting DTO
        final RevisionUpdate<PortDTO> creation = createComponent(
                revision,
                outputPortDTO,
                () -> outputPortDAO.createPort(groupId, outputPortDTO),
                createdPort -> dtoFactory.createPortDto(createdPort));

        // look the port up by the DTO's id to derive permissions, status and bulletins
        final Port outputPort = outputPortDAO.getPort(outputPortDTO.getId());
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPort);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(outputPort));
        final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPort.getIdentifier()));

        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(outputPort.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()))
                .collect(Collectors.toList());

        final PortDTO createdDto = creation.getComponent();
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(creation.getLastModification());
        return entityFactory.createPortEntity(createdDto, revisionDto, permissions, operatePermissions, status, bulletinEntities);
    }

    @Override
    public ProcessGroupEntity createProcessGroup(final Revision revision, final String parentGroupId, final ProcessGroupDTO processGroupDTO) {
        // create the group under the supplied revision and capture the resulting DTO
        final RevisionUpdate<ProcessGroupDTO> creation = createComponent(
                revision,
                processGroupDTO,
                () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO),
                createdGroup -> dtoFactory.createProcessGroupDto(createdGroup));

        // look the group up by the DTO's id to derive permissions, status and bulletins
        final ProcessGroup newGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId());
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(newGroup);
        final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(newGroup.getIdentifier()));

        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(newGroup.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()))
                .collect(Collectors.toList());

        final ProcessGroupDTO createdDto = creation.getComponent();
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(creation.getLastModification());
        return entityFactory.createProcessGroupEntity(createdDto, revisionDto, permissions, status, bulletinEntities);
    }

    @Override
    public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) {
        // create the remote group under the supplied revision and capture the resulting DTO
        final RevisionUpdate<RemoteProcessGroupDTO> creation = createComponent(
                revision,
                remoteProcessGroupDTO,
                () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO),
                createdGroup -> dtoFactory.createRemoteProcessGroupDto(createdGroup));

        // look the remote group up by the DTO's id to derive permissions, status and bulletins
        final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId());
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(rpg));
        final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(rpg, controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier()));

        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()))
                .collect(Collectors.toList());

        final RemoteProcessGroupDTO createdDto = creation.getComponent();
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(creation.getLastModification());
        return entityFactory.createRemoteProcessGroupEntity(createdDto, revisionDto, permissions, operatePermissions, status, bulletinEntities);
    }

    @Override
    public boolean isRemoteGroupPortConnected(final String remoteProcessGroupId, final String remotePortId) {
        final RemoteProcessGroup remoteGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);

        // the id is first tried as an input port of the remote group
        final RemoteGroupPort inputPort = remoteGroup.getInputPort(remotePortId);
        if (inputPort != null) {
            return inputPort.hasIncomingConnection();
        }

        // otherwise it must be an output port; anything else is reported as not found
        final RemoteGroupPort outputPort = remoteGroup.getOutputPort(remotePortId);
        if (outputPort == null) {
            throw new ResourceNotFoundException("Could not find Port with ID " + remotePortId + " as a child of RemoteProcessGroup with ID " + remoteProcessGroupId);
        }

        return !outputPort.getConnections().isEmpty();
    }

    @Override
    public void verifyCanAddTemplate(final String groupId, final String name) {
        // note the argument order: the DAO is called with the template name first, then the group id
        templateDAO.verifyCanAddTemplate(name, groupId);
    }

    @Override
    public void verifyCanInstantiate(final String groupId, final FlowSnippetDTO snippetDTO) {
        // pure delegation: the template DAO performs the verification for the given group and snippet
        templateDAO.verifyCanInstantiate(groupId, snippetDTO);
    }

    @Override
    public void verifyComponentTypes(final VersionedProcessGroup versionedGroup) {
        // pure delegation: the controller facade performs the component type verification
        controllerFacade.verifyComponentTypes(versionedGroup);
    }

    @Override
    public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final VersionedProcessGroup contents, final String groupId) {
        // resolve the target group, then run the check against it and its ancestors
        verifyImportProcessGroup(versionControlInfo, contents, processGroupDAO.getProcessGroup(groupId));
    }

    /**
     * Verifies that importing the given versioned contents would not create a recursive dataflow. Starting at the
     * given Process Group and moving up through each parent, the bucket and flow identifiers of the flow being
     * imported, and of every child group in the contents that has versioned flow coordinates, are compared
     * against the version control information of the group being visited.
     *
     * @param vciDto bucket and flow identifiers of the flow being imported
     * @param contents the versioned contents being imported
     * @param group the Process Group to start from; a null group ends the check
     * @throws IllegalStateException if a visited group is already tracking the same bucket and flow
     */
    private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final VersionedProcessGroup contents, final ProcessGroup group) {
        for (ProcessGroup current = group; current != null; current = current.getParent()) {
            final VersionControlInformation currentVci = current.getVersionControlInformation();

            // Note that we do not compare the Registry ID here because there could be two registry clients
            // that point to the same server (one could point to localhost while another points to 127.0.0.1, for instance).
            final boolean sameFlow = currentVci != null
                && Objects.equals(vciDto.getBucketId(), currentVci.getBucketIdentifier())
                && Objects.equals(vciDto.getFlowId(), currentVci.getFlowIdentifier());
            if (sameFlow) {
                throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. "
                    + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A.");
            }

            final Set<VersionedProcessGroup> nestedGroups = contents.getProcessGroups();
            if (nestedGroups == null) {
                continue;
            }

            // each versioned child is itself checked, starting from the group currently being visited
            for (final VersionedProcessGroup nestedGroup : nestedGroups) {
                final VersionedFlowCoordinates coordinates = nestedGroup.getVersionedFlowCoordinates();
                if (coordinates == null) {
                    continue;
                }

                final VersionControlInformationDTO nestedVci = new VersionControlInformationDTO();
                nestedVci.setBucketId(coordinates.getBucketId());
                nestedVci.setFlowId(coordinates.getFlowId());
                verifyImportProcessGroup(nestedVci, nestedGroup, current);
            }
        }
    }

    @Override
    public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional<String> idGenerationSeed) {
        // resolve the snippet the template is built from
        final Snippet sourceSnippet = snippetDAO.getSnippet(snippetId);

        // assemble the template definition
        final TemplateDTO definition = new TemplateDTO();
        definition.setName(name);
        definition.setDescription(description);
        definition.setTimestamp(new Date());
        definition.setSnippet(snippetUtils.populateFlowSnippet(sourceSnippet, true, true, true));
        definition.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION);

        // a seed yields a name-based id derived from its UTF-8 bytes; without one a random id is generated
        final String templateId = idGenerationSeed
                .map(seed -> UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString())
                .orElseGet(() -> UUID.randomUUID().toString());
        definition.setId(templateId);

        // register the template with the group
        final Template createdTemplate = templateDAO.createTemplate(definition, groupId);

        // the snippet is dropped once the template exists, then the flow is persisted
        snippetDAO.dropSnippet(snippetId);
        controllerFacade.save();

        return dtoFactory.createTemplateDTO(createdTemplate);
    }

    /**
     * Ensures default values are populated for all components in this snippet. This is necessary to handle old templates without default values
     * and when existing properties have default values introduced.
     *
     * <p>For every controller service and processor in the snippet, a temporary component of the declared type is obtained
     * and each of its property descriptors whose value is currently null in the DTO is set to the descriptor's default value.
     * Nested process groups are handled recursively. Failure to obtain or inspect a component is logged as a warning,
     * including the cause, and does not abort processing of the remaining components.
     *
     * @param snippet snippet, may be null in which case nothing is done
     */
    private void ensureDefaultPropertyValuesArePopulated(final FlowSnippetDTO snippet) {
        if (snippet == null) {
            return;
        }

        if (snippet.getControllerServices() != null) {
            snippet.getControllerServices().forEach(dto -> {
                if (dto.getProperties() == null) {
                    dto.setProperties(new LinkedHashMap<>());
                }

                try {
                    final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle());
                    configurableComponent.getPropertyDescriptors().forEach(descriptor -> {
                        if (dto.getProperties().get(descriptor.getName()) == null) {
                            dto.getProperties().put(descriptor.getName(), descriptor.getDefaultValue());
                        }
                    });
                } catch (final Exception e) {
                    // best effort: keep going, but include the cause so the failure can be diagnosed
                    logger.warn(String.format("Unable to create ControllerService of type %s to populate default values.", dto.getType()), e);
                }
            });
        }

        if (snippet.getProcessors() != null) {
            snippet.getProcessors().forEach(dto -> {
                if (dto.getConfig() == null) {
                    dto.setConfig(new ProcessorConfigDTO());
                }

                final ProcessorConfigDTO config = dto.getConfig();
                if (config.getProperties() == null) {
                    config.setProperties(new LinkedHashMap<>());
                }

                try {
                    final ConfigurableComponent configurableComponent = controllerFacade.getTemporaryComponent(dto.getType(), dto.getBundle());
                    configurableComponent.getPropertyDescriptors().forEach(descriptor -> {
                        if (config.getProperties().get(descriptor.getName()) == null) {
                            config.getProperties().put(descriptor.getName(), descriptor.getDefaultValue());
                        }
                    });
                } catch (final Exception e) {
                    // best effort: keep going, but include the cause so the failure can be diagnosed
                    logger.warn(String.format("Unable to create Processor of type %s to populate default values.", dto.getType()), e);
                }
            });
        }

        if (snippet.getProcessGroups() != null) {
            // descend into the contents of each nested group
            snippet.getProcessGroups().forEach(processGroup -> ensureDefaultPropertyValuesArePopulated(processGroup.getContents()));
        }
    }

    @Override
    public TemplateDTO importTemplate(final TemplateDTO templateDTO, final String groupId, final Optional<String> idGenerationSeed) {
        // a seed yields a name-based id derived from its UTF-8 bytes; without one a random id is generated
        final String templateId = idGenerationSeed
                .map(seed -> UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString())
                .orElseGet(() -> UUID.randomUUID().toString());
        templateDTO.setId(templateId);

        // stamp the template with the import time
        templateDTO.setTimestamp(new Date());

        // fill in property defaults that the incoming template does not carry
        ensureDefaultPropertyValuesArePopulated(templateDTO.getSnippet());

        // hand the template to the DAO and persist the flow
        final Template importedTemplate = templateDAO.importTemplate(templateDTO, groupId);
        controllerFacade.save();

        return dtoFactory.createTemplateDTO(importedTemplate);
    }

    /**
     * Post processes a new flow snippet: validates its contents and converts it, together with the
     * Process Group it was added to and that group's current status, into a flow DTO.
     *
     * @param groupId id of the Process Group the snippet was added to
     * @param snippet the newly added snippet
     * @return flow dto
     */
    private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) {
        // validate the new snippet
        validateSnippetContents(snippet);

        // build the flow DTO from the group, its status and the snippet
        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
        final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
        return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager, this::getProcessGroupBulletins);
    }

    @Override
    public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateEncodingVersion,
                                             final FlowSnippetDTO requestSnippet, final String idGenerationSeed) {

        // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
        // was copied and this dto is only used to instantiate its components (which has already completed)
        final FlowSnippetDTO instantiated = templateDAO.instantiateTemplate(groupId, originX, originY, templateEncodingVersion, requestSnippet, idGenerationSeed);

        // persist the flow containing the new components
        controllerFacade.save();

        // validate the instantiated snippet and convert it to a flow DTO
        final FlowDTO instantiatedFlow = postProcessNewFlowSnippet(groupId, instantiated);

        final FlowEntity result = new FlowEntity();
        result.setFlow(instantiatedFlow);
        return result;
    }

    @Override
    public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) {
        // groupId may be null, in which case the DTO's parent group id is set to null
        controllerServiceDTO.setParentGroupId(groupId);

        final NiFiUser user = NiFiUserUtils.getNiFiUser();

        // request claim for component to be created... revision already verified (version == 0)
        final RevisionClaim claim = new StandardRevisionClaim(revision);

        // The creation steps are the same whether or not a Process Group id was supplied, so a single revision
        // update covers both cases. createComponent() is not used here because it wants to obtain the read lock
        // on the group, and the Controller Service may or may not have a Process Group (it won't if it's controller-scoped).
        final RevisionUpdate<ControllerServiceDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
            final ControllerServiceNode createdService = controllerServiceDAO.createControllerService(controllerServiceDTO);
            controllerFacade.save();

            // wait for validation to complete before building the DTO
            awaitValidationCompletion(createdService);
            final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(createdService);

            final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
            return new StandardRevisionUpdate<>(dto, lastMod);
        });

        // look the service up by the DTO's id to derive permissions and bulletins
        final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId());
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));
        final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId()));
        final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
        return entityFactory.createControllerServiceEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), permissions, operatePermissions, bulletinEntities);
    }

    @Override
    public ControllerServiceEntity updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) {
        // resolve the service being updated, then apply the update under the supplied revision
        final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceDTO.getId());
        final RevisionUpdate<ControllerServiceDTO> update = updateComponent(revision,
                controllerService,
                () -> controllerServiceDAO.updateControllerService(controllerServiceDTO),
                updatedService -> {
                    // wait for validation to complete before building the DTO
                    awaitValidationCompletion(updatedService);
                    final ControllerServiceDTO updatedDto = dtoFactory.createControllerServiceDto(updatedService);

                    // attach the components referencing this service to the DTO
                    final ControllerServiceReference references = controllerService.getReferences();
                    final ControllerServiceReferencingComponentsEntity referencingEntity =
                            createControllerServiceReferencingComponentsEntity(references, Sets.newHashSet(controllerService.getIdentifier()));
                    updatedDto.setReferencingComponents(referencingEntity.getControllerServiceReferencingComponents());
                    return updatedDto;
                });

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerService);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(controllerService));

        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(controllerServiceDTO.getId()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()))
                .collect(Collectors.toList());

        final ControllerServiceDTO resultDto = update.getComponent();
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createControllerServiceEntity(resultDto, revisionDto, permissions, operatePermissions, bulletinEntities);
    }

    @Override
    public List<ConfigVerificationResultDTO> verifyControllerServiceConfiguration(final String controllerServiceId, final ControllerServiceDTO controllerService, final Map<String, String> variables) {
        // pure delegation: the controller service DAO verifies the configuration and its results are returned unchanged
        return controllerServiceDAO.verifyConfiguration(controllerServiceId, controllerService, variables);
    }

    @Override
    public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents(
            final Map<String, Revision> referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {

        // claim every revision supplied for the referencing components
        final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values());
        final NiFiUser user = NiFiUserUtils.getNiFiUser();

        final RevisionUpdate<ControllerServiceReferencingComponentsEntity> revisionUpdate = revisionManager.updateRevision(claim, user, () -> {
            // apply the requested state change, then re-read the service's references
            final Set<ComponentNode> changedComponents = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
            final ControllerServiceReference currentReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences();

            // components changed by this request get their current revision incremented for the requesting client
            final Map<String, Revision> revisionsById = new HashMap<>();
            for (final ComponentNode changed : changedComponents) {
                final Revision current = revisionManager.getRevision(changed.getIdentifier());
                final Revision requested = referenceRevisions.get(changed.getIdentifier());
                revisionsById.put(changed.getIdentifier(), current.incrementRevision(requested.getClientId()));
            }

            // every other referencing component is included with its current revision, untouched
            for (final ComponentNode referencing : currentReference.findRecursiveReferences(ComponentNode.class)) {
                revisionsById.putIfAbsent(referencing.getIdentifier(), revisionManager.getRevision(referencing.getIdentifier()));
            }

            final ControllerServiceReferencingComponentsEntity referencingEntity = createControllerServiceReferencingComponentsEntity(currentReference, revisionsById);
            final Set<Revision> allRevisions = new HashSet<>(revisionsById.values());
            return new StandardRevisionUpdate<>(referencingEntity, null, allRevisions);
        });

        return revisionUpdate.getComponent();
    }

    /**
     * Recursively collects the ControllerServices that reference, directly or transitively, the service
     * described by the given reference. Each service found is added to the visited set, which also
     * prevents a service from being traversed more than once.
     *
     * @param reference      ControllerServiceReference whose referencing components are examined
     * @param visited        ControllerServices already encountered; newly found services are added to it
     */
    private void findControllerServiceReferencingComponentIdentifiers(final ControllerServiceReference reference, final Set<ControllerServiceNode> visited) {
        for (final ComponentNode referencing : reference.getReferencingComponents()) {
            // only ControllerServices can themselves be referenced, so other component types end the walk
            if (!(referencing instanceof ControllerServiceNode)) {
                continue;
            }

            // add() reports whether the service was new; only new services are descended into
            final ControllerServiceNode serviceNode = (ControllerServiceNode) referencing;
            if (visited.add(serviceNode)) {
                findControllerServiceReferencingComponentIdentifiers(serviceNode.getReferences(), visited);
            }
        }
    }

    /**
     * Creates entities for components referencing a ControllerService using their current revision.
     *
     * @param reference ControllerServiceReference
     * @param lockedIds not read by this method
     * @return The entity
     */
    private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Set<String> lockedIds) {
        // NOTE(review): the set populated by this traversal is not read afterwards in this method - confirm whether the walk is still needed
        final Set<ControllerServiceNode> seenServices = new HashSet<>();
        seenServices.add(reference.getReferencedComponent());
        findControllerServiceReferencingComponentIdentifiers(reference, seenServices);

        // look up the current revision of each directly referencing component
        final Map<String, Revision> currentRevisions = new HashMap<>();
        for (final ComponentNode referencing : reference.getReferencingComponents()) {
            final Revision currentRevision = revisionManager.getRevision(referencing.getIdentifier());
            currentRevisions.put(referencing.getIdentifier(), currentRevision);
        }

        return createControllerServiceReferencingComponentsEntity(reference, currentRevisions);
    }

    /**
     * Creates entities for components referencing a ControllerService using the specified revisions.
     * The referenced service itself seeds the set of visited services used for cycle detection.
     *
     * @param reference ControllerServiceReference
     * @param revisions The revisions
     * @return The entity
     */
    private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(
            final ControllerServiceReference reference, final Map<String, Revision> revisions) {
        final Set<ControllerServiceNode> seenServices = new HashSet<>();
        seenServices.add(reference.getReferencedComponent());

        return createControllerServiceReferencingComponentsEntity(reference, revisions, seenServices);
    }

    /**
     * Creates entities for components referencing a ControllerService using the specified revisions.
     * Referencing ControllerServices are expanded recursively; a service that has already been visited is
     * flagged as a reference cycle and is not expanded again.
     *
     * @param reference ControllerServiceReference
     * @param revisions The revisions, keyed by component identifier
     * @param visited   Which services we've already considered (in case of cycle); services encountered here are added to it
     * @return The entity
     */
    private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(
            final ControllerServiceReference reference, final Map<String, Revision> revisions, final Set<ControllerServiceNode> visited) {

        final String modifier = NiFiUserUtils.getNiFiUserIdentity();
        final Set<ComponentNode> referencingNodes = reference.getReferencingComponents();

        final Set<ControllerServiceReferencingComponentEntity> resultEntities = new HashSet<>();
        for (final ComponentNode referencing : referencingNodes) {
            final PermissionsDTO permissions = dtoFactory.createPermissionsDto(referencing);
            final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(referencing));

            // pair the component's revision with the current user as the modifier
            final Revision componentRevision = revisions.get(referencing.getIdentifier());
            final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(new FlowModification(componentRevision, modifier));
            final ControllerServiceReferencingComponentDTO componentDto = dtoFactory.createControllerServiceReferencingComponentDTO(referencing);

            if (referencing instanceof ControllerServiceNode) {
                final ControllerServiceNode serviceNode = (ControllerServiceNode) referencing;

                // record whether this service was seen before, then mark it as seen before descending
                componentDto.setReferenceCycle(visited.contains(serviceNode));
                visited.add(serviceNode);

                // only a service seen for the first time has its own referencing components expanded
                if (!componentDto.getReferenceCycle()) {
                    final ControllerServiceReference nestedReference = serviceNode.getReferences();

                    // start from the supplied revisions and add current revisions for anything missing
                    final Map<String, Revision> nestedRevisions = new HashMap<>(revisions);
                    for (final ComponentNode nested : nestedReference.getReferencingComponents()) {
                        nestedRevisions.putIfAbsent(nested.getIdentifier(), revisionManager.getRevision(nested.getIdentifier()));
                    }

                    final ControllerServiceReferencingComponentsEntity nestedEntity =
                            createControllerServiceReferencingComponentsEntity(nestedReference, nestedRevisions, visited);
                    componentDto.setReferencingComponents(nestedEntity.getControllerServiceReferencingComponents());
                }
            }

            final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(referencing.getIdentifier()));
            resultEntities.add(
                    entityFactory.createControllerServiceReferencingComponentEntity(referencing.getIdentifier(), componentDto, revisionDto, permissions, operatePermissions, bulletinDtos));
        }

        final ControllerServiceReferencingComponentsEntity result = new ControllerServiceReferencingComponentsEntity();
        result.setControllerServiceReferencingComponents(resultEntities);
        return result;
    }

    @Override
    public ControllerServiceEntity deleteControllerService(final Revision revision, final String controllerServiceId) {
        // permissions, resource and DTO are all captured from the service before the delete is performed
        final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(controllerServiceId);
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(serviceNode));

        final Resource serviceResource = serviceNode.getResource();
        final ControllerServiceDTO preDeleteDto = dtoFactory.createControllerServiceDto(serviceNode);
        final ControllerServiceDTO deletedDto = deleteComponent(
                revision,
                serviceResource,
                () -> controllerServiceDAO.deleteControllerService(controllerServiceId),
                true,
                preDeleteDto);

        // the returned entity carries neither a revision nor bulletins
        return entityFactory.createControllerServiceEntity(deletedDto, null, permissions, operatePermissions, null);
    }


    @Override
    public RegistryClientEntity createRegistryClient(final Revision revision, final RegistryDTO registryDTO) {
        final NiFiUser user = NiFiUserUtils.getNiFiUser();

        // claim the revision for the component being created; it has already been verified (version == 0)
        final RevisionClaim claim = new StandardRevisionClaim(revision);

        // create the registry and persist the flow within the revision update
        final RevisionUpdate<FlowRegistry> creation = revisionManager.updateRevision(claim, user, () -> {
            final FlowRegistry createdRegistry = registryDAO.createFlowRegistry(registryDTO);
            controllerFacade.save();

            final Revision nextRevision = revision.incrementRevision(revision.getClientId());
            return new StandardRevisionUpdate<>(createdRegistry, new FlowModification(nextRevision, user.getIdentity()));
        });

        return createRegistryClientEntity(creation.getComponent());
    }

    /**
     * Looks up the flow registry with the given id and converts it to an entity.
     *
     * @param registryId the id of the registry to look up
     * @return the result of {@code createRegistryClientEntity} for the registry found
     */
    @Override
    public RegistryClientEntity getRegistryClient(final String registryId) {
        return createRegistryClientEntity(registryDAO.getFlowRegistry(registryId));
    }

    /**
     * Builds the entity for a flow registry from its current revision, the permissions on the
     * controller, and its DTO.
     *
     * @param flowRegistry the registry to convert; may be {@code null}
     * @return the entity, or {@code null} when {@code flowRegistry} is {@code null}
     */
    private RegistryClientEntity createRegistryClientEntity(final FlowRegistry flowRegistry) {
        if (flowRegistry != null) {
            final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(flowRegistry.getIdentifier()));
            final PermissionsDTO controllerPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getController());
            final RegistryDTO registryDto = dtoFactory.createRegistryDto(flowRegistry);

            return entityFactory.createRegistryClientEntity(registryDto, revisionDto, controllerPermissions);
        }

        return null;
    }

    /**
     * Wraps a versioned flow in an entity, copying its bucket id, flow id, name and description
     * into a DTO tagged with the given registry id.
     *
     * @param registryId the id of the registry recorded on the DTO
     * @param versionedFlow the flow to convert; may be {@code null}
     * @return the entity, or {@code null} when {@code versionedFlow} is {@code null}
     */
    private VersionedFlowEntity createVersionedFlowEntity(final String registryId, final VersionedFlow versionedFlow) {
        if (versionedFlow != null) {
            final VersionedFlowDTO flowDto = new VersionedFlowDTO();
            flowDto.setRegistryId(registryId);
            flowDto.setBucketId(versionedFlow.getBucketIdentifier());
            flowDto.setFlowId(versionedFlow.getIdentifier());
            flowDto.setFlowName(versionedFlow.getName());
            flowDto.setDescription(versionedFlow.getDescription());

            final VersionedFlowEntity flowEntity = new VersionedFlowEntity();
            flowEntity.setVersionedFlow(flowDto);
            return flowEntity;
        }

        return null;
    }

    /**
     * Wraps snapshot metadata in an entity tagged with the given registry id.
     *
     * @param registryId the id of the registry recorded on the entity
     * @param metadata the metadata to wrap; may be {@code null}
     * @return the entity, or {@code null} when {@code metadata} is {@code null}
     */
    private VersionedFlowSnapshotMetadataEntity createVersionedFlowSnapshotMetadataEntity(final String registryId, final VersionedFlowSnapshotMetadata metadata) {
        if (metadata != null) {
            final VersionedFlowSnapshotMetadataEntity metadataEntity = new VersionedFlowSnapshotMetadataEntity();
            metadataEntity.setRegistryId(registryId);
            metadataEntity.setVersionedFlowMetadata(metadata);
            return metadataEntity;
        }

        return null;
    }

    /**
     * Returns an entity for every flow registry reported by the registry DAO.
     *
     * @return the set of registry client entities
     */
    @Override
    public Set<RegistryClientEntity> getRegistryClients() {
        return registryDAO.getFlowRegistries()
            .stream()
            .map(flowRegistry -> createRegistryClientEntity(flowRegistry))
            .collect(Collectors.toSet());
    }

    /**
     * Returns a registry entity for every flow registry the DAO reports for the given user.
     *
     * @param user the user whose registries are requested
     * @return the set of registry entities
     */
    @Override
    public Set<RegistryEntity> getRegistriesForUser(final NiFiUser user) {
        return registryDAO.getFlowRegistriesForUser(user).stream()
                .map(registry -> {
                    final RegistryDTO registryDto = dtoFactory.createRegistryDto(registry);
                    return entityFactory.createRegistryEntity(registryDto);
                })
                .collect(Collectors.toSet());
    }

    /**
     * Returns a bucket entity for every bucket the DAO reports for the given registry and user.
     * Each entity carries the bucket's id, name, description, created timestamp and its
     * read/write permissions.
     *
     * @param registryId the id of the registry to query
     * @param user the user whose buckets are requested
     * @return the set of bucket entities; a {@code null} bucket is mapped to a {@code null} element
     */
    @Override
    public Set<BucketEntity> getBucketsForUser(final String registryId, final NiFiUser user) {
        return registryDAO.getBucketsForUser(registryId, user).stream()
                .map(registryBucket -> {
                    // a null bucket is passed through unchanged
                    if (registryBucket == null) {
                        return null;
                    }

                    final BucketDTO bucketDto = new BucketDTO();
                    bucketDto.setId(registryBucket.getIdentifier());
                    bucketDto.setName(registryBucket.getName());
                    bucketDto.setDescription(registryBucket.getDescription());
                    bucketDto.setCreated(registryBucket.getCreatedTimestamp());

                    // copy the permissions reported on the bucket into the web DTO
                    final Permissions sourcePermissions = registryBucket.getPermissions();
                    final PermissionsDTO permissionsDto = new PermissionsDTO();
                    permissionsDto.setCanRead(sourcePermissions.getCanRead());
                    permissionsDto.setCanWrite(sourcePermissions.getCanWrite());

                    return entityFactory.createBucketEntity(bucketDto, permissionsDto);
                })
                .collect(Collectors.toSet());
    }

    /**
     * Returns an entity for every versioned flow the DAO reports for the given registry, bucket and user.
     *
     * @param registryId the id of the registry to query
     * @param bucketId the id of the bucket to query
     * @param user the user whose flows are requested
     * @return the set of versioned flow entities
     */
    @Override
    public Set<VersionedFlowEntity> getFlowsForUser(String registryId, String bucketId, NiFiUser user) {
        return registryDAO.getFlowsForUser(registryId, bucketId, user)
                .stream()
                .map(versionedFlow -> {
                    return createVersionedFlowEntity(registryId, versionedFlow);
                })
                .collect(Collectors.toSet());
    }

    /**
     * Returns an entity for every flow version the DAO reports for the given registry, bucket,
     * flow and user.
     *
     * @param registryId the id of the registry to query
     * @param bucketId the id of the bucket to query
     * @param flowId the id of the flow whose versions are requested
     * @param user the user on whose behalf the versions are requested
     * @return the set of snapshot metadata entities
     */
    @Override
    public Set<VersionedFlowSnapshotMetadataEntity> getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) {
        return registryDAO.getFlowVersionsForUser(registryId, bucketId, flowId, user)
                .stream()
                .map(snapshotMetadata -> {
                    return createVersionedFlowSnapshotMetadataEntity(registryId, snapshotMetadata);
                })
                .collect(Collectors.toSet());
    }

    /**
     * Updates the name, description and URL of an existing flow registry inside a revision update
     * and saves the flow. The update action throws an {@link IllegalStateException} when a different
     * registry already uses the requested name.
     *
     * @param revision the revision claimed for the update
     * @param registryDTO the new values; its id identifies the registry to update
     * @return the entity for the updated registry
     */
    @Override
    public RegistryClientEntity updateRegistryClient(Revision revision, RegistryDTO registryDTO) {
        final RevisionClaim claim = new StandardRevisionClaim(revision);
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();

        final FlowRegistry target = registryDAO.getFlowRegistry(registryDTO.getId());
        final RevisionUpdate<FlowRegistry> update = revisionManager.updateRevision(claim, currentUser, () -> {
            // a name clash only counts when it is with a registry other than the one being updated
            final boolean nameTaken = registryDAO.getFlowRegistries().stream()
                    .filter(candidate -> candidate.getName().equals(registryDTO.getName()))
                    .anyMatch(candidate -> !candidate.getIdentifier().equals(registryDTO.getId()));

            if (nameTaken) {
                throw new IllegalStateException("Cannot update Flow Registry because a Flow Registry already exists with the name " + registryDTO.getName());
            }

            target.setDescription(registryDTO.getDescription());
            target.setName(registryDTO.getName());
            target.setURL(registryDTO.getUri());

            controllerFacade.save();

            final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
            final Revision nextRevision = currentRevision.incrementRevision(revision.getClientId());
            return new StandardRevisionUpdate<>(target, new FlowModification(nextRevision, currentUser.getIdentity()));
        });

        return createRegistryClientEntity(update.getComponent());
    }

    /**
     * Asks the process group DAO to verify that the flow registry with the given id can be deleted.
     *
     * @param registryId the id of the registry to verify
     */
    @Override
    public void verifyDeleteRegistry(String registryId) {
        // verification is delegated entirely to the process group DAO
        processGroupDAO.verifyDeleteFlowRegistry(registryId);
    }

    /**
     * Removes the flow registry with the given id inside a revision delete, saves the flow, and
     * returns the entity for the removed registry.
     *
     * @param revision the revision claimed for the delete
     * @param registryId the id of the registry to remove
     * @return the entity for the registry returned by the delete action
     */
    @Override
    public RegistryClientEntity deleteRegistryClient(final Revision revision, final String registryId) {
        final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();

        final FlowRegistry removedRegistry = revisionManager.deleteRevision(revisionClaim, currentUser, () -> {
            final FlowRegistry removed = registryDAO.removeFlowRegistry(registryId);

            // persist the flow once the registry is gone
            controllerFacade.save();
            return removed;
        });

        return createRegistryClientEntity(removedRegistry);
    }

    /**
     * Creates a reporting task from the supplied DTO inside a revision update, saves the flow,
     * waits for validation to complete, and returns the entity for the new task including its
     * permissions and bulletins.
     *
     * @param revision the revision claimed for the new component
     * @param reportingTaskDTO the description of the reporting task to create
     * @return the entity for the created reporting task
     */
    @Override
    public ReportingTaskEntity createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();

        // claim the revision for the component being created... revision already verified (version == 0)
        final RevisionClaim revisionClaim = new StandardRevisionClaim(revision);

        // run the creation through the revision manager
        final RevisionUpdate<ReportingTaskDTO> update = revisionManager.updateRevision(revisionClaim, currentUser, () -> {
            final ReportingTaskNode created = reportingTaskDAO.createReportingTask(reportingTaskDTO);

            // persist the change, then wait for validation before building the DTO
            controllerFacade.save();
            awaitValidationCompletion(created);

            final ReportingTaskDTO createdDto = dtoFactory.createReportingTaskDto(created);
            final Revision nextRevision = revision.incrementRevision(revision.getClientId());
            return new StandardRevisionUpdate<>(createdDto, new FlowModification(nextRevision, currentUser.getIdentity()));
        });

        final ReportingTaskNode taskNode = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(taskNode);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(taskNode));

        // each bulletin entity is created with the caller's read permission
        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(taskNode.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = new ArrayList<>();
        for (final BulletinDTO bulletinDto : bulletinDtos) {
            bulletinEntities.add(entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()));
        }

        final ReportingTaskDTO createdTask = update.getComponent();
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createReportingTaskEntity(createdTask, revisionDto, permissions, operatePermissions, bulletinEntities);
    }

    /**
     * Applies the supplied DTO to an existing reporting task via {@code updateComponent}, waits for
     * validation to complete, and returns the entity for the task including its permissions and bulletins.
     *
     * @param revision the revision handed to {@code updateComponent}
     * @param reportingTaskDTO the new values; its id identifies the reporting task to update
     * @return the entity for the updated reporting task
     */
    @Override
    public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
        // look up the component and perform the update request against it
        final ReportingTaskNode taskNode = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
        final RevisionUpdate<ReportingTaskDTO> update = updateComponent(revision,
                taskNode,
                () -> reportingTaskDAO.updateReportingTask(reportingTaskDTO),
                updatedTask -> {
                    // build the DTO only after validation has finished
                    awaitValidationCompletion(updatedTask);
                    return dtoFactory.createReportingTaskDto(updatedTask);
                });

        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(taskNode);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(taskNode));

        final List<BulletinDTO> bulletinDtos = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(taskNode.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletinDtos.stream()
                .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissions.getCanRead()))
                .collect(Collectors.toList());

        final ReportingTaskDTO updatedDto = update.getComponent();
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createReportingTaskEntity(updatedDto, revisionDto, permissions, operatePermissions, bulletinEntities);
    }

    /**
     * Asks the reporting task DAO to verify the given configuration for the given reporting task.
     *
     * @param reportingTaskId the id of the reporting task
     * @param reportingTask the configuration to verify
     * @return the verification results returned by the DAO
     */
    @Override
    public List<ConfigVerificationResultDTO> verifyReportingTaskConfiguration(final String reportingTaskId, final ReportingTaskDTO reportingTask) {
        final List<ConfigVerificationResultDTO> verificationResults = reportingTaskDAO.verifyConfiguration(reportingTaskId, reportingTask);
        return verificationResults;
    }

    /**
     * Removes the reporting task with the given id via {@code deleteComponent} and returns an
     * entity built from the resulting snapshot.
     *
     * @param revision the revision handed to {@code deleteComponent}
     * @param reportingTaskId the id of the reporting task to delete
     * @return the entity for the deleted task; its revision and bulletins are passed as {@code null}
     */
    @Override
    public ReportingTaskEntity deleteReportingTask(final Revision revision, final String reportingTaskId) {
        final ReportingTaskNode taskNode = reportingTaskDAO.getReportingTask(reportingTaskId);

        // both permission sets are computed before the delete action runs
        final PermissionsDTO readWritePermissions = dtoFactory.createPermissionsDto(taskNode);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(taskNode));

        final Resource taskResource = taskNode.getResource();
        final ReportingTaskDTO preDeleteDto = dtoFactory.createReportingTaskDto(taskNode);
        final ReportingTaskDTO snapshot = deleteComponent(
                revision,
                taskResource,
                () -> reportingTaskDAO.deleteReportingTask(reportingTaskId),
                true,
                preDeleteDto);

        return entityFactory.createReportingTaskEntity(snapshot, null, readWritePermissions, operatePermissions, null);
    }

    /**
     * Purges audited actions up to the given end date and records the purge itself as a
     * {@code Purge} action attributed to the current user.
     *
     * @param endDate the end date recorded in the purge details and passed to the audit service
     * @throws WebApplicationException if no current user is available
     */
    @Override
    public void deleteActions(final Date endDate) {
        // the purge record carries the requesting user's identity, so a user is required
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        if (currentUser == null) {
            throw new WebApplicationException(new Throwable("Unable to access details for current user."));
        }

        // describe what is being purged
        final FlowChangePurgeDetails purgeDetails = new FlowChangePurgeDetails();
        purgeDetails.setEndDate(endDate);

        // build the action that records that history is being removed
        final FlowChangeAction purgeRecord = new FlowChangeAction();
        purgeRecord.setUserIdentity(currentUser.getIdentity());
        purgeRecord.setOperation(Operation.Purge);
        purgeRecord.setTimestamp(new Date());
        purgeRecord.setSourceId("Flow Controller");
        purgeRecord.setSourceName("History");
        purgeRecord.setSourceType(Component.Controller);
        purgeRecord.setActionDetails(purgeDetails);

        // hand both the cutoff and the purge record to the audit service
        auditService.purgeActions(endDate, purgeRecord);
    }

    /**
     * Submits the given provenance query through the controller facade.
     *
     * @param query the provenance query to submit
     * @return the value returned by the controller facade
     */
    @Override
    public ProvenanceDTO submitProvenance(final ProvenanceDTO query) {
        final ProvenanceDTO submitted = controllerFacade.submitProvenance(query);
        return submitted;
    }

    /**
     * Deletes the provenance query with the given id through the controller facade.
     *
     * @param queryId the id of the provenance query to delete
     */
    @Override
    public void deleteProvenance(final String queryId) {
        // deletion is delegated entirely to the controller facade
        controllerFacade.deleteProvenanceQuery(queryId);
    }

    /**
     * Submits the given lineage request through the controller facade.
     *
     * @param lineage the lineage request to submit
     * @return the value returned by the controller facade
     */
    @Override
    public LineageDTO submitLineage(final LineageDTO lineage) {
        final LineageDTO submitted = controllerFacade.submitLineage(lineage);
        return submitted;
    }

    /**
     * Deletes the lineage request with the given id through the controller facade.
     *
     * @param lineageId the id of the lineage request to delete
     */
    @Override
    public void deleteLineage(final String lineageId) {
        // deletion is delegated entirely to the controller facade
        controllerFacade.deleteLineage(lineageId);
    }

    /**
     * Submits a replay of the provenance event with the given id through the controller facade.
     *
     * @param eventId the id of the event to replay
     * @return the value returned by the controller facade
     */
    @Override
    public ProvenanceEventDTO submitReplay(final Long eventId) {
        final ProvenanceEventDTO replayEvent = controllerFacade.submitReplay(eventId);
        return replayEvent;
    }

    // -----------------------------------------
    // Read Operations
    // -----------------------------------------

    /**
     * Runs a search through the controller facade.
     *
     * @param query the search query
     * @param activeGroupId the active group id passed along with the query
     * @return the search results returned by the controller facade
     */
    @Override
    public SearchResultsDTO searchController(final String query, final String activeGroupId) {
        final SearchResultsDTO searchResults = controllerFacade.search(query, activeGroupId);
        return searchResults;
    }

    /**
     * Retrieves the content of a flow file in a connection through the connection DAO.
     *
     * @param connectionId the id of the connection
     * @param flowFileUuid the uuid of the flow file
     * @param uri the uri passed along to the DAO
     * @return the downloadable content returned by the DAO
     */
    @Override
    public DownloadableContent getContent(final String connectionId, final String flowFileUuid, final String uri) {
        final DownloadableContent flowFileContent = connectionDAO.getContent(connectionId, flowFileUuid, uri);
        return flowFileContent;
    }

    /**
     * Retrieves the content associated with a provenance event through the controller facade.
     *
     * @param eventId the id of the provenance event
     * @param uri the uri passed along to the controller facade
     * @param contentDirection the content direction passed along to the controller facade
     * @return the downloadable content returned by the controller facade
     */
    @Override
    public DownloadableContent getContent(final Long eventId, final String uri, final ContentDirection contentDirection) {
        final DownloadableContent eventContent = controllerFacade.getContent(eventId, uri, contentDirection);
        return eventContent;
    }

    /**
     * Retrieves the provenance query with the given id through the controller facade.
     *
     * @param queryId the id of the provenance query
     * @param summarize passed along to the controller facade
     * @param incrementalResults passed along to the controller facade
     * @return the provenance query returned by the controller facade
     */
    @Override
    public ProvenanceDTO getProvenance(final String queryId, final Boolean summarize, final Boolean incrementalResults) {
        final ProvenanceDTO provenanceQuery = controllerFacade.getProvenanceQuery(queryId, summarize, incrementalResults);
        return provenanceQuery;
    }

    /**
     * Retrieves the lineage request with the given id through the controller facade.
     *
     * @param lineageId the id of the lineage request
     * @return the lineage returned by the controller facade
     */
    @Override
    public LineageDTO getLineage(final String lineageId) {
        final LineageDTO lineage = controllerFacade.getLineage(lineageId);
        return lineage;
    }

    /**
     * Retrieves the provenance search options through the controller facade.
     *
     * @return the options returned by the controller facade
     */
    @Override
    public ProvenanceOptionsDTO getProvenanceSearchOptions() {
        final ProvenanceOptionsDTO searchOptions = controllerFacade.getProvenanceSearchOptions();
        return searchOptions;
    }

    /**
     * Retrieves the provenance event with the given id through the controller facade.
     *
     * @param id the id of the provenance event
     * @return the event returned by the controller facade
     */
    @Override
    public ProvenanceEventDTO getProvenanceEvent(final Long id) {
        final ProvenanceEventDTO provenanceEvent = controllerFacade.getProvenanceEvent(id);
        return provenanceEvent;
    }

    /**
     * Builds the status entity for a process group. When {@code recursive} is {@code false}, the
     * nested component snapshots of each child group are cleared from the aggregate snapshot and
     * from every node snapshot.
     *
     * @param groupId the id of the process group
     * @param recursive whether the child groups' nested snapshots are kept in the response
     * @return the status entity for the group
     */
    @Override
    public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
        final PermissionsDTO groupPermissions = dtoFactory.createPermissionsDto(group);
        final ProcessGroupStatusDTO statusDto = dtoFactory.createProcessGroupStatusDto(group, controllerFacade.getProcessGroupStatus(groupId));

        // trim the response when the caller did not ask for the full hierarchy
        if (!recursive) {
            pruneChildGroups(statusDto.getAggregateSnapshot());
            if (statusDto.getNodeSnapshots() != null) {
                statusDto.getNodeSnapshots().forEach(nodeSnapshot -> pruneChildGroups(nodeSnapshot.getStatusSnapshot()));
            }
        }

        return entityFactory.createProcessGroupStatusEntity(statusDto, groupPermissions);
    }

    /**
     * Clears the nested connection, process group, port, processor and remote process group
     * snapshots from every child group snapshot of the given snapshot.
     *
     * @param snapshot the snapshot whose child group snapshots are trimmed in place
     */
    private void pruneChildGroups(final ProcessGroupStatusSnapshotDTO snapshot) {
        snapshot.getProcessGroupStatusSnapshots().forEach(childEntity -> {
            final ProcessGroupStatusSnapshotDTO child = childEntity.getProcessGroupStatusSnapshot();
            child.setConnectionStatusSnapshots(null);
            child.setProcessGroupStatusSnapshots(null);
            child.setInputPortStatusSnapshots(null);
            child.setOutputPortStatusSnapshots(null);
            child.setProcessorStatusSnapshots(null);
            child.setRemoteProcessGroupStatusSnapshots(null);
        });
    }

    /**
     * Retrieves the controller status through the controller facade.
     *
     * @return the status returned by the controller facade
     */
    @Override
    public ControllerStatusDTO getControllerStatus() {
        final ControllerStatusDTO controllerStatus = controllerFacade.getControllerStatus();
        return controllerStatus;
    }

    /**
     * Builds the component state DTO for a processor from its local state and, when clustered,
     * its cluster state.
     *
     * @param processorId the id of the processor
     * @return the component state DTO; the cluster state passed to the factory is {@code null} when not clustered
     */
    @Override
    public ComponentStateDTO getProcessorState(final String processorId) {
        // cluster-scoped state is only fetched when this instance is clustered
        final StateMap clusterState;
        if (isClustered()) {
            clusterState = processorDAO.getState(processorId, Scope.CLUSTER);
        } else {
            clusterState = null;
        }
        final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL);

        // processor will be non null as it was already found when getting the state
        final ProcessorNode processorNode = processorDAO.getProcessor(processorId);
        return dtoFactory.createComponentStateDTO(processorId, processorNode.getProcessor().getClass(), localState, clusterState);
    }

    /**
     * Builds the component state DTO for a controller service from its local state and, when
     * clustered, its cluster state.
     *
     * @param controllerServiceId the id of the controller service
     * @return the component state DTO; the cluster state passed to the factory is {@code null} when not clustered
     */
    @Override
    public ComponentStateDTO getControllerServiceState(final String controllerServiceId) {
        // cluster-scoped state is only fetched when this instance is clustered
        final StateMap clusterState;
        if (isClustered()) {
            clusterState = controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER);
        } else {
            clusterState = null;
        }
        final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL);

        // controller service will be non null as it was already found when getting the state
        final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(controllerServiceId);
        return dtoFactory.createComponentStateDTO(controllerServiceId, serviceNode.getControllerServiceImplementation().getClass(), localState, clusterState);
    }

    /**
     * Builds the component state DTO for a reporting task from its local state and, when
     * clustered, its cluster state.
     *
     * @param reportingTaskId the id of the reporting task
     * @return the component state DTO; the cluster state passed to the factory is {@code null} when not clustered
     */
    @Override
    public ComponentStateDTO getReportingTaskState(final String reportingTaskId) {
        // cluster-scoped state is only fetched when this instance is clustered
        final StateMap clusterState;
        if (isClustered()) {
            clusterState = reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER);
        } else {
            clusterState = null;
        }
        final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL);

        // reporting task will be non null as it was already found when getting the state
        final ReportingTaskNode taskNode = reportingTaskDAO.getReportingTask(reportingTaskId);
        return dtoFactory.createComponentStateDTO(reportingTaskId, taskNode.getReportingTask().getClass(), localState, clusterState);
    }

    /**
     * Converts the counters reported by the controller facade to DTOs, in iteration order, and
     * wraps them as the aggregate snapshot of a {@code CountersDTO}.
     *
     * @return the counters DTO
     */
    @Override
    public CountersDTO getCounters() {
        final List<Counter> allCounters = controllerFacade.getCounters();

        // a linked set keeps the DTOs in the order the counters were returned
        final Set<CounterDTO> dtos = new LinkedHashSet<>(allCounters.size());
        allCounters.forEach(counter -> dtos.add(dtoFactory.createCounterDto(counter)));

        final CountersDTO result = new CountersDTO();
        result.setAggregateSnapshot(dtoFactory.createCountersDto(dtos));
        return result;
    }

    /**
     * Builds the entity for a connection from its revision, permissions, status and DTO.
     *
     * @param connection the connection to describe
     * @return the connection entity
     */
    private ConnectionEntity createConnectionEntity(final Connection connection) {
        final String connectionId = connection.getIdentifier();
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(connectionId));
        final PermissionsDTO connectionPermissions = dtoFactory.createPermissionsDto(connection);
        final ConnectionStatusDTO statusDto = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connection.getIdentifier()));
        return entityFactory.createConnectionEntity(dtoFactory.createConnectionDto(connection), revisionDto, connectionPermissions, statusDto);
    }

    /**
     * Returns an entity for every connection the DAO reports for the given group.
     *
     * @param groupId the id of the process group
     * @return the set of connection entities
     */
    @Override
    public Set<ConnectionEntity> getConnections(final String groupId) {
        final Set<Connection> groupConnections = connectionDAO.getConnections(groupId);
        return groupConnections.stream()
            .map(this::createConnectionEntity)
            .collect(Collectors.toSet());
    }

    /**
     * Looks up the connection with the given id and converts it to an entity.
     *
     * @param connectionId the id of the connection
     * @return the connection entity
     */
    @Override
    public ConnectionEntity getConnection(final String connectionId) {
        return createConnectionEntity(connectionDAO.getConnection(connectionId));
    }

    /**
     * Looks up a flow file drop request on a connection and converts it to a DTO.
     *
     * @param connectionId the id of the connection
     * @param dropRequestId the id of the drop request
     * @return the drop request DTO
     */
    @Override
    public DropRequestDTO getFlowFileDropRequest(final String connectionId, final String dropRequestId) {
        final DropRequestDTO dropRequest = dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(connectionId, dropRequestId));
        return dropRequest;
    }

    /**
     * Looks up a flow file listing request on a connection, converts it to a DTO, and records
     * whether the connection's source and destination are running when they are present.
     *
     * @param connectionId the id of the connection
     * @param listingRequestId the id of the listing request
     * @return the listing request DTO
     */
    @Override
    public ListingRequestDTO getFlowFileListingRequest(final String connectionId, final String listingRequestId) {
        final Connection target = connectionDAO.getConnection(connectionId);
        final ListingRequestDTO listingDto = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(connectionId, listingRequestId));

        // the running flags are only set for endpoints that exist
        if (target.getSource() != null) {
            listingDto.setSourceRunning(target.getSource().isRunning());
        }

        if (target.getDestination() != null) {
            listingDto.setDestinationRunning(target.getDestination().isRunning());
        }

        return listingDto;
    }

    /**
     * Looks up a flow file in a connection and converts it to a DTO.
     *
     * @param connectionId the id of the connection
     * @param flowFileUuid the uuid of the flow file
     * @return the flow file DTO
     */
    @Override
    public FlowFileDTO getFlowFile(final String connectionId, final String flowFileUuid) {
        final FlowFileDTO flowFile = dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid));
        return flowFile;
    }

    /**
     * Builds the status entity for a connection from its permissions and current status.
     *
     * @param connectionId the id of the connection
     * @return the connection status entity
     */
    @Override
    public ConnectionStatusEntity getConnectionStatus(final String connectionId) {
        final Connection target = connectionDAO.getConnection(connectionId);
        final PermissionsDTO connectionPermissions = dtoFactory.createPermissionsDto(target);
        final ConnectionStatusDTO statusDto = dtoFactory.createConnectionStatusDto(controllerFacade.getConnectionStatus(connectionId));
        return entityFactory.createConnectionStatusEntity(statusDto, connectionPermissions);
    }

    /**
     * Builds the status history entity for a connection from its permissions and status history.
     *
     * @param connectionId the id of the connection
     * @return the status history entity
     */
    @Override
    public StatusHistoryEntity getConnectionStatusHistory(final String connectionId) {
        final Connection target = connectionDAO.getConnection(connectionId);
        final PermissionsDTO connectionPermissions = dtoFactory.createPermissionsDto(target);
        final StatusHistoryDTO historyDto = controllerFacade.getConnectionStatusHistory(connectionId);
        return entityFactory.createStatusHistoryEntity(historyDto, connectionPermissions);
    }

    /**
     * Builds the statistics entity for a connection from its permissions and status analytics.
     *
     * @param connectionId the id of the connection
     * @return the connection statistics entity
     */
    @Override
    public ConnectionStatisticsEntity getConnectionStatistics(final String connectionId) {
        final Connection target = connectionDAO.getConnection(connectionId);
        final PermissionsDTO connectionPermissions = dtoFactory.createPermissionsDto(target);
        final ConnectionStatisticsDTO statisticsDto = dtoFactory.createConnectionStatisticsDto(target, controllerFacade.getConnectionStatusAnalytics(connectionId));
        return entityFactory.createConnectionStatisticsEntity(statisticsDto, connectionPermissions);
    }

    /**
     * Builds the entity for a processor from its revision, permissions, operate permissions,
     * status, bulletins and DTO.
     *
     * @param processor the processor to describe
     * @param user the user for whom both the read/write and the operate permissions are evaluated
     * @return the processor entity
     */
    private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) {
        final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user);
        // Evaluate operate permissions for the supplied user as well, so both permission sets in the
        // entity describe the same user.
        // NOTE(review): presumably the single-argument overload resolves the user from the current
        // request - confirm against DtoFactory.
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(processor), user);
        final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
        final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier()));
        // each bulletin entity is created with the user's read permission on the processor
        final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
        final ProcessorDTO processorDTO = dtoFactory.createProcessorDto(processor);
        return entityFactory.createProcessorEntity(processorDTO, revision, permissions, operatePermissions, status, bulletinEntities);
    }

    /**
     * Returns an entity for every processor the DAO reports for the given group, built on behalf
     * of the current user.
     *
     * @param groupId the id of the process group
     * @param includeDescendants passed along to the processor DAO
     * @return the set of processor entities
     */
    @Override
    public Set<ProcessorEntity> getProcessors(final String groupId, final boolean includeDescendants) {
        final Set<ProcessorNode> processorNodes = processorDAO.getProcessors(groupId, includeDescendants);

        // resolve the user once and reuse it for every processor
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        return processorNodes.stream()
            .map(node -> {
                return createProcessorEntity(node, currentUser);
            })
            .collect(Collectors.toSet());
    }

    /**
     * Looks up each processor id and collects the run status details entity for each processor.
     *
     * @param processorIds the ids of the processors to describe
     * @param user the user for whom permissions are evaluated
     * @return an entity holding the run status details of all requested processors
     */
    @Override
    public ProcessorsRunStatusDetailsEntity getProcessorsRunStatusDetails(final Set<String> processorIds, final NiFiUser user) {
        final List<ProcessorRunStatusDetailsEntity> detailsEntities = processorIds.stream()
            .map(processorId -> createRunStatusDetailsEntity(processorDAO.getProcessor(processorId), user))
            .collect(Collectors.toList());

        final ProcessorsRunStatusDetailsEntity result = new ProcessorsRunStatusDetailsEntity();
        result.setRunStatusDetails(detailsEntities);
        return result;
    }

    /**
     * Builds the run status details entity for a processor. When the user cannot read the
     * processor, the name and validation errors are removed from the details.
     *
     * @param processor the processor to describe
     * @param user the user for whom permissions are evaluated
     * @return the run status details entity
     */
    private ProcessorRunStatusDetailsEntity createRunStatusDetailsEntity(final ProcessorNode processor, final NiFiUser user) {
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
        final PermissionsDTO processorPermissions = dtoFactory.createPermissionsDto(processor, user);
        final ProcessorStatus status = controllerFacade.getProcessorStatus(processor.getIdentifier());
        final ProcessorRunStatusDetailsDTO detailsDto = dtoFactory.createProcessorRunStatusDetailsDto(processor, status);

        // withhold the name and validation errors from users without read access
        if (!processorPermissions.getCanRead()) {
            detailsDto.setName(null);
            detailsDto.setValidationErrors(null);
        }

        final ProcessorRunStatusDetailsEntity detailsEntity = new ProcessorRunStatusDetailsEntity();
        detailsEntity.setPermissions(processorPermissions);
        detailsEntity.setRevision(revisionDto);
        detailsEntity.setRunStatusDetails(detailsDto);
        return detailsEntity;
    }



    /**
     * Builds a DTO for the template with the given id and attaches a copy of the snippet taken
     * from the template's details.
     *
     * @param id the id of the template
     * @return the template DTO including the copied snippet
     */
    @Override
    public TemplateDTO exportTemplate(final String id) {
        final Template source = templateDAO.getTemplate(id);
        final TemplateDTO sourceDetails = source.getDetails();

        final TemplateDTO exported = dtoFactory.createTemplateDTO(source);

        // the snippet is copied from the details rather than shared
        exported.setSnippet(dtoFactory.copySnippetContents(sourceDetails.getSnippet()));
        return exported;
    }

    /**
     * Looks up the template with the given id and converts it to a DTO.
     *
     * @param id the id of the template
     * @return the template DTO
     */
    @Override
    public TemplateDTO getTemplate(final String id) {
        final TemplateDTO templateDto = dtoFactory.createTemplateDTO(templateDAO.getTemplate(id));
        return templateDto;
    }

    /**
     * Returns an entity for every template reported by the template DAO; each entity carries the
     * template's id, permissions and DTO.
     *
     * @return the set of template entities
     */
    @Override
    public Set<TemplateEntity> getTemplates() {
        return templateDAO.getTemplates().stream()
                .map(current -> {
                    final TemplateDTO templateDto = dtoFactory.createTemplateDTO(current);
                    final PermissionsDTO templatePermissions = dtoFactory.createPermissionsDto(current);

                    final TemplateEntity templateEntity = new TemplateEntity();
                    templateEntity.setId(templateDto.getId());
                    templateEntity.setPermissions(templatePermissions);
                    templateEntity.setTemplate(templateDto);
                    return templateEntity;
                })
                .collect(Collectors.toSet());
    }

    /**
     * Retrieves the flow file comparator types through the controller facade.
     *
     * @return the types returned by the controller facade
     */
    @Override
    public Set<DocumentedTypeDTO> getWorkQueuePrioritizerTypes() {
        final Set<DocumentedTypeDTO> prioritizerTypes = controllerFacade.getFlowFileComparatorTypes();
        return prioritizerTypes;
    }

    /**
     * Retrieves the flow file processor types through the controller facade.
     *
     * @param bundleGroup passed along to the controller facade
     * @param bundleArtifact passed along to the controller facade
     * @param type passed along to the controller facade
     * @return the types returned by the controller facade
     */
    @Override
    public Set<DocumentedTypeDTO> getProcessorTypes(final String bundleGroup, final String bundleArtifact, final String type) {
        final Set<DocumentedTypeDTO> processorTypes = controllerFacade.getFlowFileProcessorTypes(bundleGroup, bundleArtifact, type);
        return processorTypes;
    }

    /**
     * Retrieves the controller service types through the controller facade; every argument is
     * passed along unchanged.
     *
     * @return the types returned by the controller facade
     */
    @Override
    public Set<DocumentedTypeDTO> getControllerServiceTypes(final String serviceType, final String serviceBundleGroup, final String serviceBundleArtifact, final String serviceBundleVersion,
                                                            final String bundleGroup, final String bundleArtifact, final String type) {
        final Set<DocumentedTypeDTO> serviceTypes = controllerFacade.getControllerServiceTypes(
                serviceType, serviceBundleGroup, serviceBundleArtifact, serviceBundleVersion, bundleGroup, bundleArtifact, type);
        return serviceTypes;
    }

    /**
     * Retrieves the reporting task types through the controller facade.
     *
     * @param bundleGroup passed along to the controller facade
     * @param bundleArtifact passed along to the controller facade
     * @param type passed along to the controller facade
     * @return the types returned by the controller facade
     */
    @Override
    public Set<DocumentedTypeDTO> getReportingTaskTypes(final String bundleGroup, final String bundleArtifact, final String type) {
        final Set<DocumentedTypeDTO> reportingTaskTypes = controllerFacade.getReportingTaskTypes(bundleGroup, bundleArtifact, type);
        return reportingTaskTypes;
    }

    /**
     * Looks up the processor with the given id and builds its entity on behalf of the current user.
     *
     * @param id the id of the processor
     * @return the processor entity
     */
    @Override
    public ProcessorEntity getProcessor(final String id) {
        final ProcessorNode processorNode = processorDAO.getProcessor(id);
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        return createProcessorEntity(processorNode, currentUser);
    }

    /**
     * Returns the DTO for a property descriptor of a processor. When the processor has no
     * descriptor for the property, an invalid, dynamic descriptor with that name is used instead.
     *
     * @param id the id of the processor
     * @param property the name of the property
     * @return the property descriptor DTO
     */
    @Override
    public PropertyDescriptorDTO getProcessorPropertyDescriptor(final String id, final String property) {
        final ProcessorNode processorNode = processorDAO.getProcessor(id);
        final PropertyDescriptor supported = processorNode.getPropertyDescriptor(property);

        // fall back to an invalid descriptor if the processor doesn't support this property
        final PropertyDescriptor descriptor = supported != null
                ? supported
                : new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();

        return dtoFactory.createPropertyDescriptorDto(descriptor, processorNode.getProcessGroup().getIdentifier());
    }

    /**
     * Builds the status entity for a processor from its permissions and current status.
     *
     * @param id the id of the processor
     * @return the processor status entity
     */
    @Override
    public ProcessorStatusEntity getProcessorStatus(final String id) {
        final ProcessorNode processorNode = processorDAO.getProcessor(id);
        final PermissionsDTO processorPermissions = dtoFactory.createPermissionsDto(processorNode);
        final ProcessorStatusDTO statusDto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(id));
        return entityFactory.createProcessorStatusEntity(statusDto, processorPermissions);
    }

    /**
     * Builds the status history entity for a processor from its permissions and status history.
     *
     * @param id the id of the processor
     * @return the status history entity
     */
    @Override
    public StatusHistoryEntity getProcessorStatusHistory(final String id) {
        final ProcessorNode processorNode = processorDAO.getProcessor(id);
        final PermissionsDTO processorPermissions = dtoFactory.createPermissionsDto(processorNode);
        final StatusHistoryDTO historyDto = controllerFacade.getProcessorStatusHistory(id);
        return entityFactory.createStatusHistoryEntity(historyDto, processorPermissions);
    }

    /**
     * Builds the node status history entity from the current user's permissions on the
     * controller facade and the node status history it reports.
     *
     * @return the status history entity
     */
    @Override
    public StatusHistoryEntity getNodeStatusHistory() {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        final PermissionsDTO controllerPermissions = dtoFactory.createPermissionsDto(controllerFacade, currentUser);
        final StatusHistoryDTO historyDto = controllerFacade.getNodeStatusHistory();
        return entityFactory.createStatusHistoryEntity(historyDto, controllerPermissions);
    }

    /**
     * Determines whether the current user has READ access to the component that produced the
     * given bulletin.
     *
     * @param bulletin the bulletin whose source component is checked
     * @return {@code true} when the READ check is approved; {@code false} when it is not, or when
     *         looking up the source component throws {@code ResourceNotFoundException}
     * @throws WebApplicationException if the bulletin's source type is not one of the handled types
     */
    private boolean authorizeBulletin(final Bulletin bulletin) {
        final String componentId = bulletin.getSourceId();
        final ComponentType componentType = bulletin.getSourceType();

        // resolve the authorizable that stands for the bulletin's source
        final Authorizable source;
        try {
            switch (componentType) {
                case PROCESSOR:
                    source = authorizableLookup.getProcessor(componentId).getAuthorizable();
                    break;
                case REPORTING_TASK:
                    source = authorizableLookup.getReportingTask(componentId).getAuthorizable();
                    break;
                case CONTROLLER_SERVICE:
                    source = authorizableLookup.getControllerService(componentId).getAuthorizable();
                    break;
                case FLOW_CONTROLLER:
                    source = controllerFacade;
                    break;
                case INPUT_PORT:
                    source = authorizableLookup.getInputPort(componentId);
                    break;
                case OUTPUT_PORT:
                    source = authorizableLookup.getOutputPort(componentId);
                    break;
                case REMOTE_PROCESS_GROUP:
                    source = authorizableLookup.getRemoteProcessGroup(componentId);
                    break;
                default:
                    throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this bulletin.").build());
            }
        } catch (final ResourceNotFoundException e) {
            // the source component no longer exists, so access is denied
            return false;
        }

        // run the READ check for the current user
        final AuthorizationResult readCheck = source.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
        return Result.Approved.equals(readCheck.getResult());
    }

    /**
     * Queries the bulletin repository using the filters carried by the given query and returns
     * the matching bulletins as a bulletin board. Each bulletin entity carries the result of the
     * per-bulletin read authorization for the current user.
     *
     * @param query the group id, source id, name and message filters plus the "after" marker and limit
     * @return a bulletin board holding the matching bulletins and the time it was generated
     */
    @Override
    public BulletinBoardDTO getBulletinBoard(final BulletinQueryDTO query) {
        // translate the request into a repository query
        final BulletinQuery bulletinQuery = new BulletinQuery.Builder()
                .groupIdMatches(query.getGroupId())
                .sourceIdMatches(query.getSourceId())
                .nameMatches(query.getName())
                .messageMatches(query.getMessage())
                .after(query.getAfter())
                .limit(query.getLimit())
                .build();

        final List<Bulletin> matches = bulletinRepository.findBulletins(bulletinQuery);

        // the repository hands back the most recent bulletins first; walk the list backwards so the
        // board lists exactly those bulletins in the opposite order
        final List<BulletinEntity> boardEntries = new ArrayList<>();
        final ListIterator<Bulletin> cursor = matches.listIterator(matches.size());
        while (cursor.hasPrevious()) {
            final Bulletin current = cursor.previous();
            final BulletinDTO currentDto = dtoFactory.createBulletinDto(current);
            final boolean readable = authorizeBulletin(current);
            boardEntries.add(entityFactory.createBulletinEntity(currentDto, readable));
        }

        // assemble the board
        final BulletinBoardDTO board = new BulletinBoardDTO();
        board.setBulletins(boardEntries);
        board.setGenerated(new Date());
        return board;
    }

    /**
     * Returns the system diagnostics reported by the controller facade, converted to a DTO.
     */
    @Override
    public SystemDiagnosticsDTO getSystemDiagnostics() {
        return dtoFactory.createSystemDiagnosticsDto(controllerFacade.getSystemDiagnostics());
    }

    /**
     * Returns a DTO for every resource reported by the controller facade, in the same order.
     */
    @Override
    public List<ResourceDTO> getResources() {
        return controllerFacade.getResources().stream()
                .map(resource -> dtoFactory.createResourceDto(resource))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Delegates to {@code BundleUtils.discoverCompatibleBundles} for the given versioned group,
     * using the extension manager supplied by the controller facade.
     *
     * @param versionedGroup the versioned process group handed to the bundle discovery
     */
    @Override
    public void discoverCompatibleBundles(VersionedProcessGroup versionedGroup) {
        final ExtensionManager extensionManager = controllerFacade.getExtensionManager();
        BundleUtils.discoverCompatibleBundles(extensionManager, versionedGroup);
    }

    /**
     * Resolves external controller service references for the flow contents of the given snapshot
     * against the controller services available to the given process group.
     *
     * @param versionedFlowSnapshot the snapshot providing the flow contents and the external controller service references
     * @param processGroupId the id of the process group whose controller services are candidates for resolution
     * @param user the user whose READ access limits which controller services are considered
     */
    @Override
    public void resolveInheritedControllerServices(final VersionedFlowSnapshot versionedFlowSnapshot, final String processGroupId, final NiFiUser user) {
        resolveInheritedControllerServices(versionedFlowSnapshot.getFlowContents(), processGroupId, versionedFlowSnapshot.getExternalControllerServices(), user);
    }

    /**
     * Resolves external controller service references for every processor and controller service in
     * the given versioned group and, recursively, in all of its child groups.
     *
     * The candidate services (those returned by {@code getControllerServices(true)} on the target process
     * group that the user may READ) and the extension manager depend only on {@code processGroupId} and
     * {@code user}. They are therefore computed once here and shared by the whole recursion, rather than
     * being looked up and re-authorized again for each nested versioned group.
     *
     * @param versionedGroup the versioned group whose components are updated in place
     * @param processGroupId the id of the process group supplying the candidate controller services
     * @param externalControllerServiceReferences external service references keyed by the id used in property values; may be null
     * @param user the user whose READ access limits which controller services are considered
     */
    private void resolveInheritedControllerServices(final VersionedProcessGroup versionedGroup, final String processGroupId,
                                                    final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
                                                    final NiFiUser user) {
        final ProcessGroup parentGroup = processGroupDAO.getProcessGroup(processGroupId);
        final Set<ControllerServiceNode> serviceNodes = parentGroup.getControllerServices(true).stream()
            .filter(service -> service.isAuthorized(authorizer, RequestAction.READ, user))
            .collect(Collectors.toSet());

        resolveInheritedControllerServicesInGroup(versionedGroup, serviceNodes, externalControllerServiceReferences, controllerFacade.getExtensionManager());
    }

    /**
     * Recursive worker: resolves references for one versioned group using the already computed
     * candidate services, then descends into the child groups with the same candidates.
     */
    private void resolveInheritedControllerServicesInGroup(final VersionedProcessGroup versionedGroup,
                                                           final Set<ControllerServiceNode> serviceNodes,
                                                           final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
                                                           final ExtensionManager extensionManager) {
        // the ids considered "already available" are those defined in this versioned group and its descendants
        final Set<String> availableControllerServiceIds = findAllControllerServiceIds(versionedGroup);

        for (final VersionedProcessor processor : versionedGroup.getProcessors()) {
            final BundleCoordinate compatibleBundle = BundleUtils.discoverCompatibleBundle(extensionManager, processor.getType(), processor.getBundle());
            final ConfigurableComponent tempComponent = extensionManager.getTempComponent(processor.getType(), compatibleBundle);

            resolveInheritedControllerServices(processor, availableControllerServiceIds, serviceNodes, externalControllerServiceReferences, tempComponent::getPropertyDescriptor);
        }

        for (final VersionedControllerService service : versionedGroup.getControllerServices()) {
            final BundleCoordinate compatibleBundle = BundleUtils.discoverCompatibleBundle(extensionManager, service.getType(), service.getBundle());
            final ConfigurableComponent tempComponent = extensionManager.getTempComponent(service.getType(), compatibleBundle);

            resolveInheritedControllerServices(service, availableControllerServiceIds, serviceNodes, externalControllerServiceReferences, tempComponent::getPropertyDescriptor);
        }

        for (final VersionedProcessGroup child : versionedGroup.getProcessGroups()) {
            resolveInheritedControllerServicesInGroup(child, serviceNodes, externalControllerServiceReferences, extensionManager);
        }
    }


    /**
     * Resolves external controller service references for a single versioned component by handing
     * its property descriptors and property values to the map based overload.
     *
     * @param component the versioned component whose properties may be updated
     * @param availableControllerServiceIds ids of controller services that need no resolution
     * @param availableControllerServices candidate controller services to resolve against
     * @param externalControllerServiceReferences external service references keyed by the id used in property values
     * @param descriptorLookup maps a property name to the property descriptor of the component
     */
    private void resolveInheritedControllerServices(final VersionedConfigurableComponent component, final Set<String> availableControllerServiceIds,
                                                    final Set<ControllerServiceNode> availableControllerServices,
                                                    final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
                                                    final Function<String, PropertyDescriptor> descriptorLookup) {
        resolveInheritedControllerServices(component.getPropertyDescriptors(), component.getProperties(),
                availableControllerServiceIds, availableControllerServices, externalControllerServiceReferences, descriptorLookup);
    }


    /**
     * Rewrites property values that point at an external controller service so that they point at a
     * matching controller service from {@code availableControllerServices}.
     *
     * A property is rewritten only when all of the following hold: it has a versioned descriptor that
     * identifies a controller service; its value is not one of {@code availableControllerServiceIds};
     * its value is a key of {@code externalControllerServiceReferences}; the descriptor lookup yields a
     * descriptor with a controller service definition; and exactly one available service has the
     * external reference's name and a type assignable to that definition. The new value is the matching
     * service's versioned component id when present, otherwise its identifier.
     *
     * @param propertyDescriptors versioned descriptors keyed by property name
     * @param componentProperties property values keyed by property name; updated in place
     * @param availableControllerServiceIds ids of controller services that need no resolution
     * @param availableControllerServices candidate controller services to resolve against
     * @param externalControllerServiceReferences external service references keyed by referenced id; may be null
     * @param descriptorLookup maps a property name to the property descriptor of the component
     */
    private void resolveInheritedControllerServices(final Map<String, VersionedPropertyDescriptor> propertyDescriptors, final Map<String, String> componentProperties,
                                                    final Set<String> availableControllerServiceIds, final Set<ControllerServiceNode> availableControllerServices,
                                                    final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
                                                    final Function<String, PropertyDescriptor> descriptorLookup) {

        // iterate over a copy because componentProperties is written to inside the loop
        final Map<String, String> propertySnapshot = new HashMap<>(componentProperties);
        for (final Map.Entry<String, String> property : propertySnapshot.entrySet()) {
            final String name = property.getKey();
            final String referencedId = property.getValue();

            // only properties that reference a controller service are of interest
            final VersionedPropertyDescriptor versionedDescriptor = propertyDescriptors.get(name);
            if (versionedDescriptor == null || !versionedDescriptor.getIdentifiesControllerService()) {
                continue;
            }

            // a service that is already part of the flow needs no resolution
            if (availableControllerServiceIds.contains(referencedId)) {
                continue;
            }

            final ExternalControllerServiceReference externalReference;
            if (externalControllerServiceReferences == null) {
                externalReference = null;
            } else {
                externalReference = externalControllerServiceReferences.get(referencedId);
            }
            if (externalReference == null) {
                continue;
            }

            final PropertyDescriptor componentDescriptor = descriptorLookup.apply(name);
            if (componentDescriptor == null) {
                continue;
            }

            final Class<? extends ControllerService> requiredServiceType = componentDescriptor.getControllerServiceDefinition();
            if (requiredServiceType == null) {
                continue;
            }

            // collect the available services with the expected name and a compatible type
            final String externalName = externalReference.getName();
            final List<ControllerServiceNode> candidates = new ArrayList<>();
            for (final ControllerServiceNode candidate : availableControllerServices) {
                if (candidate.getName().equals(externalName)
                        && requiredServiceType.isAssignableFrom(candidate.getProxiedControllerService().getClass())) {
                    candidates.add(candidate);
                }
            }

            // zero matches or an ambiguous match leaves the property untouched
            if (candidates.size() != 1) {
                continue;
            }

            final ControllerServiceNode match = candidates.get(0);
            final Optional<String> versionedId = match.getVersionedComponentId();
            final String replacementId = versionedId.isPresent() ? versionedId.get() : match.getIdentifier();

            componentProperties.put(name, replacementId);
        }
    }

    /**
     * Collects the identifiers of all controller services defined in the given versioned group
     * and in any of its descendant groups.
     *
     * @param group the versioned group to start from
     * @return a new set holding the collected identifiers
     */
    private Set<String> findAllControllerServiceIds(final VersionedProcessGroup group) {
        final Set<String> collectedIds = new HashSet<>();
        findAllControllerServiceIds(group, collectedIds);
        return collectedIds;
    }

    /**
     * Adds the identifiers of the controller services of the given group to {@code ids} and then
     * recurses into each child group.
     *
     * @param group the versioned group to inspect
     * @param ids the accumulator receiving the identifiers
     */
    private void findAllControllerServiceIds(final VersionedProcessGroup group, final Set<String> ids) {
        group.getControllerServices().forEach(service -> ids.add(service.getIdentifier()));
        group.getProcessGroups().forEach(childGroup -> findAllControllerServiceIds(childGroup, ids));
    }

    /**
     * Delegates to {@code BundleUtils.getCompatibleBundle} for the given type and bundle, using the
     * extension manager supplied by the controller facade.
     *
     * @param type the component type passed to the bundle lookup
     * @param bundleDTO the bundle passed to the bundle lookup
     * @return the bundle coordinate returned by {@code BundleUtils.getCompatibleBundle}
     */
    @Override
    public BundleCoordinate getCompatibleBundle(String type, BundleDTO bundleDTO) {
        final ExtensionManager extensionManager = controllerFacade.getExtensionManager();
        return BundleUtils.getCompatibleBundle(extensionManager, type, bundleDTO);
    }

    /**
     * Returns the temp component that the controller facade's extension manager provides for the
     * given class type and bundle coordinate.
     *
     * @param classType the class type passed to the extension manager
     * @param bundleCoordinate the bundle coordinate passed to the extension manager
     * @return the component returned by the extension manager
     */
    @Override
    public ConfigurableComponent getTempComponent(String classType, BundleCoordinate bundleCoordinate) {
        final ExtensionManager extensionManager = controllerFacade.getExtensionManager();
        return extensionManager.getTempComponent(classType, bundleCoordinate);
    }

    /**
     * Checks whether the given user is permitted to access the given port. The check is made
     * directly against the port's data transfer resource instead of going through the
     * DataTransferAuthorizable, because the latter enforces that the entire chain is authorized
     * for the transfer. This method is only used while gathering the site to site details, where
     * authorizing the entire chain isn't necessary.
     *
     * @param user the user to authorize
     * @param port the port whose data transfer resource is checked
     * @return {@code true} when site to site is not secure or the authorizer approves the request
     */
    private boolean isUserAuthorized(final NiFiUser user, final Port port) {
        // without secure site to site every user is allowed
        if (!Boolean.TRUE.equals(properties.isSiteToSiteSecure())) {
            return true;
        }

        // pass the client address along as user context only when one is known
        Map<String, String> userContext = null;
        final String clientAddress = user.getClientAddress();
        if (clientAddress != null && !clientAddress.trim().isEmpty()) {
            userContext = new HashMap<>();
            userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), clientAddress);
        }

        final Resource dataTransferResource = ResourceFactory.getDataTransferResource(port.getResource());
        final AuthorizationRequest authorizationRequest = new AuthorizationRequest.Builder()
                .resource(dataTransferResource)
                .identity(user.getIdentity())
                .groups(user.getAllGroups())
                .anonymous(user.isAnonymous())
                .accessAttempt(false)
                .action(RequestAction.WRITE)
                .userContext(userContext)
                .explanationSupplier(() -> "Unable to retrieve port details.")
                .build();

        final AuthorizationResult outcome = authorizer.authorize(authorizationRequest);
        return outcome.getResult() == Result.Approved;
    }

    /**
     * Builds the site to site details of this instance: the public input and output ports the
     * current user is authorized for, the component counts of the root group, and the site to site
     * listening configuration reported by the controller facade.
     *
     * @return the populated controller DTO
     * @throws WebApplicationException if no current user is available
     */
    @Override
    public ControllerDTO getSiteToSiteDetails() {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        if (currentUser == null) {
            throw new WebApplicationException(new Throwable("Unable to access details for current user."));
        }

        // only ports the user is authorized for are exposed
        final Set<PortDTO> inputPortDtos = createAuthorizedSiteToSitePortDtos(currentUser, controllerFacade.getPublicInputPorts());
        final Set<PortDTO> outputPortDtos = createAuthorizedSiteToSitePortDtos(currentUser, controllerFacade.getPublicOutputPorts());

        // component counts come from the root group
        final ProcessGroup rootGroup = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId());
        final ProcessGroupCounts rootCounts = rootGroup.getCounts();

        final ControllerDTO details = new ControllerDTO();
        details.setId(controllerFacade.getRootGroupId());
        details.setInstanceId(controllerFacade.getInstanceId());
        details.setName(controllerFacade.getName());
        details.setComments(controllerFacade.getComments());
        details.setInputPorts(inputPortDtos);
        details.setOutputPorts(outputPortDtos);
        details.setInputPortCount(inputPortDtos.size());
        details.setOutputPortCount(outputPortDtos.size());
        details.setRunningCount(rootCounts.getRunningCount());
        details.setStoppedCount(rootCounts.getStoppedCount());
        details.setInvalidCount(rootCounts.getInvalidCount());
        details.setDisabledCount(rootCounts.getDisabledCount());

        // site to site configuration
        details.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
        details.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
        details.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());

        return details;
    }

    /**
     * Creates a DTO (id, name, comments and scheduled state) for each of the given ports that the
     * user is authorized for, keeping the iteration order of the ports.
     */
    private Set<PortDTO> createAuthorizedSiteToSitePortDtos(final NiFiUser user, final Iterable<? extends Port> ports) {
        final Set<PortDTO> portDtos = new LinkedHashSet<>();
        for (final Port port : ports) {
            if (!isUserAuthorized(user, port)) {
                continue;
            }

            final PortDTO portDto = new PortDTO();
            portDto.setId(port.getIdentifier());
            portDto.setName(port.getName());
            portDto.setComments(port.getComments());
            portDto.setState(port.getScheduledState().toString());
            portDtos.add(portDto);
        }
        return portDtos;
    }

    /**
     * Builds the controller configuration entity from the controller facade: its configuration DTO,
     * its permissions, and the revision tracked under the simple name of {@code FlowController}.
     */
    @Override
    public ControllerConfigurationEntity getControllerConfiguration() {
        final String revisionKey = FlowController.class.getSimpleName();
        final Revision controllerRevision = revisionManager.getRevision(revisionKey);
        final ControllerConfigurationDTO configuration = dtoFactory.createControllerConfigurationDto(controllerFacade);
        final PermissionsDTO controllerPermissions = dtoFactory.createPermissionsDto(controllerFacade);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(controllerRevision);
        return entityFactory.createControllerConfigurationEntity(configuration, revisionDto, controllerPermissions);
    }

    /**
     * Gathers the bulletins for the controller, for controller services and for reporting tasks.
     * The controller service and reporting task bulletins are exposed in their own lists, and all
     * three groups are additionally merged, sorted and pruned to
     * {@code BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER} entries for the combined list.
     * Bulletins whose controller service or reporting task can no longer be found are skipped.
     */
    @Override
    public ControllerBulletinsEntity getControllerBulletins() {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        final ControllerBulletinsEntity response = new ControllerBulletinsEntity();

        // everything gathered below also lands in this list, which backs the combined bulletins
        final List<BulletinEntity> combined = new ArrayList<>();

        // controller bulletins all share one READ check against the controller
        final Authorizable controller = authorizableLookup.getController();
        final boolean controllerReadable = controller.isAuthorized(authorizer, RequestAction.READ, currentUser);
        for (final BulletinDTO controllerBulletin : dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController())) {
            combined.add(entityFactory.createBulletinEntity(controllerBulletin, controllerReadable));
        }

        // controller service bulletins are authorized per source service
        final BulletinQuery serviceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
        final List<BulletinEntity> serviceBulletins = new ArrayList<>();
        for (final Bulletin bulletin : bulletinRepository.findBulletins(serviceQuery)) {
            try {
                final Authorizable service = authorizableLookup.getControllerService(bulletin.getSourceId()).getAuthorizable();
                final boolean serviceReadable = service.isAuthorized(authorizer, RequestAction.READ, currentUser);

                final BulletinEntity entity = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), serviceReadable);
                serviceBulletins.add(entity);
                combined.add(entity);
            } catch (final ResourceNotFoundException notFound) {
                // the controller service no longer exists; leave its bulletin out
            }
        }
        response.setControllerServiceBulletins(serviceBulletins);

        // reporting task bulletins are authorized per source task
        final BulletinQuery taskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
        final List<BulletinEntity> taskBulletins = new ArrayList<>();
        for (final Bulletin bulletin : bulletinRepository.findBulletins(taskQuery)) {
            try {
                final Authorizable task = authorizableLookup.getReportingTask(bulletin.getSourceId()).getAuthorizable();
                final boolean taskReadable = task.isAuthorized(authorizer, RequestAction.READ, currentUser);

                final BulletinEntity entity = entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), taskReadable);
                taskBulletins.add(entity);
                combined.add(entity);
            } catch (final ResourceNotFoundException notFound) {
                // the reporting task no longer exists; leave its bulletin out
            }
        }
        response.setReportingTaskBulletins(taskBulletins);

        response.setBulletins(pruneAndSortBulletins(combined, BulletinRepository.MAX_BULLETINS_FOR_CONTROLLER));
        return response;
    }

    /**
     * Builds the flow configuration entity from the auto refresh interval and the default back
     * pressure object and data size thresholds held in the NiFi properties.
     */
    @Override
    public FlowConfigurationEntity getFlowConfiguration() {
        final FlowConfigurationDTO flowConfiguration = dtoFactory.createFlowConfigurationDto(
                properties.getAutoRefreshInterval(),
                properties.getDefaultBackPressureObjectThreshold(),
                properties.getDefaultBackPressureDataSizeThreshold());

        final FlowConfigurationEntity response = new FlowConfigurationEntity();
        response.setFlowConfiguration(flowConfiguration);
        return response;
    }

    /**
     * Looks up the access policy with the given id and returns it as an entity.
     *
     * @param accessPolicyId the id of the access policy
     * @return the entity created for the access policy
     */
    @Override
    public AccessPolicyEntity getAccessPolicy(final String accessPolicyId) {
        return createAccessPolicyEntity(accessPolicyDAO.getAccessPolicy(accessPolicyId));
    }

    /**
     * Looks up the access policy for the given action and resource and returns it as an entity.
     * When no authorizable can be found for the resource, a stand-in authorizable that only
     * carries the resource string is used for the policy lookup instead.
     *
     * @param requestAction the action of the policy
     * @param resource the resource of the policy
     * @return the entity created for the access policy
     */
    @Override
    public AccessPolicyEntity getAccessPolicy(final RequestAction requestAction, final String resource) {
        Authorizable target;
        try {
            target = authorizableLookup.getAuthorizableFromResource(resource);
        } catch (final ResourceNotFoundException notFound) {
            // unable to find the underlying authorizable... user authorized based on top level /policies...
            // fall back to a stand-in to attempt to locate an existing policy for this resource
            target = createUnresolvedResourceAuthorizable(resource);
        }

        return createAccessPolicyEntity(accessPolicyDAO.getAccessPolicy(requestAction, target));
    }

    /**
     * Creates an authorizable without a parent whose resource uses the given string as identifier
     * and name, and "Policy " followed by the string as safe description.
     */
    private Authorizable createUnresolvedResourceAuthorizable(final String resource) {
        return new Authorizable() {
            @Override
            public Authorizable getParentAuthorizable() {
                return null;
            }

            @Override
            public Resource getResource() {
                return new Resource() {
                    @Override
                    public String getIdentifier() {
                        return resource;
                    }

                    @Override
                    public String getName() {
                        return resource;
                    }

                    @Override
                    public String getSafeDescription() {
                        return "Policy " + resource;
                    }
                };
            }
        };
    }

    /**
     * Creates the entity for an access policy, including its revision, the permissions on the
     * policy, a reference to the component behind the policy's resource, and tenant entities for
     * the policy's user groups and users.
     *
     * @param accessPolicy the access policy to convert
     * @return the entity for the access policy
     */
    private AccessPolicyEntity createAccessPolicyEntity(final AccessPolicy accessPolicy) {
        final String policyId = accessPolicy.getIdentifier();
        final RevisionDTO policyRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(policyId));
        final PermissionsDTO policyPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getAccessPolicyById(policyId));
        final ComponentReferenceEntity referencedComponent = createComponentReferenceEntity(accessPolicy.getResource());

        // tenants are mapped without enforcing that they still exist
        final Set<TenantEntity> groupTenants = accessPolicy.getGroups().stream()
                .map(mapUserGroupIdToTenantEntity(false))
                .collect(Collectors.toSet());
        final Set<TenantEntity> userTenants = accessPolicy.getUsers().stream()
                .map(mapUserIdToTenantEntity(false))
                .collect(Collectors.toSet());

        return entityFactory.createAccessPolicyEntity(
                dtoFactory.createAccessPolicyDto(accessPolicy, groupTenants, userTenants, referencedComponent),
                policyRevision, policyPermissions);
    }

    /**
     * Looks up the user with the given id and returns it as an entity, enforcing existence when
     * mapping the user's groups.
     *
     * @param userId the id of the user
     * @return the entity created for the user
     */
    @Override
    public UserEntity getUser(final String userId) {
        return createUserEntity(userDAO.getUser(userId), true);
    }

    /**
     * Returns an entity for every user known to the user DAO, without enforcing existence when
     * mapping each user's groups.
     */
    @Override
    public Set<UserEntity> getUsers() {
        final Set<UserEntity> userEntities = new HashSet<>();
        for (final User user : userDAO.getUsers()) {
            userEntities.add(createUserEntity(user, false));
        }
        return userEntities;
    }

    /**
     * Creates the entity for a user, including the user's revision, the permissions on the tenant
     * authorizable, the groups the user belongs to and the access policies that apply to the user.
     *
     * @param user the user to convert
     * @param enforceUserExistence flag handed to the user group id to tenant entity mapping
     * @return the entity for the user
     */
    private UserEntity createUserEntity(final User user, final boolean enforceUserExistence) {
        final String userId = user.getIdentifier();
        final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(userId));
        final PermissionsDTO tenantPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());

        final Set<TenantEntity> memberships = userGroupDAO.getUserGroupsForUser(userId).stream()
                .map(group -> group.getIdentifier())
                .map(mapUserGroupIdToTenantEntity(enforceUserExistence))
                .collect(Collectors.toSet());

        final Set<AccessPolicySummaryEntity> policies = userGroupDAO.getAccessPoliciesForUser(userId).stream()
                .map(policy -> createAccessPolicySummaryEntity(policy))
                .collect(Collectors.toSet());

        return entityFactory.createUserEntity(dtoFactory.createUserDto(user, memberships, policies), revision, tenantPermissions);
    }

    /**
     * Creates the entity for a user group, including the group's revision, the permissions on the
     * tenant authorizable, the users in the group and the access policies that apply to the group.
     *
     * @param userGroup the user group to convert
     * @param enforceGroupExistence flag handed to the user id to tenant entity mapping
     * @return the entity for the user group
     */
    private UserGroupEntity createUserGroupEntity(final Group userGroup, final boolean enforceGroupExistence) {
        final String groupId = userGroup.getIdentifier();
        final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId));
        final PermissionsDTO tenantPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getTenant());

        final Set<TenantEntity> members = userGroup.getUsers().stream()
                .map(mapUserIdToTenantEntity(enforceGroupExistence))
                .collect(Collectors.toSet());

        final Set<AccessPolicySummaryEntity> policies = userGroupDAO.getAccessPoliciesForUserGroup(groupId).stream()
                .map(policy -> createAccessPolicySummaryEntity(policy))
                .collect(Collectors.toSet());

        return entityFactory.createUserGroupEntity(dtoFactory.createUserGroupDto(userGroup, members, policies), revision, tenantPermissions);
    }

    /**
     * Looks up the user group with the given id and returns it as an entity, enforcing existence
     * when mapping the group's users.
     *
     * @param userGroupId the id of the user group
     * @return the entity created for the user group
     */
    @Override
    public UserGroupEntity getUserGroup(final String userGroupId) {
        return createUserGroupEntity(userGroupDAO.getUserGroup(userGroupId), true);
    }

    /**
     * Returns an entity for every user group known to the user group DAO, without enforcing
     * existence when mapping each group's users.
     */
    @Override
    public Set<UserGroupEntity> getUserGroups() {
        final Set<UserGroupEntity> groupEntities = new HashSet<>();
        for (final Group group : userGroupDAO.getUserGroups()) {
            groupEntities.add(createUserGroupEntity(group, false));
        }
        return groupEntities;
    }

    /**
     * Creates the entity for a label from its DTO, its revision and the permissions on the label.
     *
     * @param label the label to convert
     * @return the entity for the label
     */
    private LabelEntity createLabelEntity(final Label label) {
        final RevisionDTO labelRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(label.getIdentifier()));
        final PermissionsDTO labelPermissions = dtoFactory.createPermissionsDto(label);
        return entityFactory.createLabelEntity(dtoFactory.createLabelDto(label), labelRevision, labelPermissions);
    }

    /**
     * Returns an entity for every label the label DAO reports for the given group.
     *
     * @param groupId the id of the group
     * @return the label entities
     */
    @Override
    public Set<LabelEntity> getLabels(final String groupId) {
        final Set<LabelEntity> labelEntities = new HashSet<>();
        for (final Label label : labelDAO.getLabels(groupId)) {
            labelEntities.add(createLabelEntity(label));
        }
        return labelEntities;
    }

    /**
     * Looks up the label with the given id and returns it as an entity.
     *
     * @param labelId the id of the label
     * @return the entity created for the label
     */
    @Override
    public LabelEntity getLabel(final String labelId) {
        return createLabelEntity(labelDAO.getLabel(labelId));
    }

    /**
     * Creates the entity for a funnel from its DTO, its revision and the permissions on the funnel.
     *
     * @param funnel the funnel to convert
     * @return the entity for the funnel
     */
    private FunnelEntity createFunnelEntity(final Funnel funnel) {
        final RevisionDTO funnelRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(funnel.getIdentifier()));
        final PermissionsDTO funnelPermissions = dtoFactory.createPermissionsDto(funnel);
        return entityFactory.createFunnelEntity(dtoFactory.createFunnelDto(funnel), funnelRevision, funnelPermissions);
    }

    /**
     * Returns an entity for every funnel the funnel DAO reports for the given group.
     *
     * @param groupId the id of the group
     * @return the funnel entities
     */
    @Override
    public Set<FunnelEntity> getFunnels(final String groupId) {
        final Set<FunnelEntity> funnelEntities = new HashSet<>();
        for (final Funnel funnel : funnelDAO.getFunnels(groupId)) {
            funnelEntities.add(createFunnelEntity(funnel));
        }
        return funnelEntities;
    }

    /**
     * Looks up the funnel with the given id and returns it as an entity.
     *
     * @param funnelId the id of the funnel
     * @return the entity created for the funnel
     */
    @Override
    public FunnelEntity getFunnel(final String funnelId) {
        return createFunnelEntity(funnelDAO.getFunnel(funnelId));
    }

    /**
     * Creates the entity for an input port: its DTO, revision, the current user's permissions on
     * the port and on operating the port, the input port status, and the bulletins found for the
     * port. Each bulletin entity is created with the port's read permission.
     *
     * @param port the input port to convert
     * @return the entity for the input port
     */
    private PortEntity createInputPortEntity(final Port port) {
        final String portId = port.getIdentifier();
        final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(portId));

        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, currentUser);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), currentUser);

        final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(portId));

        final List<BulletinEntity> bulletinEntities = new ArrayList<>();
        for (final BulletinDTO bulletin : dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(portId))) {
            bulletinEntities.add(entityFactory.createBulletinEntity(bulletin, permissions.getCanRead()));
        }

        return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities);
    }

    /**
     * Creates the entity for an output port: its DTO, revision, the current user's permissions on
     * the port and on operating the port, the output port status, and the bulletins found for the
     * port. Each bulletin entity is created with the port's read permission.
     *
     * @param port the output port to convert
     * @return the entity for the output port
     */
    private PortEntity createOutputPortEntity(final Port port) {
        final String portId = port.getIdentifier();
        final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(portId));

        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, currentUser);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(port), currentUser);

        final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(portId));

        final List<BulletinEntity> bulletinEntities = new ArrayList<>();
        for (final BulletinDTO bulletin : dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(portId))) {
            bulletinEntities.add(entityFactory.createBulletinEntity(bulletin, permissions.getCanRead()));
        }

        return entityFactory.createPortEntity(dtoFactory.createPortDto(port), revision, permissions, operatePermissions, status, bulletinEntities);
    }

    /**
     * Returns an entity for every input port the input port DAO reports for the given group.
     *
     * @param groupId the id of the group
     * @return the input port entities
     */
    @Override
    public Set<PortEntity> getInputPorts(final String groupId) {
        final Set<PortEntity> portEntities = new HashSet<>();
        for (final Port inputPort : inputPortDAO.getPorts(groupId)) {
            portEntities.add(createInputPortEntity(inputPort));
        }
        return portEntities;
    }

    /**
     * Returns an entity for every output port the output port DAO reports for the given group.
     *
     * @param groupId the id of the group
     * @return the output port entities
     */
    @Override
    public Set<PortEntity> getOutputPorts(final String groupId) {
        final Set<PortEntity> portEntities = new HashSet<>();
        for (final Port outputPort : outputPortDAO.getPorts(groupId)) {
            portEntities.add(createOutputPortEntity(outputPort));
        }
        return portEntities;
    }

    /**
     * Creates the entity for a process group: its DTO, revision, permissions, concise status and
     * the bulletins gathered for the group.
     *
     * @param group the process group to convert
     * @return the entity for the process group
     */
    private ProcessGroupEntity createProcessGroupEntity(final ProcessGroup group) {
        final String groupId = group.getIdentifier();
        final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId));
        final PermissionsDTO groupPermissions = dtoFactory.createPermissionsDto(group);
        final ProcessGroupStatusDTO conciseStatus = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(groupId));
        final List<BulletinEntity> groupBulletins = getProcessGroupBulletins(group);
        return entityFactory.createProcessGroupEntity(dtoFactory.createProcessGroupDto(group), groupRevision, groupPermissions, conciseStatus, groupBulletins);
    }

    /**
     * Gathers the bulletins of the given group and of every group returned by
     * {@code findAllProcessGroups()}, authorizes each one for the current user, and returns them
     * sorted and pruned to {@code BulletinRepository.MAX_BULLETINS_PER_COMPONENT} entries.
     *
     * @param group the process group to gather bulletins for
     * @return the sorted and pruned bulletin entities
     */
    private List<BulletinEntity> getProcessGroupBulletins(final ProcessGroup group) {
        // start with the group's own bulletins, then append those of each descendant group
        final List<Bulletin> gathered = new ArrayList<>();
        gathered.addAll(bulletinRepository.findBulletinsForGroupBySource(group.getIdentifier()));
        for (final ProcessGroup descendant : group.findAllProcessGroups()) {
            gathered.addAll(bulletinRepository.findBulletinsForGroupBySource(descendant.getIdentifier()));
        }

        // the resulting list must be mutable because it is sorted in place below
        final List<BulletinEntity> entities = gathered.stream()
                .map(bulletin -> entityFactory.createBulletinEntity(dtoFactory.createBulletinDto(bulletin), authorizeBulletin(bulletin)))
                .collect(Collectors.toCollection(ArrayList::new));

        return pruneAndSortBulletins(entities, BulletinRepository.MAX_BULLETINS_PER_COMPONENT);
    }

    /**
     * Sorts the given list in place by descending bulletin id, with null entries last, and limits
     * the result to at most {@code maxBulletins} entries.
     *
     * @param bulletinEntities the bulletin entities to sort; this list is modified
     * @param maxBulletins the maximum number of entries to return
     * @return the given list when it holds no more than {@code maxBulletins} entries, otherwise a
     *         sub list view of its first {@code maxBulletins} entries
     */
    private List<BulletinEntity> pruneAndSortBulletins(final List<BulletinEntity> bulletinEntities, final int maxBulletins) {
        bulletinEntities.sort((left, right) -> {
            // null entries sort after everything else
            if (left == null) {
                return right == null ? 0 : 1;
            }
            if (right == null) {
                return -1;
            }

            // larger ids first
            final long leftId = left.getId();
            final long rightId = right.getId();
            return Long.compare(rightId, leftId);
        });

        // keep only the leading entries when there are too many
        return bulletinEntities.size() > maxBulletins
                ? bulletinEntities.subList(0, maxBulletins)
                : bulletinEntities;
    }

    /**
     * Returns an entity for every process group the process group DAO reports for the given parent.
     *
     * @param parentGroupId the id of the parent group
     * @return the process group entities
     */
    @Override
    public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId) {
        final Set<ProcessGroupEntity> groupEntities = new HashSet<>();
        for (final ProcessGroup childGroup : processGroupDAO.getProcessGroups(parentGroupId)) {
            groupEntities.add(createProcessGroupEntity(childGroup));
        }
        return groupEntities;
    }

    /**
     * Assembles the entity for a remote process group: its revision, the given user's
     * permissions and operate permissions, its status, and its bulletins. Each bulletin entity
     * is created with the user's read permission on the remote process group.
     *
     * @param rpg the remote process group to describe
     * @param user the user whose permissions are evaluated
     * @return the populated remote process group entity
     */
    private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg, final NiFiUser user) {
        final String rpgId = rpg.getIdentifier();

        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(rpgId));
        final PermissionsDTO componentPermissions = dtoFactory.createPermissionsDto(rpg, user);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(rpg), user);
        final RemoteProcessGroupStatusDTO statusDto = dtoFactory.createRemoteProcessGroupStatusDto(rpg, controllerFacade.getRemoteProcessGroupStatus(rpgId));

        final List<BulletinEntity> bulletinEntities = new ArrayList<>();
        for (final BulletinDTO bulletinDto : dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpgId))) {
            bulletinEntities.add(entityFactory.createBulletinEntity(bulletinDto, componentPermissions.getCanRead()));
        }

        return entityFactory.createRemoteProcessGroupEntity(
            dtoFactory.createRemoteProcessGroupDto(rpg), revisionDto, componentPermissions, operatePermissions, statusDto, bulletinEntities);
    }

    /**
     * Builds an entity, from the current user's point of view, for every remote process group
     * the DAO returns for the given group.
     *
     * @param groupId the id of the containing process group
     * @return the entities of the remote process groups found
     */
    @Override
    public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId) {
        // resolve the user once and reuse it for every entity
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();

        final Set<RemoteProcessGroupEntity> entities = new HashSet<>();
        for (final RemoteProcessGroup remoteGroup : remoteProcessGroupDAO.getRemoteProcessGroups(groupId)) {
            entities.add(createRemoteGroupEntity(remoteGroup, currentUser));
        }
        return entities;
    }

    /**
     * Looks up the input port with the given id and returns its entity.
     *
     * @param inputPortId the id of the input port
     * @return the entity describing that input port
     */
    @Override
    public PortEntity getInputPort(final String inputPortId) {
        return createInputPortEntity(inputPortDAO.getPort(inputPortId));
    }

    /**
     * Returns the status of the given input port together with the permissions computed for
     * that port.
     *
     * @param inputPortId the id of the input port
     * @return the port status entity
     */
    @Override
    public PortStatusEntity getInputPortStatus(final String inputPortId) {
        final Port port = inputPortDAO.getPort(inputPortId);
        final PermissionsDTO portPermissions = dtoFactory.createPermissionsDto(port);
        return entityFactory.createPortStatusEntity(
            dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(inputPortId)),
            portPermissions);
    }

    /**
     * Looks up the output port with the given id and returns its entity.
     *
     * @param outputPortId the id of the output port
     * @return the entity describing that output port
     */
    @Override
    public PortEntity getOutputPort(final String outputPortId) {
        return createOutputPortEntity(outputPortDAO.getPort(outputPortId));
    }

    /**
     * Returns the status of the given output port together with the permissions computed for
     * that port.
     *
     * @param outputPortId the id of the output port
     * @return the port status entity
     */
    @Override
    public PortStatusEntity getOutputPortStatus(final String outputPortId) {
        final Port port = outputPortDAO.getPort(outputPortId);
        final PermissionsDTO portPermissions = dtoFactory.createPermissionsDto(port);
        return entityFactory.createPortStatusEntity(
            dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(outputPortId)),
            portPermissions);
    }

    /**
     * Looks up the remote process group with the given id and returns its entity as seen by
     * the current user.
     *
     * @param remoteProcessGroupId the id of the remote process group
     * @return the entity describing that remote process group
     */
    @Override
    public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId) {
        final RemoteProcessGroup remoteGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        return createRemoteGroupEntity(remoteGroup, currentUser);
    }

    /**
     * Returns the status of the given remote process group together with the permissions
     * computed for it.
     *
     * @param id the id of the remote process group
     * @return the remote process group status entity
     */
    @Override
    public RemoteProcessGroupStatusEntity getRemoteProcessGroupStatus(final String id) {
        final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(id);
        final PermissionsDTO rpgPermissions = dtoFactory.createPermissionsDto(rpg);
        return entityFactory.createRemoteProcessGroupStatusEntity(
            dtoFactory.createRemoteProcessGroupStatusDto(rpg, controllerFacade.getRemoteProcessGroupStatus(id)),
            rpgPermissions);
    }

    /**
     * Returns the status history of the given remote process group together with the
     * permissions computed for it.
     *
     * @param id the id of the remote process group
     * @return the status history entity
     */
    @Override
    public StatusHistoryEntity getRemoteProcessGroupStatusHistory(final String id) {
        final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(id);
        final PermissionsDTO rpgPermissions = dtoFactory.createPermissionsDto(rpg);
        return entityFactory.createStatusHistoryEntity(controllerFacade.getRemoteProcessGroupStatusHistory(id), rpgPermissions);
    }

    /**
     * Describes the current user: identity, anonymous flag, and the permissions computed for
     * each of the global resources exposed by the authorizable lookup. The entity also carries
     * one permission entry per {@link RequiredPermission} value, and {@code canVersionFlows} is
     * set when the flow registry client reports at least one registry identifier.
     *
     * @return the entity describing the current user
     */
    @Override
    public CurrentUserEntity getCurrentUser() {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();

        final CurrentUserEntity userEntity = new CurrentUserEntity();
        userEntity.setIdentity(currentUser.getIdentity());
        userEntity.setAnonymous(currentUser.isAnonymous());

        // permissions on the global resources
        userEntity.setProvenancePermissions(dtoFactory.createPermissionsDto(authorizableLookup.getProvenance()));
        userEntity.setCountersPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getCounters()));
        userEntity.setTenantsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
        userEntity.setControllerPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getController()));
        userEntity.setPoliciesPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getPolicies()));
        userEntity.setSystemPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getSystem()));
        userEntity.setParameterContextPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getParameterContexts()));
        userEntity.setCanVersionFlows(CollectionUtils.isNotEmpty(flowRegistryClient.getRegistryIdentifiers()));

        userEntity.setRestrictedComponentsPermissions(dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents()));

        // one entry per individual restriction
        final Set<ComponentRestrictionPermissionDTO> restrictionEntries = new HashSet<>();
        for (final RequiredPermission requiredPermission : RequiredPermission.values()) {
            final PermissionsDTO restrictionPermissions = dtoFactory.createPermissionsDto(authorizableLookup.getRestrictedComponents(requiredPermission));

            final RequiredPermissionDTO requiredPermissionDto = new RequiredPermissionDTO();
            requiredPermissionDto.setId(requiredPermission.getPermissionIdentifier());
            requiredPermissionDto.setLabel(requiredPermission.getPermissionLabel());

            final ComponentRestrictionPermissionDTO restrictionEntry = new ComponentRestrictionPermissionDTO();
            restrictionEntry.setRequiredPermission(requiredPermissionDto);
            restrictionEntry.setPermissions(restrictionPermissions);

            restrictionEntries.add(restrictionEntry);
        }
        userEntity.setComponentRestrictionPermissions(restrictionEntries);

        return userEntity;
    }

    /**
     * Returns the flow of the given process group together with the permissions computed for
     * that group.
     *
     * @param groupId the id of the process group
     * @return the process group flow entity
     */
    @Override
    public ProcessGroupFlowEntity getProcessGroupFlow(final String groupId) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);

        // A status depth of one is enough here. For a child process group the individual component
        // statuses are ignored: if Group A contains Group B and Group B contains a Processor, that
        // Processor's stats are not part of the ProcessGroupFlowEntity, so there is no point in
        // asking for them in the returned status.
        final ProcessGroupStatus shallowStatus = controllerFacade.getProcessGroupStatus(groupId, 1);
        final PermissionsDTO groupPermissions = dtoFactory.createPermissionsDto(group);

        return entityFactory.createProcessGroupFlowEntity(
            dtoFactory.createProcessGroupFlowDto(group, shallowStatus, revisionManager, this::getProcessGroupBulletins),
            groupPermissions);
    }

    /**
     * Looks up the process group with the given id and returns its entity.
     *
     * @param groupId the id of the process group
     * @return the entity describing that process group
     */
    @Override
    public ProcessGroupEntity getProcessGroup(final String groupId) {
        return createProcessGroupEntity(processGroupDAO.getProcessGroup(groupId));
    }

    /**
     * Assembles the entity for a controller service: its DTO with referencing components
     * populated, its revision, the current user's permissions and operate permissions, and its
     * bulletins. Each bulletin entity is created with the user's read permission on the service.
     *
     * @param serviceNode the controller service to describe
     * @param serviceIds service ids forwarded to the referencing-components lookup
     * @return the populated controller service entity
     */
    private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds) {
        final ControllerServiceDTO serviceDto = dtoFactory.createControllerServiceDto(serviceNode);

        // attach the components that reference this service
        serviceDto.setReferencingComponents(
            createControllerServiceReferencingComponentsEntity(serviceNode.getReferences(), serviceIds).getControllerServiceReferencingComponents());

        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()));
        final PermissionsDTO componentPermissions = dtoFactory.createPermissionsDto(serviceNode, NiFiUserUtils.getNiFiUser());
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(serviceNode), NiFiUserUtils.getNiFiUser());

        final List<BulletinEntity> bulletinEntities = new ArrayList<>();
        for (final BulletinDTO bulletinDto : dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier()))) {
            bulletinEntities.add(entityFactory.createBulletinEntity(bulletinDto, componentPermissions.getCanRead()));
        }

        return entityFactory.createControllerServiceEntity(serviceDto, revisionDto, componentPermissions, operatePermissions, bulletinEntities);
    }

    /**
     * Returns the variable registry of the given process group, optionally merged with the
     * variables of its ancestor groups.
     *
     * @param groupId the id of the process group
     * @param includeAncestorGroups whether variables of ancestor groups are included
     * @return the variable registry entity
     * @throws ResourceNotFoundException if the DAO returns no group for the given id
     */
    @Override
    public VariableRegistryEntity getVariableRegistry(final String groupId, final boolean includeAncestorGroups) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
        if (group != null) {
            return createVariableRegistryEntity(group, includeAncestorGroups);
        }

        throw new ResourceNotFoundException("Could not find group with ID " + groupId);
    }

    /**
     * Builds the variable registry entity for a process group. When ancestor groups are
     * requested, the parent chain is walked up to the root and the variables of every ancestor
     * whose permissions report {@code canRead} as true are added to the registry.
     *
     * @param processGroup the group whose registry is described
     * @param includeAncestorGroups whether to merge in variables of readable ancestor groups
     * @return the variable registry entity with the group's revision and permissions
     */
    private VariableRegistryEntity createVariableRegistryEntity(final ProcessGroup processGroup, final boolean includeAncestorGroups) {
        final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup, revisionManager);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
        final PermissionsDTO groupPermissions = dtoFactory.createPermissionsDto(processGroup);

        if (includeAncestorGroups) {
            for (ProcessGroup ancestor = processGroup.getParent(); ancestor != null; ancestor = ancestor.getParent()) {
                final PermissionsDTO ancestorPermissions = dtoFactory.createPermissionsDto(ancestor);
                if (!Boolean.TRUE.equals(ancestorPermissions.getCanRead())) {
                    // ancestors without read permission contribute nothing
                    continue;
                }

                final Set<VariableEntity> ancestorVariables = dtoFactory.createVariableRegistryDto(ancestor, revisionManager).getVariables();
                registryDto.getVariables().addAll(ancestorVariables);
            }
        }

        return entityFactory.createVariableRegistryEntity(registryDto, revisionDto, groupPermissions);
    }

    /**
     * Populates the affected components of the given variable registry DTO for the process
     * group it names, and wraps the result with that group's revision and permissions.
     *
     * @param variableRegistryDto the variable registry whose affected components are wanted
     * @return the variable registry entity
     * @throws ResourceNotFoundException if the DAO returns no group for the DTO's group id
     */
    @Override
    public VariableRegistryEntity populateAffectedComponents(final VariableRegistryDTO variableRegistryDto) {
        final String targetGroupId = variableRegistryDto.getProcessGroupId();
        final ProcessGroup group = processGroupDAO.getProcessGroup(targetGroupId);
        if (group == null) {
            throw new ResourceNotFoundException("Could not find group with ID " + targetGroupId);
        }

        final VariableRegistryDTO populatedDto = dtoFactory.populateAffectedComponents(variableRegistryDto, group, revisionManager);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier()));
        final PermissionsDTO groupPermissions = dtoFactory.createPermissionsDto(group);

        return entityFactory.createVariableRegistryEntity(populatedDto, revisionDto, groupPermissions);
    }

    /**
     * Builds an entity for every controller service the DAO returns for the given group and
     * scope flags. The identifiers of all returned services are collected first and handed to
     * each entity's construction.
     *
     * @param groupId the id of the process group
     * @param includeAncestorGroups forwarded to the DAO lookup
     * @param includeDescendantGroups forwarded to the DAO lookup
     * @return the entities of the controller services found
     */
    @Override
    public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
        final Set<ControllerServiceNode> nodes = controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);

        // the complete id set must exist before any entity is created
        final Set<String> knownServiceIds = new HashSet<>();
        for (final ControllerServiceNode node : nodes) {
            knownServiceIds.add(node.getIdentifier());
        }

        final Set<ControllerServiceEntity> entities = new HashSet<>();
        for (final ControllerServiceNode node : nodes) {
            entities.add(createControllerServiceEntity(node, knownServiceIds));
        }
        return entities;
    }

    /**
     * Looks up the controller service with the given id and returns its entity, using a
     * single-element id set containing only that service.
     *
     * @param controllerServiceId the id of the controller service
     * @return the entity describing that controller service
     */
    @Override
    public ControllerServiceEntity getControllerService(final String controllerServiceId) {
        return createControllerServiceEntity(
            controllerServiceDAO.getControllerService(controllerServiceId),
            Sets.newHashSet(controllerServiceId));
    }

    /**
     * Returns the descriptor of the named property of a controller service. When the service
     * implementation does not provide a descriptor for that name, a dynamic descriptor carrying
     * {@code Validator.INVALID} is built instead.
     *
     * @param id the id of the controller service
     * @param property the name of the property
     * @return the property descriptor DTO, created with the id of the service's process group
     *         or {@code null} when the service has no process group
     */
    @Override
    public PropertyDescriptorDTO getControllerServicePropertyDescriptor(final String id, final String property) {
        final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(id);
        final PropertyDescriptor supported = serviceNode.getControllerServiceImplementation().getPropertyDescriptor(property);

        // fall back to an invalid descriptor if the controller service doesn't support this property
        final PropertyDescriptor descriptor = supported != null
            ? supported
            : new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();

        final String groupId;
        if (serviceNode.getProcessGroup() == null) {
            groupId = null;
        } else {
            groupId = serviceNode.getProcessGroup().getIdentifier();
        }

        return dtoFactory.createPropertyDescriptorDto(descriptor, groupId);
    }

    /**
     * Returns the components that reference the given controller service.
     *
     * @param controllerServiceId the id of the controller service
     * @return the entity listing the referencing components
     */
    @Override
    public ControllerServiceReferencingComponentsEntity getControllerServiceReferencingComponents(final String controllerServiceId) {
        final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(controllerServiceId);
        return createControllerServiceReferencingComponentsEntity(serviceNode.getReferences(), Sets.newHashSet(controllerServiceId));
    }

    /**
     * Assembles the entity for a reporting task: its revision, permissions, operate
     * permissions, and bulletins. Each bulletin entity is created with the read permission
     * computed for the task.
     *
     * @param reportingTask the reporting task to describe
     * @return the populated reporting task entity
     */
    private ReportingTaskEntity createReportingTaskEntity(final ReportingTaskNode reportingTask) {
        final String taskId = reportingTask.getIdentifier();

        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(taskId));
        final PermissionsDTO componentPermissions = dtoFactory.createPermissionsDto(reportingTask);
        final PermissionsDTO operatePermissions = dtoFactory.createPermissionsDto(new OperationAuthorizable(reportingTask));

        final List<BulletinEntity> bulletinEntities = new ArrayList<>();
        for (final BulletinDTO bulletinDto : dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(taskId))) {
            bulletinEntities.add(entityFactory.createBulletinEntity(bulletinDto, componentPermissions.getCanRead()));
        }

        return entityFactory.createReportingTaskEntity(
            dtoFactory.createReportingTaskDto(reportingTask), revisionDto, componentPermissions, operatePermissions, bulletinEntities);
    }

    /**
     * Builds an entity for every reporting task the DAO returns.
     *
     * @return the entities of all reporting tasks found
     */
    @Override
    public Set<ReportingTaskEntity> getReportingTasks() {
        final Set<ReportingTaskEntity> entities = new HashSet<>();
        for (final ReportingTaskNode taskNode : reportingTaskDAO.getReportingTasks()) {
            entities.add(createReportingTaskEntity(taskNode));
        }
        return entities;
    }

    /**
     * Looks up the reporting task with the given id and returns its entity.
     *
     * @param reportingTaskId the id of the reporting task
     * @return the entity describing that reporting task
     */
    @Override
    public ReportingTaskEntity getReportingTask(final String reportingTaskId) {
        return createReportingTaskEntity(reportingTaskDAO.getReportingTask(reportingTaskId));
    }

    /**
     * Returns the descriptor of the named property of a reporting task. When the task does not
     * provide a descriptor for that name, a dynamic descriptor carrying
     * {@code Validator.INVALID} is built instead. The DTO is created with a {@code null}
     * group id.
     *
     * @param id the id of the reporting task
     * @param property the name of the property
     * @return the property descriptor DTO
     */
    @Override
    public PropertyDescriptorDTO getReportingTaskPropertyDescriptor(final String id, final String property) {
        final ReportingTaskNode taskNode = reportingTaskDAO.getReportingTask(id);
        final PropertyDescriptor supported = taskNode.getReportingTask().getPropertyDescriptor(property);

        if (supported != null) {
            return dtoFactory.createPropertyDescriptorDto(supported, null);
        }

        // the reporting task doesn't support this property, so answer with an invalid descriptor
        final PropertyDescriptor invalid = new PropertyDescriptor.Builder().name(property).addValidator(Validator.INVALID).dynamic(true).build();
        return dtoFactory.createPropertyDescriptorDto(invalid, null);
    }

    /**
     * Returns the status history of the given process group together with the permissions
     * computed for that group.
     *
     * @param groupId the id of the process group
     * @return the status history entity
     */
    @Override
    public StatusHistoryEntity getProcessGroupStatusHistory(final String groupId) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
        final PermissionsDTO groupPermissions = dtoFactory.createPermissionsDto(group);
        return entityFactory.createStatusHistoryEntity(controllerFacade.getProcessGroupStatusHistory(groupId), groupPermissions);
    }

    /**
     * Places the given process group under version control by registering its current contents
     * with a Flow Registry.
     *
     * <p>A new flow is created in the registry when the request carries no flow id; otherwise the
     * existing flow is looked up. A snapshot of the group is then added to that flow. The
     * expected snapshot version is {@code -1} when the request action is
     * {@code VersionedFlowDTO.FORCE_COMMIT_ACTION}; otherwise it is {@code 1} for a group with no
     * version control information, or the group's current version plus one.</p>
     *
     * <p>If a new flow was created but adding the snapshot fails with a
     * {@code NiFiCoreException}, the failure is logged and a partial result is returned whose
     * version control information has version {@code 0} and state {@code SYNC_FAILURE}.</p>
     *
     * @param groupId the id of the process group to version
     * @param requestEntity the request describing the flow, bucket and registry to use
     * @return the version control information, group revision and component mapping
     * @throws IllegalArgumentException if creating or fetching the flow fails with a {@code NiFiRegistryException}
     * @throws IllegalStateException if creating or fetching the flow fails with an {@code IOException}
     */
    @Override
    public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) {
        final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow();

        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);

        int snapshotVersion;
        if (VersionedFlowDTO.FORCE_COMMIT_ACTION.equals(versionedFlowDto.getAction())) {
            snapshotVersion = -1;
        } else {
            final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
            snapshotVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
        }

        // Create a VersionedProcessGroup snapshot of the flow as it is currently.
        final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
        final Map<String, VersionedParameterContext> parameterContexts = createVersionedParameterContexts(processGroup);

        final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();

        final VersionedFlow versionedFlow = new VersionedFlow();
        versionedFlow.setBucketIdentifier(versionedFlowDto.getBucketId());
        versionedFlow.setCreatedTimestamp(System.currentTimeMillis());
        versionedFlow.setDescription(versionedFlowDto.getDescription());
        versionedFlow.setModifiedTimestamp(versionedFlow.getCreatedTimestamp());
        versionedFlow.setName(versionedFlowDto.getFlowName());
        versionedFlow.setIdentifier(flowId);

        // Add the Versioned Flow and first snapshot to the Flow Registry
        final String registryId = versionedFlowDto.getRegistryId();
        final VersionedFlowSnapshot registeredSnapshot;
        final VersionedFlow registeredFlow;
        final boolean registerNewFlow = versionedFlowDto.getFlowId() == null;

        try {
            // first, create the flow in the registry, if necessary
            if (registerNewFlow) {
                registeredFlow = registerVersionedFlow(registryId, versionedFlow);
            } else {
                registeredFlow = getVersionedFlow(registryId, versionedFlowDto.getBucketId(), versionedFlowDto.getFlowId());
            }
        } catch (final NiFiRegistryException e) {
            // surface the registry's own message, but keep the original exception as the cause
            throw new IllegalArgumentException(e.getLocalizedMessage(), e);
        } catch (final IOException ioe) {
            // chain the I/O failure so its stack trace is not lost
            throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to create the flow", ioe);
        }

        try {
            // add a snapshot to the flow in the registry
            registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, parameterContexts, versionedProcessGroup.getExternalControllerServiceReferences(),
                versionedFlowDto.getComments(), snapshotVersion);
        } catch (final NiFiCoreException e) {
            // If the flow has been created, but failed to add a snapshot,
            // then we need to capture the created versioned flow information as a partial successful result.
            if (registerNewFlow) {
                logger.error("The flow has been created, but failed to add a snapshot. Returning the created flow information.", e);
                final VersionControlInformationDTO vci = new VersionControlInformationDTO();
                vci.setBucketId(registeredFlow.getBucketIdentifier());
                vci.setBucketName(registeredFlow.getBucketName());
                vci.setFlowId(registeredFlow.getIdentifier());
                vci.setFlowName(registeredFlow.getName());
                vci.setFlowDescription(registeredFlow.getDescription());
                vci.setGroupId(groupId);
                vci.setRegistryId(registryId);
                vci.setRegistryName(getFlowRegistryName(registryId));
                vci.setVersion(0);
                vci.setState(VersionedFlowState.SYNC_FAILURE.name());
                vci.setStateExplanation(e.getLocalizedMessage());

                return createVersionControlComponentMappingEntity(groupId, versionedProcessGroup, vci);
            }

            // an existing flow could not take the snapshot; let the caller see the failure
            throw e;
        }

        final Bucket bucket = registeredSnapshot.getBucket();
        final VersionedFlow flow = registeredSnapshot.getFlow();

        // Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
        final VersionControlInformationDTO vci = new VersionControlInformationDTO();
        vci.setBucketId(bucket.getIdentifier());
        vci.setBucketName(bucket.getName());
        vci.setFlowId(flow.getIdentifier());
        vci.setFlowName(flow.getName());
        vci.setFlowDescription(flow.getDescription());
        vci.setGroupId(groupId);
        vci.setRegistryId(registryId);
        vci.setRegistryName(getFlowRegistryName(registryId));
        vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
        vci.setState(VersionedFlowState.UP_TO_DATE.name());

        return createVersionControlComponentMappingEntity(groupId, versionedProcessGroup, vci);
    }

    /**
     * Bundles version control information with the component mapping of the versioned group
     * and the current revision of the process group.
     *
     * @param groupId the id of the process group whose revision is reported
     * @param versionedProcessGroup the versioned group from which the component mapping is derived
     * @param vci the version control information to include
     * @return the populated mapping entity
     */
    private VersionControlComponentMappingEntity createVersionControlComponentMappingEntity(String groupId, InstantiatedVersionedProcessGroup versionedProcessGroup, VersionControlInformationDTO vci) {
        final Map<String, String> componentMapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId));

        final VersionControlComponentMappingEntity mappingEntity = new VersionControlComponentMappingEntity();
        mappingEntity.setVersionControlInformation(vci);
        mappingEntity.setProcessGroupRevision(revisionDto);
        mappingEntity.setVersionControlComponentMapping(componentMapping);
        return mappingEntity;
    }

    /**
     * Builds a snapshot of the given process group as it currently exists, mapped through
     * {@code mapNonVersionedProcessGroup}, together with its parameter contexts and external
     * controller service references.
     *
     * @param processGroupId the id of the process group to snapshot
     * @return the snapshot of the current flow
     */
    @Override
    public VersionedFlowSnapshot getCurrentFlowSnapshotByGroupId(final String processGroupId) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
        final NiFiRegistryFlowMapper flowMapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());

        // Complete (descendant flows included) VersionedProcessGroup snapshot of the flow as it is now,
        // with no registry related fields populated, even if the flow is currently versioned.
        final InstantiatedVersionedProcessGroup flowContents =
                flowMapper.mapNonVersionedProcessGroup(group, controllerFacade.getControllerServiceProvider());

        // Complete (descendant flows included) map of parameter contexts
        final Map<String, VersionedParameterContext> contextsByName = flowMapper.mapParameterContexts(group, true);

        final VersionedFlowSnapshot currentSnapshot = new VersionedFlowSnapshot();
        currentSnapshot.setFlowContents(flowContents);
        currentSnapshot.setExternalControllerServices(flowContents.getExternalControllerServiceReferences());
        currentSnapshot.setParameterContexts(contextsByName);
        currentSnapshot.setFlowEncodingVersion(RestBasedFlowRegistry.FLOW_ENCODING_VERSION);
        return currentSnapshot;
    }

    /**
     * Retrieves, including remote flows, the versioned flow snapshot that the given process
     * group is currently tracking in its Flow Registry.
     *
     * @param processGroupId the id of the process group
     * @return the snapshot for the registry, bucket, flow and version the group tracks
     * @throws IllegalStateException if the process group is not under version control
     */
    @Override
    public VersionedFlowSnapshot getVersionedFlowSnapshotByGroupId(final String processGroupId) {
        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
        final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
        if (versionControlInfo == null) {
            // a group without version control information has no snapshot to fetch; fail with a
            // descriptive error instead of a NullPointerException
            throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control");
        }

        return getVersionedFlowSnapshot(versionControlInfo.getRegistryIdentifier(), versionControlInfo.getBucketIdentifier(),
                versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true);
    }

    /**
     * Retrieves the versioned flow snapshot identified by the registry, bucket, flow and
     * version of the given version control information.
     *
     * @param versionControlInfo identifies the snapshot to retrieve
     * @param fetchRemoteFlows indicator to include remote flows when retrieving the flow
     * @return the matching snapshot
     */
    @Override
    public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
        return getVersionedFlowSnapshot(
                versionControlInfo.getRegistryId(),
                versionControlInfo.getBucketId(),
                versionControlInfo.getFlowId(),
                versionControlInfo.getVersion(),
                fetchRemoteFlows);
    }

    /**
     * Retrieves a specific version of a flow from a Flow Registry on behalf of the current user.
     *
     * @param registryId        the id of the registry to retrieve the versioned flow from
     * @param bucketId          the id of the bucket within the registry
     * @param flowId            the id of the flow within the bucket/registry
     * @param flowVersion       the version of the flow to retrieve
     * @param fetchRemoteFlows  indicator to include remote flows when retrieving the flow
     * @return a VersionedFlowSnapshot from a registry with the given version
     * @throws ResourceNotFoundException if no Flow Registry is registered with the given id
     * @throws IllegalArgumentException if the registry lookup fails with a {@code NiFiRegistryException}
     * @throws IllegalStateException if communication with the registry fails with an {@code IOException}
     */
    private VersionedFlowSnapshot getVersionedFlowSnapshot(final String registryId, final String bucketId, final String flowId,
                                                           final Integer flowVersion, final boolean fetchRemoteFlows) {
        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
        if (flowRegistry == null) {
            throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId);
        }

        try {
            return flowRegistry.getFlowContents(bucketId, flowId, flowVersion, fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
        } catch (final NiFiRegistryException e) {
            logger.error(e.getMessage(), e);
            // keep the registry failure as the cause so it travels with the rethrown exception
            throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
                    + bucketId + ", Flow " + flowId + ", Version " + flowVersion, e);
        } catch (final IOException ioe) {
            // this failure is not logged here, so chaining it is the only way its details survive
            throw new IllegalStateException(
                    "Failed to communicate with Flow Registry when attempting to retrieve a versioned flow", ioe);
        }
    }

    /**
     * Deletes a versioned flow from the given Flow Registry on behalf of the current user.
     *
     * @param registryId the id of the Flow Registry
     * @param bucketId the id of the bucket holding the flow
     * @param flowId the id of the flow to delete
     * @return the value returned by the registry's delete call
     * @throws IllegalArgumentException if no Flow Registry exists with the given id
     * @throws NiFiCoreException if the registry call fails with an I/O or registry error
     */
    @Override
    public VersionedFlow deleteVersionedFlow(final String registryId, final String bucketId, final String flowId) {
        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
        if (null == flowRegistry) {
            throw new IllegalArgumentException("No Flow Registry exists with ID " + registryId);
        }

        final VersionedFlow deletedFlow;
        try {
            deletedFlow = flowRegistry.deleteVersionedFlow(bucketId, flowId, NiFiUserUtils.getNiFiUser());
        } catch (final IOException | NiFiRegistryException failure) {
            throw new NiFiCoreException("Failed to remove flow from Flow Registry due to " + failure.getMessage(), failure);
        }
        return deletedFlow;
    }

    /**
     * Reports whether the given process group, or any process group nested beneath it, has
     * version control information.
     *
     * @param groupId the id of the process group at which the check starts
     * @return {@code true} if that group or a nested group is under version control
     */
    @Override
    public boolean isAnyProcessGroupUnderVersionControl(final String groupId) {
        final ProcessGroup startingGroup = processGroupDAO.getProcessGroup(groupId);
        return isProcessGroupUnderVersionControl(startingGroup);
    }

    /**
     * Recursively checks whether the given process group or any of its child groups has
     * version control information.
     *
     * @param processGroup the group at which the check starts
     * @return {@code true} as soon as a group with version control information is found
     */
    private boolean isProcessGroupUnderVersionControl(final ProcessGroup processGroup) {
        if (processGroup.getVersionControlInformation() != null) {
            return true;
        }

        final Set<ProcessGroup> children = processGroup.getProcessGroups();
        if (children == null) {
            return false;
        }

        // descend into each child and stop at the first hit
        for (final ProcessGroup child : children) {
            if (isProcessGroupUnderVersionControl(child)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the version control information of the given process group along with the
     * group's revision.
     *
     * @param groupId the id of the process group
     * @return the version control information entity, or {@code null} when the group has no
     *         version control information
     */
    @Override
    public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
        if (group.getVersionControlInformation() == null) {
            // nothing to report for a group that is not versioned
            return null;
        }

        return entityFactory.createVersionControlInformationEntity(
            dtoFactory.createVersionControlInformationDto(group),
            dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId)));
    }

    /**
     * Maps the given process group to its versioned representation using a registry flow
     * mapper, passing {@code false} as the final {@code mapProcessGroup} argument.
     *
     * @param processGroupId the id of the process group to map
     * @return the versioned representation of the group
     */
    private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
        final NiFiRegistryFlowMapper flowMapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
        return flowMapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
    }

    /**
     * Maps the parameter contexts of the given process group using a registry flow mapper,
     * passing {@code false} as the second {@code mapParameterContexts} argument.
     *
     * @param processGroup the process group whose parameter contexts are mapped
     * @return the versioned parameter contexts produced by the mapper
     */
    private Map<String, VersionedParameterContext> createVersionedParameterContexts(final ProcessGroup processGroup) {
        return makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager())
            .mapParameterContexts(processGroup, false);
    }

    /**
     * Computes the differences between the local contents of a version-controlled process
     * group and the flow version it tracks in its Flow Registry.
     *
     * @param processGroupId the id of the process group
     * @return an entity holding the component differences
     * @throws IllegalStateException if the group is not under version control, or the Flow
     *         Registry it tracks cannot be found
     * @throws NiFiCoreException if the versioned flow cannot be retrieved from the registry
     */
    @Override
    public FlowComparisonEntity getLocalModifications(final String processGroupId) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
        final VersionControlInformation vci = group.getVersionControlInformation();
        if (vci == null) {
            throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control");
        }

        final FlowRegistry registry = flowRegistryClient.getFlowRegistry(vci.getRegistryIdentifier());
        if (registry == null) {
            throw new IllegalStateException("Process Group with ID " + processGroupId + " is tracking to a flow in Flow Registry with ID " + vci.getRegistryIdentifier()
                + " but cannot find a Flow Registry with that identifier");
        }

        // fetch the tracked version, remote flows included
        final VersionedFlowSnapshot trackedSnapshot;
        try {
            trackedSnapshot = registry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion(), true, NiFiUserUtils.getNiFiUser());
        } catch (final IOException | NiFiRegistryException failure) {
            throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + failure.getMessage(), failure);
        }

        // map the local group to the same representation as the registry contents
        final NiFiRegistryFlowMapper flowMapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
        final VersionedProcessGroup localContents = flowMapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
        final VersionedProcessGroup registryContents = trackedSnapshot.getFlowContents();

        final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
        final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryContents);

        // the registry flow is passed first and the local flow second
        final Set<String> ancestorServiceIds = group.getAncestorServiceIds();
        final FlowComparator comparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
        final FlowComparison comparison = comparator.compare();

        final Set<ComponentDifferenceDTO> differences = dtoFactory.createComponentDifferenceDtos(comparison, controllerFacade.getFlowManager());

        final FlowComparisonEntity comparisonEntity = new FlowComparisonEntity();
        comparisonEntity.setComponentDifferences(differences);
        return comparisonEntity;
    }

    /**
     * Registers the given flow with the Flow Registry that has the given identifier.
     *
     * @param registryId the identifier of the Flow Registry to register the flow with
     * @param flow the flow to register
     * @return the flow as returned by the Flow Registry
     * @throws ResourceNotFoundException if no Flow Registry is known by the given identifier
     * @throws NiFiCoreException if the call to the Flow Registry fails
     */
    @Override
    public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) {
        final FlowRegistry targetRegistry = flowRegistryClient.getFlowRegistry(registryId);
        if (targetRegistry == null) {
            throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
        }

        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        try {
            return targetRegistry.registerVersionedFlow(flow, currentUser);
        } catch (final IOException | NiFiRegistryException cause) {
            final String message = "Failed to register flow with Flow Registry due to " + cause.getMessage();
            throw new NiFiCoreException(message, cause);
        }
    }

    /**
     * Looks up a versioned flow in the Flow Registry that has the given identifier, on behalf of the current user.
     *
     * @param registryId the identifier of the Flow Registry
     * @param bucketId the identifier of the bucket holding the flow
     * @param flowId the identifier of the flow
     * @return the flow as returned by the Flow Registry
     * @throws ResourceNotFoundException if no Flow Registry is known by the given identifier
     * @throws IOException if thrown by the Flow Registry call
     * @throws NiFiRegistryException if thrown by the Flow Registry call
     */
    private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, NiFiRegistryException {
        final FlowRegistry targetRegistry = flowRegistryClient.getFlowRegistry(registryId);
        if (targetRegistry != null) {
            final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
            return targetRegistry.getVersionedFlow(bucketId, flowId, currentUser);
        }

        throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
    }

    /**
     * Registers a new snapshot (version) of the given flow with the Flow Registry that has the given identifier.
     *
     * @param registryId the identifier of the Flow Registry
     * @param flow the flow that the snapshot belongs to
     * @param snapshot the contents of the snapshot
     * @param parameterContexts the Parameter Contexts to register with the snapshot, keyed by String
     * @param externalControllerServiceReferences the external Controller Service references to register with the snapshot, keyed by String
     * @param comments the comments to store with the snapshot
     * @param expectedVersion the version that the snapshot is expected to receive
     * @return the snapshot as returned by the Flow Registry
     * @throws ResourceNotFoundException if no Flow Registry is known by the given identifier
     * @throws NiFiCoreException if the call to the Flow Registry fails
     */
    @Override
    public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow, final VersionedProcessGroup snapshot,
                                                               final Map<String, VersionedParameterContext> parameterContexts,
                                                               final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences, final String comments,
                                                               final int expectedVersion) {
        final FlowRegistry targetRegistry = flowRegistryClient.getFlowRegistry(registryId);
        if (targetRegistry == null) {
            throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
        }

        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        try {
            return targetRegistry.registerVersionedFlowSnapshot(flow, snapshot, externalControllerServiceReferences, parameterContexts, comments, expectedVersion, currentUser);
        } catch (final IOException | NiFiRegistryException cause) {
            final String message = "Failed to register flow with Flow Registry due to " + cause.getMessage();
            throw new NiFiCoreException(message, cause);
        }
    }

    /**
     * Updates the Version Control Information of a Process Group under the protection of the given revision.
     *
     * @param revision the revision claimed for the update
     * @param processGroupId the identifier of the Process Group to look up
     * @param versionControlInfo the Version Control Information to apply
     * @param versionedComponentMapping the versioned component mapping handed to the DAO together with the Version Control Information
     * @return an entity wrapping the resulting Version Control Information and the revision of the last modification
     */
    @Override
    public VersionControlInformationEntity setVersionControlInformation(final Revision revision, final String processGroupId,
            final VersionControlInformationDTO versionControlInfo, final Map<String, String> versionedComponentMapping) {

        final ProcessGroup targetGroup = processGroupDAO.getProcessGroup(processGroupId);

        final RevisionUpdate<VersionControlInformationDTO> update = updateComponent(
            revision,
            targetGroup,
            () -> processGroupDAO.updateVersionControlInformation(versionControlInfo, versionedComponentMapping),
            updatedGroup -> dtoFactory.createVersionControlInformationDto(updatedGroup));

        final VersionControlInformationDTO resultingInfo = update.getComponent();
        final RevisionDTO resultingRevision = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createVersionControlInformationEntity(resultingInfo, resultingRevision);
    }

    /**
     * Disconnects a Process Group from version control under the protection of the given revision.
     *
     * @param revision the revision claimed for the update
     * @param processGroupId the identifier of the Process Group to disconnect
     * @return an entity wrapping the Version Control Information created for the group after the update
     *         and the revision of the last modification
     */
    @Override
    public VersionControlInformationEntity deleteVersionControl(final Revision revision, final String processGroupId) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);

        // The DTO is built from the group that was looked up above rather than from the value handed to the callback.
        final RevisionUpdate<VersionControlInformationDTO> update = updateComponent(
            revision,
            group,
            () -> processGroupDAO.disconnectVersionControl(processGroupId),
            ignored -> dtoFactory.createVersionControlInformationDto(group));

        final VersionControlInformationDTO resultingInfo = update.getComponent();
        final RevisionDTO resultingRevision = dtoFactory.createRevisionDTO(update.getLastModification());
        return entityFactory.createVersionControlInformationEntity(resultingInfo, resultingRevision);
    }

    /**
     * Delegates to the Process Group with the given identifier to verify that it can be updated to the proposed flow.
     *
     * @param groupId the identifier of the Process Group
     * @param proposedFlow the flow that the group would be updated to
     * @param verifyConnectionRemoval passed through to the Process Group's verification
     * @param verifyNotDirty passed through to the Process Group's verification
     */
    @Override
    public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
        processGroupDAO.getProcessGroup(groupId).verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty);
    }

    /**
     * Delegates to the Process Group with the given identifier to verify that it can be saved to a Flow Registry.
     *
     * @param groupId the identifier of the Process Group
     * @param registryId the identifier of the Flow Registry
     * @param bucketId the identifier of the bucket
     * @param flowId the identifier of the flow
     * @param saveAction the save action, passed through to the Process Group's verification
     */
    @Override
    public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId, final String saveAction) {
        processGroupDAO.getProcessGroup(groupId).verifyCanSaveToFlowRegistry(registryId, bucketId, flowId, saveAction);
    }

    /**
     * Verifies that local modifications of the given Process Group can be reverted and that the group
     * can be updated to the given snapshot.
     *
     * @param groupId the identifier of the Process Group
     * @param versionedFlowSnapshot the snapshot that the group would be updated to
     */
    @Override
    public void verifyCanRevertLocalModifications(final String groupId, final VersionedFlowSnapshot versionedFlowSnapshot) {
        final ProcessGroup targetGroup = processGroupDAO.getProcessGroup(groupId);
        targetGroup.verifyCanRevertLocalModifications();

        // Connection removal is deliberately not verified here: the flow may still be running, and it only
        // matters that the connections can be removed once the components have been stopped.
        final boolean verifyConnectionRemoval = false;
        final boolean verifyNotDirty = false;
        targetGroup.verifyCanUpdate(versionedFlowSnapshot, verifyConnectionRemoval, verifyNotDirty);
    }

    /**
     * Determines which local components would be affected if the Process Group with the given identifier
     * were updated to the given snapshot.
     *
     * <p>The local group is mapped to its versioned representation and compared against the snapshot's flow contents.
     * The result is then assembled in three passes over the resulting differences:</p>
     * <ol>
     *   <li>every difference that passes the stream filters is turned into an entity for the local component
     *       (component A of the difference), carrying the component's current state where one is looked up;</li>
     *   <li>for a removed Process Group, the processors, funnels, ports, remote group ports and controller services
     *       found beneath it are added; for a Controller Service, the services and processors that recursively
     *       reference it are added;</li>
     *   <li>for every difference whose component is a connection, the local connectables registered under the
     *       versioned IDs of the connection's source and destination are added.</li>
     * </ol>
     *
     * @param processGroupId the identifier of the local Process Group
     * @param updatedSnapshot the snapshot whose flow contents are compared against the local flow
     * @return the set of affected components
     */
    @Override
    public Set<AffectedComponentEntity> getComponentsAffectedByFlowUpdate(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot) {
        final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);

        // Map the local group to its versioned form so that it can be compared against the snapshot
        final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
        final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);

        final ComparableDataFlow localFlow = new StandardComparableDataFlow("Current Flow", localContents);
        final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("New Flow", updatedSnapshot.getFlowContents());

        // The local flow is passed first and the proposed flow second; the local component is read via getComponentA() below
        final Set<String> ancestorServiceIds = group.getAncestorServiceIds();
        final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new StaticDifferenceDescriptor());
        final FlowComparison comparison = flowComparator.compare();

        // Pass 1: one entity per relevant difference, describing the local component and its current state
        final FlowManager flowManager = controllerFacade.getFlowManager();
        final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
            .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
            .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
            .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
            .filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
            .filter(diff -> !FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
            .filter(diff -> !FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff, proposedFlow.getContents(), flowManager))
            .filter(diff -> !FlowDifferenceFilters.isScheduledStateNew(diff))
            .map(difference -> {
                final VersionedComponent localComponent = difference.getComponentA();

                // Look up the live component to report its current state; component types not listed here get a null state
                final String state;
                switch (localComponent.getComponentType()) {
                    case CONTROLLER_SERVICE:
                        final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
                        state = controllerServiceDAO.getControllerService(serviceId).getState().name();
                        break;
                    case PROCESSOR:
                        final String processorId = ((InstantiatedVersionedProcessor) localComponent).getInstanceId();
                        state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name();
                        break;
                    case REMOTE_INPUT_PORT:
                        final InstantiatedVersionedRemoteGroupPort remoteInputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
                        state = remoteProcessGroupDAO.getRemoteProcessGroup(remoteInputPort.getInstanceGroupId()).getInputPort(remoteInputPort.getInstanceId()).getScheduledState().name();
                        break;
                    case REMOTE_OUTPUT_PORT:
                        final InstantiatedVersionedRemoteGroupPort remoteOutputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
                        state = remoteProcessGroupDAO.getRemoteProcessGroup(remoteOutputPort.getInstanceGroupId()).getOutputPort(remoteOutputPort.getInstanceId()).getScheduledState().name();
                        break;
                    case INPUT_PORT:
                        final InstantiatedVersionedPort versionedInputPort = (InstantiatedVersionedPort) localComponent;
                        final Port inputPort = getInputPort(versionedInputPort);
                        state = inputPort == null ? null : inputPort.getScheduledState().name();
                        break;
                    case OUTPUT_PORT:
                        final InstantiatedVersionedPort versionedOutputPort = (InstantiatedVersionedPort) localComponent;
                        final Port outputPort = getOutputPort(versionedOutputPort);
                        state = outputPort == null ? null : outputPort.getScheduledState().name();
                        break;
                    default:
                        state = null;
                        break;
                }

                return createAffectedComponentEntity((InstantiatedVersionedComponent) localComponent, localComponent.getComponentType().name(), state);
            })
            .collect(Collectors.toCollection(HashSet::new));

        // Pass 2: add components that are affected indirectly (children of removed groups, references to controller services)
        for (final FlowDifference difference : comparison.getDifferences()) {
            // Ignore these as local differences for now because we can't do anything with it
            if (difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED) {
                continue;
            }

            // Ignore differences for adding remote ports
            if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) {
                continue;
            }

            // Ignore name changes to public ports
            if (FlowDifferenceFilters.isPublicPortNameChange(difference)) {
                continue;
            }

            if (FlowDifferenceFilters.isIgnorableVersionedFlowCoordinateChange(difference)) {
                continue;
            }

            if (FlowDifferenceFilters.isNewPropertyWithDefaultValue(difference, controllerFacade.getFlowManager())) {
                continue;
            }

            if (FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(difference, updatedSnapshot.getFlowContents(), controllerFacade.getFlowManager())) {
                continue;
            }

            // Differences without a local component have nothing in the local flow to affect
            final VersionedComponent localComponent = difference.getComponentA();
            if (localComponent == null) {
                continue;
            }

            // If any Process Group is removed, consider all components below that Process Group as an affected component
            if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.flow.ComponentType.PROCESS_GROUP) {
                final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceId();
                final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId);

                localGroup.findAllProcessors().stream()
                    .map(comp -> createAffectedComponentEntity(comp))
                    .forEach(affectedComponents::add);
                localGroup.findAllFunnels().stream()
                    .map(comp -> createAffectedComponentEntity(comp))
                    .forEach(affectedComponents::add);
                localGroup.findAllInputPorts().stream()
                    .map(comp -> createAffectedComponentEntity(comp))
                    .forEach(affectedComponents::add);
                localGroup.findAllOutputPorts().stream()
                    .map(comp -> createAffectedComponentEntity(comp))
                    .forEach(affectedComponents::add);
                localGroup.findAllRemoteProcessGroups().stream()
                    .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
                    .map(comp -> createAffectedComponentEntity(comp))
                    .forEach(affectedComponents::add);
                localGroup.findAllControllerServices().stream()
                    .map(comp -> createAffectedComponentEntity(comp))
                    .forEach(affectedComponents::add);
            }

            // For a Controller Service, every service and processor that references it (recursively) is affected as well
            if (localComponent.getComponentType() == org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE) {
                final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
                final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);

                final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
                for (final ControllerServiceNode referencingService : referencingServices) {
                    affectedComponents.add(createAffectedComponentEntity(referencingService));
                }

                final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
                for (final ProcessorNode referencingProcessor : referencingProcessors) {
                    affectedComponents.add(createAffectedComponentEntity(referencingProcessor));
                }
            }
        }

        // Create a map of all connectable components by versioned component ID to the connectable component itself
        final Map<String, List<Connectable>> connectablesByVersionId = new HashMap<>();
        mapToConnectableId(group.findAllFunnels(), connectablesByVersionId);
        mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId);
        mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId);
        mapToConnectableId(group.findAllProcessors(), connectablesByVersionId);

        // The ports of each Remote Process Group are gathered from the group itself before being mapped
        final List<RemoteGroupPort> remotePorts = new ArrayList<>();
        for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) {
            remotePorts.addAll(rpg.getInputPorts());
            remotePorts.addAll(rpg.getOutputPorts());
        }
        mapToConnectableId(remotePorts, connectablesByVersionId);

        // Pass 3:
        // If any connection is added or modified, we need to stop both the source (if it exists in the flow currently)
        // and the destination (if it exists in the flow currently).
        for (final FlowDifference difference : comparison.getDifferences()) {
            // Prefer the local component; fall back to component B when there is no component A
            VersionedComponent component = difference.getComponentA();
            if (component == null) {
                component = difference.getComponentB();
            }

            if (component.getComponentType() != org.apache.nifi.flow.ComponentType.CONNECTION) {
                continue;
            }

            final VersionedConnection connection = (VersionedConnection) component;

            // A missing map entry means that no local connectable is registered under that versioned ID
            final String sourceVersionedId = connection.getSource().getId();
            final List<Connectable> sources = connectablesByVersionId.get(sourceVersionedId);
            if (sources != null) {
                for (final Connectable source : sources) {
                    affectedComponents.add(createAffectedComponentEntity(source));
                }
            }

            final String destinationVersionId = connection.getDestination().getId();
            final List<Connectable> destinations = connectablesByVersionId.get(destinationVersionId);
            if (destinations != null) {
                for (final Connectable destination : destinations) {
                    affectedComponents.add(createAffectedComponentEntity(destination));
                }
            }
        }

        return affectedComponents;
    }

    /**
     * Resolves the local Input Port that the given versioned port was instantiated as.
     *
     * @param port the versioned port carrying the instance identifiers
     * @return the Process Group's answer for the port's instance ID, or {@code null} if the DAO returns no Process Group
     */
    private Port getInputPort(final InstantiatedVersionedPort port) {
        final ProcessGroup owningGroup = processGroupDAO.getProcessGroup(port.getInstanceGroupId());
        return owningGroup == null ? null : owningGroup.getInputPort(port.getInstanceId());
    }

    /**
     * Resolves the local Output Port that the given versioned port was instantiated as.
     *
     * @param port the versioned port carrying the instance identifiers
     * @return the Process Group's answer for the port's instance ID, or {@code null} if the DAO returns no Process Group
     */
    private Port getOutputPort(final InstantiatedVersionedPort port) {
        final ProcessGroup owningGroup = processGroupDAO.getProcessGroup(port.getInstanceGroupId());
        return owningGroup == null ? null : owningGroup.getOutputPort(port.getInstanceId());
    }

    /**
     * Adds each of the given connectables to {@code destination}, grouped by versioned component ID.
     *
     * @param connectables the connectables to register
     * @param destination the map to add to; a list is created for each versioned ID that is not yet present
     */
    private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) {
        for (final Connectable current : connectables) {
            // Use the assigned Versioned ID when there is one; otherwise calculate it. Calculating it lets us map
            // connectables that are not currently under version control, which is needed so that a running component
            // that does not exist in the Versioned Flow can still get an AffectedComponentDTO when changing flow versions.
            final String versionedId = current.getVersionedComponentId()
                .orElseGet(() -> NiFiRegistryFlowMapper.generateVersionedComponentId(current.getIdentifier()));

            destination.computeIfAbsent(versionedId, key -> new ArrayList<>()).add(current);
        }
    }


    /**
     * Builds the entity that describes the given connectable as an affected component, including its
     * revision, permissions, connectable type, scheduled state and owning group.
     *
     * @param connectable the connectable to describe
     * @return the populated entity
     */
    private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable) {
        final String connectableId = connectable.getIdentifier();

        final AffectedComponentEntity affected = new AffectedComponentEntity();
        affected.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(connectableId)));
        affected.setId(connectableId);

        final Authorizable authorizable = getAuthorizable(connectable);
        affected.setPermissions(dtoFactory.createPermissionsDto(authorizable));

        final AffectedComponentDTO componentDto = new AffectedComponentDTO();
        componentDto.setId(connectableId);
        componentDto.setReferenceType(connectable.getConnectableType().name());
        componentDto.setState(connectable.getScheduledState().name());

        // A remote group port is reported under its Remote Process Group instead of the connectable's Process Group
        final String owningGroupId;
        if (connectable instanceof RemoteGroupPort) {
            owningGroupId = ((RemoteGroupPort) connectable).getRemoteProcessGroup().getIdentifier();
        } else {
            owningGroupId = connectable.getProcessGroupIdentifier();
        }
        componentDto.setProcessGroupId(owningGroupId);

        affected.setComponent(componentDto);
        return affected;
    }

    /**
     * Builds the entity that describes the given Controller Service as an affected component, including its
     * revision, permissions, owning group and state.
     *
     * @param serviceNode the Controller Service to describe
     * @return the populated entity
     */
    private AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceNode serviceNode) {
        final String serviceId = serviceNode.getIdentifier();

        final AffectedComponentEntity affected = new AffectedComponentEntity();
        affected.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceId)));
        affected.setId(serviceId);

        final Authorizable authorizable = authorizableLookup.getControllerService(serviceId).getAuthorizable();
        affected.setPermissions(dtoFactory.createPermissionsDto(authorizable));

        final AffectedComponentDTO componentDto = new AffectedComponentDTO();
        componentDto.setId(serviceId);
        componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
        componentDto.setProcessGroupId(serviceNode.getProcessGroupIdentifier());
        componentDto.setState(serviceNode.getState().name());

        affected.setComponent(componentDto);
        return affected;
    }

    /**
     * Builds the entity that describes an instantiated versioned component as an affected component.
     *
     * @param instance the versioned component carrying the instance identifiers
     * @param componentTypeName the name of the component's type; used as the reference type and to resolve permissions
     * @param componentState the state to report for the component; may be {@code null}
     * @return the populated entity
     */
    private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState) {
        final String instanceId = instance.getInstanceId();

        final AffectedComponentEntity affected = new AffectedComponentEntity();
        affected.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instanceId)));
        affected.setId(instanceId);

        final Authorizable authorizable = getAuthorizable(componentTypeName, instance);
        affected.setPermissions(dtoFactory.createPermissionsDto(authorizable));

        final AffectedComponentDTO componentDto = new AffectedComponentDTO();
        componentDto.setId(instanceId);
        componentDto.setReferenceType(componentTypeName);
        componentDto.setProcessGroupId(instance.getInstanceGroupId());
        componentDto.setState(componentState);

        affected.setComponent(componentDto);
        return affected;
    }


    /**
     * Resolves the authorizable used for permission checks on the given connectable.
     *
     * @param connectable the connectable to resolve
     * @return the enclosing Remote Process Group's authorizable for remote ports, otherwise the
     *         authorizable of the local connectable
     */
    private Authorizable getAuthorizable(final Connectable connectable) {
        final Authorizable resolved;

        switch (connectable.getConnectableType()) {
            case REMOTE_INPUT_PORT:
            case REMOTE_OUTPUT_PORT: {
                // remote ports are authorized through the Remote Process Group that they belong to
                final RemoteGroupPort remotePort = (RemoteGroupPort) connectable;
                resolved = authorizableLookup.getRemoteProcessGroup(remotePort.getRemoteProcessGroup().getIdentifier());
                break;
            }
            default:
                resolved = authorizableLookup.getLocalConnectable(connectable.getIdentifier());
                break;
        }

        return resolved;
    }

    /**
     * Resolves the authorizable used for permission checks on an instantiated versioned component.
     *
     * @param componentTypeName the name of a {@code org.apache.nifi.flow.ComponentType} constant
     * @param versionedComponent the versioned component carrying the instance identifiers
     * @return the matching authorizable, or {@code null} if the type name is not one of the types handled here
     */
    private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) {
        final String instanceId = versionedComponent.getInstanceId();

        // Translate the type name back into its enum constant; names that match no constant leave this null
        org.apache.nifi.flow.ComponentType resolvedType = null;
        for (final org.apache.nifi.flow.ComponentType candidate : org.apache.nifi.flow.ComponentType.values()) {
            if (componentTypeName.equals(candidate.name())) {
                resolvedType = candidate;
                break;
            }
        }

        if (resolvedType == null) {
            return null;
        }

        switch (resolvedType) {
            case CONTROLLER_SERVICE:
                return authorizableLookup.getControllerService(instanceId).getAuthorizable();
            case CONNECTION:
                return authorizableLookup.getConnection(instanceId).getAuthorizable();
            case FUNNEL:
                return authorizableLookup.getFunnel(instanceId);
            case INPUT_PORT:
                return authorizableLookup.getInputPort(instanceId);
            case OUTPUT_PORT:
                return authorizableLookup.getOutputPort(instanceId);
            case LABEL:
                return authorizableLookup.getLabel(instanceId);
            case PROCESS_GROUP:
                return authorizableLookup.getProcessGroup(instanceId).getAuthorizable();
            case PROCESSOR:
                return authorizableLookup.getProcessor(instanceId).getAuthorizable();
            case REMOTE_INPUT_PORT:
            case REMOTE_OUTPUT_PORT:
                // remote ports are authorized through the Remote Process Group that they belong to
                return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
            case REMOTE_PROCESS_GROUP:
                return authorizableLookup.getRemoteProcessGroup(instanceId);
            default:
                return null;
        }
    }

    /**
     * Returns the name of the Flow Registry that has the given identifier.
     *
     * @param flowRegistryId the identifier of the Flow Registry
     * @return the registry's name, or the given identifier itself if no such Flow Registry is found
     */
    @Override
    public String getFlowRegistryName(final String flowRegistryId) {
        final FlowRegistry knownRegistry = flowRegistryClient.getFlowRegistry(flowRegistryId);
        if (knownRegistry == null) {
            return flowRegistryId;
        }

        return knownRegistry.getName();
    }

    /**
     * Collects the current revisions of the components returned by the given Process Group's
     * {@code findAll*} methods, plus the ports of the Remote Process Groups among them.
     *
     * @param processGroup the Process Group whose components are collected
     * @param includeGroupRevision whether the revision of the Process Group itself is added as the first element
     * @return a new, mutable list of revisions
     */
    private List<Revision> getComponentRevisions(final ProcessGroup processGroup, final boolean includeGroupRevision) {
        final List<Revision> collected = new ArrayList<>();
        if (includeGroupRevision) {
            collected.add(revisionManager.getRevision(processGroup.getIdentifier()));
        }

        processGroup.findAllConnections().forEach(connection -> collected.add(revisionManager.getRevision(connection.getIdentifier())));
        processGroup.findAllControllerServices().forEach(service -> collected.add(revisionManager.getRevision(service.getIdentifier())));
        processGroup.findAllFunnels().forEach(funnel -> collected.add(revisionManager.getRevision(funnel.getIdentifier())));
        processGroup.findAllInputPorts().forEach(inputPort -> collected.add(revisionManager.getRevision(inputPort.getIdentifier())));
        processGroup.findAllOutputPorts().forEach(outputPort -> collected.add(revisionManager.getRevision(outputPort.getIdentifier())));
        processGroup.findAllLabels().forEach(label -> collected.add(revisionManager.getRevision(label.getIdentifier())));
        processGroup.findAllProcessGroups().forEach(childGroup -> collected.add(revisionManager.getRevision(childGroup.getIdentifier())));
        processGroup.findAllProcessors().forEach(processor -> collected.add(revisionManager.getRevision(processor.getIdentifier())));

        // Remote Process Groups contribute their own revision first, then, in a second sweep, those of their ports
        processGroup.findAllRemoteProcessGroups().forEach(rpg -> collected.add(revisionManager.getRevision(rpg.getIdentifier())));
        processGroup.findAllRemoteProcessGroups().forEach(rpg -> {
            rpg.getInputPorts().forEach(remotePort -> collected.add(revisionManager.getRevision(remotePort.getIdentifier())));
            rpg.getOutputPorts().forEach(remotePort -> collected.add(revisionManager.getRevision(remotePort.getIdentifier())));
        });

        return collected;
    }

    /**
     * Updates the contents of a Process Group to match the proposed flow snapshot and returns the resulting entity.
     *
     * <p>The update claims the given revision together with the revisions of the components currently found in the
     * group, and increments each of them using the client ID of the given revision.</p>
     *
     * @param revision the revision supplied for the Process Group
     * @param groupId the identifier of the Process Group to update
     * @param versionControlInfo the Version Control Information handed to the DAO
     * @param proposedFlowSnapshot the snapshot that the group is updated to
     * @param componentIdSeed the seed handed to the DAO for component identifiers
     * @param verifyNotModified passed through to the DAO
     * @param updateSettings passed through to the DAO
     * @param updateDescendantVersionedFlows passed through to the DAO
     * @return the entity describing the Process Group, including its revision, permissions, status and bulletins
     */
    @Override
    public ProcessGroupEntity updateProcessGroupContents(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
                                                         final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified,
                                                         final boolean updateSettings, final boolean updateDescendantVersionedFlows) {

        final NiFiUser user = NiFiUserUtils.getNiFiUser();

        // Claim the revision of every component in the group; the group's own revision is the one supplied by the caller
        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
        final List<Revision> revisions = getComponentRevisions(processGroup, false);
        revisions.add(revision);

        final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);

        final RevisionUpdate<ProcessGroupDTO> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<ProcessGroupDTO>() {
            @Override
            public RevisionUpdate<ProcessGroupDTO> update() {
                // update the Process Group; the DAO's return value is not needed because the DTO below is built from processGroup
                processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings,
                    updateDescendantVersionedFlows);

                // update the revisions
                final Set<Revision> updatedRevisions = revisions.stream()
                    .map(rev -> revisionManager.getRevision(rev.getComponentId()).incrementRevision(revision.getClientId()))
                    .collect(Collectors.toSet());

                // save
                controllerFacade.save();

                // gather details for response
                final ProcessGroupDTO dto = dtoFactory.createProcessGroupDto(processGroup);

                final Revision updatedRevision = revisionManager.getRevision(groupId).incrementRevision(revision.getClientId());
                final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
                return new StandardRevisionUpdate<>(dto, lastModification, updatedRevisions);
            }
        });

        final FlowModification lastModification = revisionUpdate.getLastModification();

        // Bulletins are only readable in the response when the user can read the Process Group
        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
        final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(lastModification);
        final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier()));
        final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
        final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
        return entityFactory.createProcessGroupEntity(revisionUpdate.getComponent(), updatedRevision, permissions, status, bulletinEntities);
    }


    /**
     * Checks whether the current user may read the component that is the source of the given action.
     *
     * @param action the action whose source is checked
     * @return the result of the READ authorization check
     */
    private AuthorizationResult authorizeAction(final Action action) {
        final String componentId = action.getSourceId();
        final Component sourceType = action.getSourceType();

        Authorizable target;
        try {
            target = lookupActionSource(sourceType, componentId);
        } catch (final ResourceNotFoundException e) {
            // if the underlying component is gone, use the controller to see if permissions should be allowed
            target = controllerFacade;
        }

        // perform the authorization
        return target.checkAuthorization(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
    }

    /**
     * Looks up the authorizable for the source of an action, based on the type of the source component.
     *
     * @param sourceType the type of the component that is the source of the action
     * @param componentId the identifier of that component
     * @return the authorizable to check permissions against
     * @throws WebApplicationException if the source type is not one of the supported types
     */
    private Authorizable lookupActionSource(final Component sourceType, final String componentId) {
        switch (sourceType) {
            case Processor:
                return authorizableLookup.getProcessor(componentId).getAuthorizable();
            case ReportingTask:
                return authorizableLookup.getReportingTask(componentId).getAuthorizable();
            case ControllerService:
                return authorizableLookup.getControllerService(componentId).getAuthorizable();
            case Controller:
                return controllerFacade;
            case InputPort:
                return authorizableLookup.getInputPort(componentId);
            case OutputPort:
                return authorizableLookup.getOutputPort(componentId);
            case ProcessGroup:
                return authorizableLookup.getProcessGroup(componentId).getAuthorizable();
            case RemoteProcessGroup:
                return authorizableLookup.getRemoteProcessGroup(componentId);
            case Funnel:
                return authorizableLookup.getFunnel(componentId);
            case Connection:
                return authorizableLookup.getConnection(componentId).getAuthorizable();
            case ParameterContext:
                return authorizableLookup.getParameterContext(componentId);
            case AccessPolicy:
                return authorizableLookup.getAccessPolicyById(componentId);
            case User:
            case UserGroup:
                return authorizableLookup.getTenant();
            default:
                throw new WebApplicationException(Response.serverError().entity("An unexpected type of component is the source of this action.").build());
        }
    }

    /**
     * Queries the audit service for actions matching the given criteria and wraps each returned
     * action in an entity flagged with whether the current user may read it.
     *
     * @param historyQueryDto the query criteria (date range, source id, user identity, paging and sorting)
     * @return the history DTO; its actions are replaced with authorization-aware entities when the
     *         query produced a non-null action collection
     */
    @Override
    public HistoryDTO getActions(final HistoryQueryDTO historyQueryDto) {
        // copy the criteria from the request DTO onto the audit query
        final HistoryQuery query = new HistoryQuery();
        query.setStartDate(historyQueryDto.getStartDate());
        query.setEndDate(historyQueryDto.getEndDate());
        query.setSourceId(historyQueryDto.getSourceId());
        query.setUserIdentity(historyQueryDto.getUserIdentity());
        query.setOffset(historyQueryDto.getOffset());
        query.setCount(historyQueryDto.getCount());
        query.setSortColumn(historyQueryDto.getSortColumn());
        query.setSortOrder(historyQueryDto.getSortOrder());

        // run the query and build the base response
        final History history = auditService.getActions(query);
        final HistoryDTO historyDto = dtoFactory.createHistoryDto(history);

        // nothing to authorize when the query produced no action collection
        if (history.getActions() == null) {
            return historyDto;
        }

        // every action is returned, each one marked with whether the user can read it
        final List<ActionEntity> entities = new ArrayList<>();
        for (final Action action : history.getActions()) {
            final boolean canRead = Result.Approved.equals(authorizeAction(action).getResult());
            entities.add(entityFactory.createActionEntity(dtoFactory.createActionDto(action), canRead));
        }
        historyDto.setActions(entities);

        return historyDto;
    }

    /**
     * Retrieves a single audited action, provided the current user may read its source component.
     *
     * @param actionId the id of the action to retrieve
     * @return the entity for the action, marked as readable
     * @throws ResourceNotFoundException if the audit service has no action with the given id
     * @throws AccessDeniedException if the current user is not approved to read the action
     */
    @Override
    public ActionEntity getAction(final Integer actionId) {
        final Action action = auditService.getAction(actionId);
        if (action == null) {
            throw new ResourceNotFoundException(String.format("Unable to find action with id '%s'.", actionId));
        }

        // refuse to return the action unless the authorization check is approved
        final AuthorizationResult authorization = authorizeAction(action);
        if (!Result.Approved.equals(authorization.getResult())) {
            throw new AccessDeniedException(authorization.getExplanation());
        }

        // reaching this point implies the check was approved
        return entityFactory.createActionEntity(dtoFactory.createActionDto(action), true);
    }

    /**
     * Builds the property history of a component from the previous values recorded by the audit service.
     *
     * @param componentId the id of the component
     * @return a DTO carrying the component id and, per property, its previous values; properties with no
     *         previous values are omitted
     */
    @Override
    public ComponentHistoryDTO getComponentHistory(final String componentId) {
        // insertion-ordered so properties appear in the order the audit service reports them
        final Map<String, PropertyHistoryDTO> historyByProperty = new LinkedHashMap<>();
        final Map<String, List<PreviousValue>> previousValuesByProperty = auditService.getPreviousValues(componentId);

        for (final Map.Entry<String, List<PreviousValue>> propertyEntry : previousValuesByProperty.entrySet()) {
            final List<PreviousValueDTO> valueDtos = propertyEntry.getValue().stream()
                .map(value -> {
                    final PreviousValueDTO valueDto = new PreviousValueDTO();
                    valueDto.setPreviousValue(value.getPreviousValue());
                    valueDto.setTimestamp(value.getTimestamp());
                    valueDto.setUserIdentity(value.getUserIdentity());
                    return valueDto;
                })
                .collect(Collectors.toCollection(ArrayList::new));

            // skip properties that have no recorded values
            if (valueDtos.isEmpty()) {
                continue;
            }

            final PropertyHistoryDTO propertyHistory = new PropertyHistoryDTO();
            propertyHistory.setPreviousValues(valueDtos);
            historyByProperty.put(propertyEntry.getKey(), propertyHistory);
        }

        final ComponentHistoryDTO componentHistory = new ComponentHistoryDTO();
        componentHistory.setComponentId(componentId);
        componentHistory.setPropertyHistory(historyByProperty);
        return componentHistory;
    }

    /**
     * Produces the diagnostics entity for a processor, with every section the current user is not
     * authorized to READ removed: controller, system and flow diagnostics on the aggregate JVM snapshot,
     * incoming and outgoing connections, and referenced controller services.
     *
     * @param id the id of the processor
     * @return the diagnostics entity, including revision, permissions, status and bulletins for the processor
     */
    @Override
    public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
        final ProcessorNode processorNode = processorDAO.getProcessor(id);
        final ProcessorStatus status = controllerFacade.getProcessorStatus(id);

        // build the unfiltered diagnostics
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        final ProcessorDiagnosticsDTO diagnostics = controllerFacade.getProcessorDiagnostics(processorNode, status, bulletinRepository,
            serviceId -> createControllerServiceEntity(controllerServiceDAO.getControllerService(serviceId), Collections.emptySet()));

        // only the aggregate JVM snapshot is subject to filtering
        final List<JVMDiagnosticsSnapshotDTO> jvmSnapshots = Collections.singletonList(diagnostics.getJvmDiagnostics().getAggregateSnapshot());

        // strip controller diagnostics unless the user can read the controller
        if (!authorizableLookup.getController().isAuthorized(authorizer, RequestAction.READ, currentUser)) {
            jvmSnapshots.forEach(snapshot -> snapshot.setControllerDiagnostics(null));
        }

        // strip system diagnostics unless the user can read the system resource
        if (!authorizableLookup.getSystem().isAuthorized(authorizer, RequestAction.READ, currentUser)) {
            jvmSnapshots.forEach(snapshot -> snapshot.setSystemDiagnosticsDto(null));
        }

        // strip flow diagnostics unless the user can read the flow
        if (!authorizableLookup.getFlow().isAuthorized(authorizer, RequestAction.READ, currentUser)) {
            jvmSnapshots.forEach(snapshot -> snapshot.setFlowDiagnosticsDto(null));
        }

        // a connection is retained only when the user can READ it
        final Predicate<ConnectionDiagnosticsDTO> canReadConnection = connectionDiagnostics ->
            authorizableLookup.getConnection(connectionDiagnostics.getConnection().getId())
                .getAuthorizable()
                .isAuthorized(authorizer, RequestAction.READ, currentUser);

        diagnostics.setIncomingConnections(diagnostics.getIncomingConnections().stream()
            .filter(canReadConnection)
            .collect(Collectors.toSet()));

        diagnostics.setOutgoingConnections(diagnostics.getOutgoingConnections().stream()
            .filter(canReadConnection)
            .collect(Collectors.toSet()));

        // retain only the referenced controller services the user can READ
        final Set<ControllerServiceDiagnosticsDTO> readableServices = new HashSet<>();
        for (final ControllerServiceDiagnosticsDTO serviceDiagnostics : diagnostics.getReferencedControllerServices()) {
            final String serviceId = serviceDiagnostics.getControllerService().getId();
            if (!authorizableLookup.getControllerService(serviceId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, currentUser)) {
                continue;
            }

            // referencing components are generally not relevant from this context, so drop them
            final ControllerServiceDTO serviceDto = serviceDiagnostics.getControllerService().getComponent();
            if (serviceDto != null) {
                serviceDto.setReferencingComponents(null);
            }
            readableServices.add(serviceDiagnostics);
        }
        diagnostics.setReferencedControllerServices(readableServices);

        // assemble the remaining parts of the entity
        final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revisionManager.getRevision(id));
        final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(processorNode);
        final List<BulletinEntity> bulletinEntities = bulletinRepository.findBulletinsForSource(id).stream()
            .map(bulletin -> dtoFactory.createBulletinDto(bulletin))
            .map(bulletinDto -> entityFactory.createBulletinEntity(bulletinDto, permissionsDto.getCanRead()))
            .collect(Collectors.toList());

        final ProcessorStatusDTO statusDto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processorNode.getIdentifier()));
        return entityFactory.createProcessorDiagnosticsEntity(diagnostics, revisionDto, permissionsDto, statusDto, bulletinEntities);
    }

    /**
     * Populates the Prometheus registries with NiFi flow metrics for the root process group, aggregate
     * byte counters, JVM metrics, connection status analytics and bulletin metrics.
     *
     * @return the collection of all metric registries ({@code ALL_REGISTRIES})
     */
    @Override
    public Collection<CollectorRegistry> generateFlowMetrics() {
        final String rootComponentType = "RootProcessGroup";

        final String instanceId = StringUtils.isEmpty(controllerFacade.getInstanceId()) ? "" : controllerFacade.getInstanceId();
        final ProcessGroupStatus rootStatus = controllerFacade.getProcessGroupStatus("root");
        PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, rootStatus, instanceId, "", rootComponentType,
                PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());

        // record the aggregate byte counts (read/written/sent/received) against the root group
        final FlowFileEventRepository eventRepository = controllerFacade.getFlowFileEventRepository();
        final String rootId = StringUtils.isEmpty(rootStatus.getId()) ? "" : rootStatus.getId();
        final String rootName = StringUtils.isEmpty(rootStatus.getName()) ? "" : rootStatus.getName();
        final FlowFileEvent totals = eventRepository.reportAggregateEvent();
        nifiMetricsRegistry.setDataPoint(totals.getBytesRead(), "TOTAL_BYTES_READ", instanceId, rootComponentType, rootName, rootId, "");
        nifiMetricsRegistry.setDataPoint(totals.getBytesWritten(), "TOTAL_BYTES_WRITTEN", instanceId, rootComponentType, rootName, rootId, "");
        nifiMetricsRegistry.setDataPoint(totals.getBytesSent(), "TOTAL_BYTES_SENT", instanceId, rootComponentType, rootName, rootId, "");
        nifiMetricsRegistry.setDataPoint(totals.getBytesReceived(), "TOTAL_BYTES_RECEIVED", instanceId, rootComponentType, rootName, rootId, "");

        PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), instanceId);

        // connection status analytics (e.g. predictions) for every connection in the flow
        final Set<Connection> allConnections = controllerFacade.getFlowManager().findAllConnections();
        for (final Connection connection : allConnections) {
            try {
                PrometheusMetricsUtil.createConnectionStatusAnalyticsMetrics(
                        connectionAnalyticsMetricsRegistry,
                        controllerFacade.getConnectionStatusAnalytics(connection.getIdentifier()),
                        instanceId,
                        "Connection",
                        connection.getName(),
                        connection.getIdentifier(),
                        connection.getProcessGroup().getIdentifier(),
                        connection.getSource().getName(),
                        connection.getSource().getIdentifier(),
                        connection.getDestination().getName(),
                        connection.getDestination().getIdentifier());
            } catch (ResourceNotFoundException rnfe) {
                // a ResourceNotFoundException means analytics hasn't been enabled, so stop trying
                break;
            }
        }

        // an empty query retrieves all bulletins
        final BulletinBoardDTO bulletinBoard = getBulletinBoard(new BulletinQueryDTO());
        for (final BulletinEntity entity : bulletinBoard.getBulletins()) {
            final BulletinDTO bulletin = entity.getBulletin();
            if (bulletin == null) {
                continue;
            }

            final String groupId = bulletin.getGroupId() == null ? "" : bulletin.getGroupId();
            final String nodeAddress = bulletin.getNodeAddress() == null ? "" : bulletin.getNodeAddress();
            PrometheusMetricsUtil.createBulletinMetrics(bulletinMetricsRegistry, instanceId,
                    "Bulletin",
                    String.valueOf(bulletin.getId()),
                    groupId,
                    nodeAddress,
                    bulletin.getCategory(),
                    bulletin.getSourceName(),
                    bulletin.getSourceId(),
                    bulletin.getLevel());
        }

        return ALL_REGISTRIES;
    }

    /**
     * Reports whether this instance is clustered, as answered by the controller facade.
     *
     * @return the value returned by {@code controllerFacade.isClustered()}
     */
    @Override
    public boolean isClustered() {
        return controllerFacade.isClustered();
    }

    /**
     * Returns the id of this node as reported by the controller facade.
     *
     * @return the node id, or {@code null} when the controller facade has no node identifier
     */
    @Override
    public String getNodeId() {
        final NodeIdentifier identifier = controllerFacade.getNodeId();
        return identifier == null ? null : identifier.getId();
    }

    /**
     * Builds a summary of the cluster: the generation time plus one DTO per node identifier known to
     * the cluster coordinator.
     *
     * @return the cluster DTO
     */
    @Override
    public ClusterDTO getCluster() {
        final ClusterDTO cluster = new ClusterDTO();

        // stamp the summary with the current time
        cluster.setGenerated(new Date());

        // one DTO per node identifier known to the coordinator
        final List<NodeDTO> nodes = new ArrayList<>();
        for (final NodeIdentifier identifier : clusterCoordinator.getNodeIdentifiers()) {
            nodes.add(getNode(identifier));
        }
        cluster.setNodes(nodes);

        return cluster;
    }

    /**
     * Looks up the node with the given id and builds its DTO.
     *
     * @param nodeId the id of the node to retrieve
     * @return the DTO describing the node
     * @throws UnknownNodeException if the cluster coordinator has no node identifier for the given id
     */
    @Override
    public NodeDTO getNode(final String nodeId) {
        final NodeIdentifier nodeIdentifier = clusterCoordinator.getNodeIdentifier(nodeId);

        // An unknown id yields a null identifier (deleteNode guards against the same case); without this
        // check the null would be dereferenced while building the DTO and surface as a NullPointerException.
        if (nodeIdentifier == null) {
            throw new UnknownNodeException("Cannot retrieve Node with ID " + nodeId + " because it is not part of the cluster");
        }

        return getNode(nodeIdentifier);
    }

    /**
     * Builds the DTO for a node from its connection status, node events, held cluster roles and
     * latest heartbeat.
     *
     * @param nodeId the identifier of the node
     * @return the DTO describing the node
     */
    private NodeDTO getNode(final NodeIdentifier nodeId) {
        final NodeConnectionStatus connectionStatus = clusterCoordinator.getConnectionStatus(nodeId);
        final List<NodeEvent> nodeEvents = clusterCoordinator.getNodeEvents(nodeId);
        final Set<String> heldRoles = getRoles(nodeId);
        final NodeHeartbeat latestHeartbeat = heartbeatMonitor.getLatestHeartbeat(nodeId);

        return dtoFactory.createNodeDTO(nodeId, connectionStatus, latestHeartbeat, nodeEvents, heldRoles);
    }

    /**
     * Determines which cluster roles the given node currently holds, by comparing the node's
     * {@code address:port} with the elected leader of each role.
     *
     * @param nodeId the identifier of the node
     * @return the names of the roles whose leader matches the node's socket address and port
     */
    private Set<String> getRoles(final NodeIdentifier nodeId) {
        final Set<String> heldRoles = new HashSet<>();
        final String address = nodeId.getSocketAddress() + ":" + nodeId.getSocketPort();

        for (final String role : ClusterRoles.getAllRoles()) {
            // equals(null) is false, so roles without a leader are skipped
            if (address.equals(leaderElectionManager.getLeader(role))) {
                heldRoles.add(role);
            }
        }

        return heldRoles;
    }

    /**
     * Removes a node from the cluster and discards its heartbeat. The node must be known to the
     * cluster coordinator and be in the OFFLOADED or DISCONNECTED state.
     *
     * @param nodeId the id of the node to remove
     * @throws WebApplicationException if the current user cannot be determined
     * @throws UnknownNodeException if the node is not part of the cluster
     * @throws IllegalNodeDeletionException if the node is neither offloaded nor disconnected
     */
    @Override
    public void deleteNode(final String nodeId) {
        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
        if (currentUser == null) {
            throw new WebApplicationException(new Throwable("Unable to access details for current user."));
        }
        final String userIdentity = currentUser.getIdentity();

        final NodeIdentifier target = clusterCoordinator.getNodeIdentifier(nodeId);
        if (target == null) {
            throw new UnknownNodeException("Cannot remove Node with ID " + nodeId + " because it is not part of the cluster");
        }

        // only offloaded or disconnected nodes may be removed
        final NodeConnectionState state = clusterCoordinator.getConnectionStatus(target).getState();
        final boolean removable = state.equals(NodeConnectionState.OFFLOADED) || state.equals(NodeConnectionState.DISCONNECTED);
        if (!removable) {
            throw new IllegalNodeDeletionException("Cannot remove Node with ID " + nodeId +
                    " because it is not disconnected or offloaded, current state = " + state);
        }

        clusterCoordinator.removeNode(target, userIdentity);
        heartbeatMonitor.removeHeartbeat(target);
    }

    /* reusable function declarations for converting ids to tenant entities */
    /**
     * Creates a function that converts a user group id into a tenant entity.
     *
     * @param enforceGroupExistence when {@code true} the group is always fetched from the DAO; when
     *        {@code false} a group the DAO does not have is replaced by a placeholder group named
     *        "Group ID - &lt;id&gt; (removed externally)"
     * @return the mapping function
     */
    private Function<String, TenantEntity> mapUserGroupIdToTenantEntity(final boolean enforceGroupExistence) {
        return groupId -> {
            final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId));

            // fall back to a placeholder only when existence is not enforced and the group is missing
            final Group group = (enforceGroupExistence || userGroupDAO.hasUserGroup(groupId))
                    ? userGroupDAO.getUserGroup(groupId)
                    : new Group.Builder().identifier(groupId).name("Group ID - " + groupId + " (removed externally)").build();

            return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(group), revision,
                    dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
        };
    }

    /**
     * Creates a function that converts a user id into a tenant entity.
     *
     * @param enforceUserExistence when {@code true} the user is always fetched from the DAO; when
     *        {@code false} a user the DAO does not have is replaced by a placeholder user whose identity
     *        is "User ID - &lt;id&gt; (removed externally)"
     * @return the mapping function
     */
    private Function<String, TenantEntity> mapUserIdToTenantEntity(final boolean enforceUserExistence) {
        return tenantUserId -> {
            final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(tenantUserId));

            // fall back to a placeholder only when existence is not enforced and the user is missing
            final User tenantUser = (enforceUserExistence || userDAO.hasUser(tenantUserId))
                    ? userDAO.getUser(tenantUserId)
                    : new User.Builder().identifier(tenantUserId).identity("User ID - " + tenantUserId + " (removed externally)").build();

            return entityFactory.createTenantEntity(dtoFactory.createTenantDTO(tenantUser), revision,
                    dtoFactory.createPermissionsDto(authorizableLookup.getTenant()));
        };
    }

    /**
     * Delegates to the input port DAO to verify public port uniqueness for the given port.
     * NOTE(review): the exact uniqueness rule and the failure mode are defined by the DAO and are not
     * visible here.
     *
     * @param portId the id of the port being verified
     * @param portName the name of the port being verified
     */
    @Override
    public void verifyPublicInputPortUniqueness(final String portId, final String portName) {
        inputPortDAO.verifyPublicPortUniqueness(portId, portName);
    }

    /**
     * Delegates to the output port DAO to verify public port uniqueness for the given port.
     * NOTE(review): the exact uniqueness rule and the failure mode are defined by the DAO and are not
     * visible here.
     *
     * @param portId the id of the port being verified
     * @param portName the name of the port being verified
     */
    @Override
    public void verifyPublicOutputPortUniqueness(final String portId, final String portName) {
        outputPortDAO.verifyPublicPortUniqueness(portId, portName);
    }

    /**
     * Creates a new flow mapper. The creation lives in its own protected method so that it can be
     * mocked in tests.
     *
     * @param extensionManager the extension manager to create the flow mapper with
     * @return a new NiFiRegistryFlowMapper instance
     */
    protected NiFiRegistryFlowMapper makeNiFiRegistryFlowMapper(final ExtensionManager extensionManager) {
        return new NiFiRegistryFlowMapper(extensionManager);
    }

    /*
     * setters
     *
     * Each setter below stores the supplied collaborator in the field of the same name; no validation
     * or copying is performed.
     * NOTE(review): presumably these are invoked by the dependency-injection wiring - confirm.
     */
    public void setProperties(final NiFiProperties properties) {
        this.properties = properties;
    }

    public void setControllerFacade(final ControllerFacade controllerFacade) {
        this.controllerFacade = controllerFacade;
    }

    public void setRemoteProcessGroupDAO(final RemoteProcessGroupDAO remoteProcessGroupDAO) {
        this.remoteProcessGroupDAO = remoteProcessGroupDAO;
    }

    public void setLabelDAO(final LabelDAO labelDAO) {
        this.labelDAO = labelDAO;
    }

    public void setFunnelDAO(final FunnelDAO funnelDAO) {
        this.funnelDAO = funnelDAO;
    }

    public void setSnippetDAO(final SnippetDAO snippetDAO) {
        this.snippetDAO = snippetDAO;
    }

    public void setProcessorDAO(final ProcessorDAO processorDAO) {
        this.processorDAO = processorDAO;
    }

    public void setConnectionDAO(final ConnectionDAO connectionDAO) {
        this.connectionDAO = connectionDAO;
    }

    public void setAuditService(final AuditService auditService) {
        this.auditService = auditService;
    }

    public void setRevisionManager(final RevisionManager revisionManager) {
        this.revisionManager = revisionManager;
    }

    public void setDtoFactory(final DtoFactory dtoFactory) {
        this.dtoFactory = dtoFactory;
    }

    public void setEntityFactory(final EntityFactory entityFactory) {
        this.entityFactory = entityFactory;
    }

    public void setInputPortDAO(final PortDAO inputPortDAO) {
        this.inputPortDAO = inputPortDAO;
    }

    public void setOutputPortDAO(final PortDAO outputPortDAO) {
        this.outputPortDAO = outputPortDAO;
    }

    public void setProcessGroupDAO(final ProcessGroupDAO processGroupDAO) {
        this.processGroupDAO = processGroupDAO;
    }

    public void setControllerServiceDAO(final ControllerServiceDAO controllerServiceDAO) {
        this.controllerServiceDAO = controllerServiceDAO;
    }

    public void setReportingTaskDAO(final ReportingTaskDAO reportingTaskDAO) {
        this.reportingTaskDAO = reportingTaskDAO;
    }

    public void setParameterContextDAO(final ParameterContextDAO parameterContextDAO) {
        this.parameterContextDAO = parameterContextDAO;
    }

    public void setTemplateDAO(final TemplateDAO templateDAO) {
        this.templateDAO = templateDAO;
    }

    public void setSnippetUtils(final SnippetUtils snippetUtils) {
        this.snippetUtils = snippetUtils;
    }

    public void setAuthorizableLookup(final AuthorizableLookup authorizableLookup) {
        this.authorizableLookup = authorizableLookup;
    }

    public void setAuthorizer(final Authorizer authorizer) {
        this.authorizer = authorizer;
    }

    public void setUserDAO(final UserDAO userDAO) {
        this.userDAO = userDAO;
    }

    public void setUserGroupDAO(final UserGroupDAO userGroupDAO) {
        this.userGroupDAO = userGroupDAO;
    }

    public void setAccessPolicyDAO(final AccessPolicyDAO accessPolicyDAO) {
        this.accessPolicyDAO = accessPolicyDAO;
    }

    public void setClusterCoordinator(final ClusterCoordinator coordinator) {
        this.clusterCoordinator = coordinator;
    }

    public void setHeartbeatMonitor(final HeartbeatMonitor heartbeatMonitor) {
        this.heartbeatMonitor = heartbeatMonitor;
    }

    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
        this.bulletinRepository = bulletinRepository;
    }

    public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) {
        this.leaderElectionManager = leaderElectionManager;
    }

    public void setRegistryDAO(RegistryDAO registryDao) {
        this.registryDAO = registryDao;
    }

    public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) {
        this.flowRegistryClient = flowRegistryClient;
    }
}
