blob: 783ce49fd69b317875f7b1d9e27ad8b7b8943500 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.util;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.ApplicationResource.ReplicationTarget;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.RunStatusDetailsRequestEntity;
import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ClusterReplicationComponentLifecycle implements ComponentLifecycle {
private static final Logger logger = LoggerFactory.getLogger(ClusterReplicationComponentLifecycle.class);
private ClusterCoordinator clusterCoordinator;
private RequestReplicator requestReplicator;
private NiFiServiceFacade serviceFacade;
private DtoFactory dtoFactory;
@Override
public Set<AffectedComponentEntity> scheduleComponents(final URI exampleUri, final String groupId, final Set<AffectedComponentEntity> components,
final ScheduledState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
final Set<String> componentIds = components.stream()
.map(ComponentEntity::getId)
.collect(Collectors.toSet());
final Map<String, AffectedComponentEntity> componentMap = components.stream()
.collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity()));
final Map<String, Revision> componentRevisionMap = getRevisions(groupId, componentIds);
final Map<String, RevisionDTO> componentRevisionDtoMap = componentRevisionMap.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
final ScheduleComponentsEntity scheduleProcessorsEntity = new ScheduleComponentsEntity();
scheduleProcessorsEntity.setComponents(componentRevisionDtoMap);
scheduleProcessorsEntity.setId(groupId);
scheduleProcessorsEntity.setState(desiredState.name());
URI scheduleGroupUri;
try {
scheduleGroupUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(),
exampleUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, exampleUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// If attempting to run the processors, validation must complete first
if (desiredState == ScheduledState.RUNNING) {
try {
waitForProcessorValidation(user, exampleUri, groupId, componentMap, pause);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LifecycleManagementException("Interrupted while waiting for processors to complete validation");
}
}
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
try {
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse();
}
final int scheduleComponentStatus = clusterResponse.getStatus();
if (scheduleComponentStatus != Status.OK.getStatusCode()) {
final String explanation = getResponseEntity(clusterResponse, String.class);
throw new LifecycleManagementException("Failed to transition components to a state of " + desiredState + " due to " + explanation);
}
final boolean processorsTransitioned = waitForProcessorStatus(user, exampleUri, groupId, componentMap, desiredState, pause, invalidComponentAction);
if (!processorsTransitioned) {
throw new LifecycleManagementException("Failed while waiting for components to transition to state of " + desiredState);
}
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
throw new LifecycleManagementException("Interrupted while attempting to transition components to state of " + desiredState);
}
final Set<AffectedComponentEntity> updatedEntities = components.stream()
.map(component -> AffectedComponentUtils.updateEntity(component, serviceFacade, dtoFactory))
.collect(Collectors.toSet());
return updatedEntities;
}
private ReplicationTarget getReplicationTarget() {
return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
}
private RequestReplicator getRequestReplicator() {
return requestReplicator;
}
protected NodeIdentifier getClusterCoordinatorNode() {
final NodeIdentifier activeClusterCoordinator = clusterCoordinator.getElectedActiveCoordinatorNode();
if (activeClusterCoordinator != null) {
return activeClusterCoordinator;
}
throw new NoClusterCoordinatorException();
}
private Map<String, Revision> getRevisions(final String groupId, final Set<String> componentIds) {
final Set<Revision> processorRevisions = serviceFacade.getRevisionsFromGroup(groupId, group -> componentIds);
return processorRevisions.stream().collect(Collectors.toMap(Revision::getComponentId, Function.identity()));
}
private boolean waitForProcessorValidation(final NiFiUser user, final URI originalUri, final String groupId,
final Map<String, AffectedComponentEntity> processors, final Pause pause) throws InterruptedException {
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/processors/run-status-details/queries", null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
final RunStatusDetailsRequestEntity requestEntity = new RunStatusDetailsRequestEntity();
final Set<String> processorIds = processors.values().stream()
.map(AffectedComponentEntity::getId)
.collect(Collectors.toSet());
requestEntity.setProcessorIds(processorIds);
boolean continuePolling = true;
while (continuePolling) {
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse();
}
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
}
final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity = getResponseEntity(clusterResponse, ProcessorsRunStatusDetailsEntity.class);
if (isProcessorValidationComplete(runStatusDetailsEntity, processors)) {
logger.debug("All {} processors of interest now have been validated: {}", processors.size(), processorIds);
return true;
}
// Not all of the processors are done validating. Pause for a bit and poll again.
continuePolling = pause.pause();
}
return false;
}
private boolean isProcessorValidationComplete(final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity, final Map<String, AffectedComponentEntity> affectedComponents) {
updateAffectedProcessors(runStatusDetailsEntity.getRunStatusDetails(), affectedComponents);
for (final ProcessorRunStatusDetailsEntity statusDetailsEntity : runStatusDetailsEntity.getRunStatusDetails()) {
final ProcessorRunStatusDetailsDTO runStatusDetails = statusDetailsEntity.getRunStatusDetails();
logger.debug("Processor {} now has Run Status of {}", runStatusDetails.getId(), runStatusDetails.getRunStatus());
if (!affectedComponents.containsKey(runStatusDetails.getId())) {
continue;
}
if (ProcessorRunStatusDetailsDTO.VALIDATING.equals(runStatusDetails.getRunStatus())) {
return false;
}
}
return true;
}
/**
* Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
*
* @param user the user making the request
* @param originalUri the original uri
* @param groupId the ID of the Process Group to poll
* @param processors the Processors whose state should be equal to the given desired state
* @param desiredState the desired state for all processors with the ID's given
* @param pause the Pause that can be used to wait between polling
* @param invalidComponentAction indicates how to handle the condition of a Processor being invalid
* @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
*/
private boolean waitForProcessorStatus(final NiFiUser user, final URI originalUri, final String groupId, final Map<String, AffectedComponentEntity> processors,
final ScheduledState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction) throws InterruptedException, LifecycleManagementException {
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(),
"/nifi-api/processors/run-status-details/queries", null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
final Set<String> processorIds = processors.values().stream()
.map(AffectedComponentEntity::getId)
.collect(Collectors.toSet());
final RunStatusDetailsRequestEntity requestEntity = new RunStatusDetailsRequestEntity();
requestEntity.setProcessorIds(processorIds);
boolean continuePolling = true;
while (continuePolling) {
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.POST, groupUri, requestEntity, headers).awaitMergedResponse();
}
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
}
final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity = getResponseEntity(clusterResponse, ProcessorsRunStatusDetailsEntity.class);
if (isProcessorActionComplete(runStatusDetailsEntity, processors, desiredState, invalidComponentAction)) {
logger.debug("All {} processors of interest now have the desired state of {}", processors.size(), desiredState);
return true;
}
// Not all of the processors are in the desired state. Pause for a bit and poll again.
continuePolling = pause.pause();
}
return false;
}
/**
* Extracts the response entity from the specified node response.
*
* @param nodeResponse node response
* @param clazz class
* @param <T> type of class
* @return the response entity
*/
@SuppressWarnings("unchecked")
private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
T entity = (T) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(clazz);
}
return entity;
}
private void updateAffectedProcessors(final Collection<ProcessorRunStatusDetailsEntity> runStatusDetailsEntities, final Map<String, AffectedComponentEntity> affectedComponents) {
// update the affected processors
runStatusDetailsEntities.stream()
.filter(entity -> affectedComponents.containsKey(entity.getRunStatusDetails().getId()))
.forEach(entity -> {
final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getRunStatusDetails().getId());
affectedComponentEntity.setRevision(entity.getRevision());
final ProcessorRunStatusDetailsDTO runStatusDetailsDto = entity.getRunStatusDetails();
// only consider update this component if the user had permissions to it
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
affectedComponent.setState(runStatusDetailsDto.getRunStatus());
affectedComponent.setActiveThreadCount(runStatusDetailsDto.getActiveThreadCount());
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
affectedComponent.setValidationErrors(runStatusDetailsDto.getValidationErrors());
}
}
});
}
private boolean isProcessorActionComplete(final ProcessorsRunStatusDetailsEntity runStatusDetailsEntity, final Map<String, AffectedComponentEntity> affectedComponents,
final ScheduledState desiredState, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
updateAffectedProcessors(runStatusDetailsEntity.getRunStatusDetails(), affectedComponents);
boolean allReachedDesiredState = true;
for (final ProcessorRunStatusDetailsEntity entity : runStatusDetailsEntity.getRunStatusDetails()) {
final ProcessorRunStatusDetailsDTO runStatusDetailsDto = entity.getRunStatusDetails();
if (!affectedComponents.containsKey(runStatusDetailsDto.getId())) {
continue;
}
final boolean desiredStateReached = isDesiredProcessorStateReached(runStatusDetailsDto, desiredState);
logger.debug("Processor[id={}, name={}] now has a state of {} with {} Active Threads, Validation Errors: {}; desired state = {}; invalid component action: {}; desired state reached = {}",
runStatusDetailsDto.getId(), runStatusDetailsDto.getName(), runStatusDetailsDto.getRunStatus(), runStatusDetailsDto.getActiveThreadCount(), runStatusDetailsDto.getValidationErrors(),
desiredState, invalidComponentAction, desiredStateReached);
if (desiredStateReached) {
continue;
}
// If the desired state is stopped and there are active threads, return false. We don't consider the validation status in this case.
if (desiredState == ScheduledState.STOPPED && runStatusDetailsDto.getActiveThreadCount() != 0) {
return false;
}
if (ProcessorRunStatusDetailsDTO.INVALID.equalsIgnoreCase(runStatusDetailsDto.getRunStatus())) {
switch (invalidComponentAction) {
case WAIT:
break;
case SKIP:
continue;
case FAIL:
final String action = desiredState == ScheduledState.RUNNING ? "start" : "stop";
throw new LifecycleManagementException("Could not " + action + " " + runStatusDetailsDto.getName() + " because it is invalid");
}
}
allReachedDesiredState = false;
}
if (allReachedDesiredState) {
logger.debug("All {} Processors of interest now have the desired state of {}", runStatusDetailsEntity.getRunStatusDetails().size(), desiredState);
return true;
}
return false;
}
private boolean isDesiredProcessorStateReached(final ProcessorRunStatusDetailsDTO runStatusDetailsDto, final ScheduledState desiredState) {
final String runStatus = runStatusDetailsDto.getRunStatus();
final boolean stateMatches = desiredState.name().equalsIgnoreCase(runStatus);
if (!stateMatches) {
return false;
}
if (desiredState == ScheduledState.STOPPED && runStatusDetailsDto.getActiveThreadCount() != 0) {
return false;
}
return true;
}
@Override
public Set<AffectedComponentEntity> activateControllerServices(final URI originalUri, final String groupId, final Set<AffectedComponentEntity> affectedServices,
final Set<AffectedComponentEntity> servicesRequiringDesiredState, final ControllerServiceState desiredState,
final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
final Set<String> affectedServiceIds = affectedServices.stream()
.map(ComponentEntity::getId)
.collect(Collectors.toSet());
final Set<String> idsOfServicesRequiringDesiredState = servicesRequiringDesiredState.stream()
.map(ComponentEntity::getId)
.collect(Collectors.toSet());
final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
final ActivateControllerServicesEntity activateServicesEntity = new ActivateControllerServicesEntity();
activateServicesEntity.setComponents(serviceRevisionDtoMap);
activateServicesEntity.setId(groupId);
activateServicesEntity.setState(desiredState.name());
URI controllerServicesUri;
try {
controllerServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// If enabling services, validation must complete first
if (desiredState == ControllerServiceState.ENABLED && !affectedServiceIds.isEmpty()) {
try {
waitForControllerServiceValidation(user, originalUri, groupId, affectedServiceIds, pause);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LifecycleManagementException("Interrupted while waiting for Controller Services to complete validation");
}
}
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
try {
if (!affectedServiceIds.isEmpty()) {
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
}
final int disableServicesStatus = clusterResponse.getStatus();
if (disableServicesStatus != Status.OK.getStatusCode()) {
final String explanation = getResponseEntity(clusterResponse, String.class);
throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState + " due to " + explanation);
}
}
final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, idsOfServicesRequiringDesiredState, desiredState, pause, invalidComponentAction);
if (!serviceTransitioned) {
throw new LifecycleManagementException("Failed while waiting for Controller Services to finish transitioning to a state of " + desiredState);
}
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
throw new LifecycleManagementException("Interrupted while transitioning Controller Services to a state of " + desiredState);
}
return affectedServices.stream()
.map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId()))
.map(dtoFactory::createAffectedComponentEntity)
.collect(Collectors.toSet());
}
private boolean waitForControllerServiceValidation(final NiFiUser user, final URI originalUri, final String groupId,
final Set<String> serviceIds, final Pause pause)
throws InterruptedException {
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
boolean continuePolling = true;
while (continuePolling) {
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
}
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
}
final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class);
final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices();
final Map<String, AffectedComponentEntity> affectedServices = serviceEntities.stream()
.filter(s -> serviceIds.contains(s.getId()))
.collect(Collectors.toMap(ControllerServiceEntity::getId, dtoFactory::createAffectedComponentEntity));
if (isControllerServiceValidationComplete(serviceEntities, affectedServices)) {
logger.debug("All {} controller services of interest have completed validation", affectedServices.size());
return true;
}
continuePolling = pause.pause();
}
return false;
}
private boolean isControllerServiceValidationComplete(final Set<ControllerServiceEntity> controllerServiceEntities, final Map<String, AffectedComponentEntity> affectedComponents) {
updateAffectedControllerServices(controllerServiceEntities, affectedComponents);
for (final ControllerServiceEntity entity : controllerServiceEntities) {
if (!affectedComponents.containsKey(entity.getId())) {
continue;
}
if (ControllerServiceDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
return false;
}
}
return true;
}
/**
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
*
* @param user the user making the request
* @param groupId the ID of the Process Group to poll
* @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state
* @param desiredState the desired state for all services with the ID's given
* @param pause the Pause that can be used to wait between polling
* @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
*/
private boolean waitForControllerServiceStatus(final NiFiUser user, final URI originalUri, final String groupId, final Set<String> serviceIds,
final ControllerServiceState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction)
throws InterruptedException, LifecycleManagementException {
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
boolean continuePolling = true;
while (continuePolling) {
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
final NodeResponse clusterResponse;
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
}
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
}
final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class);
final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices();
final Map<String, AffectedComponentEntity> affectedServices = serviceEntities.stream()
.collect(Collectors.toMap(ControllerServiceEntity::getId, dtoFactory::createAffectedComponentEntity));
// update the affected controller services
updateAffectedControllerServices(serviceEntities, affectedServices);
final String desiredStateName = desiredState.name();
boolean allReachedDesiredState = true;
for (final ControllerServiceEntity serviceEntity : serviceEntities) {
final ControllerServiceDTO serviceDto = serviceEntity.getComponent();
if (serviceDto == null) {
continue;
}
if (!serviceIds.contains(serviceDto.getId())) {
continue;
}
final String validationStatus = serviceDto.getValidationStatus();
final boolean desiredStateReached = desiredStateName.equals(serviceDto.getState());
logger.debug("ControllerService[id={}, name={}] now has a state of {} with a Validation Status of {}; desired state = {}; invalid component action is {}; desired state reached = {}",
serviceDto.getId(), serviceDto.getName(), serviceDto.getState(), validationStatus, desiredState, invalidComponentAction, desiredStateReached);
if (desiredStateReached) {
continue;
}
// The desired state for this component has not yet been reached. Check how we should handle this based on the validation status.
if (ControllerServiceDTO.INVALID.equalsIgnoreCase(validationStatus)) {
switch (invalidComponentAction) {
case WAIT:
break;
case SKIP:
continue;
case FAIL:
final String action = desiredState == ControllerServiceState.ENABLED ? "enable" : "disable";
throw new LifecycleManagementException("Could not " + action + " " + serviceEntity.getComponent().getName() + " because it is invalid");
}
}
allReachedDesiredState = false;
}
if (allReachedDesiredState) {
logger.debug("All {} controller services of interest now have the desired state of {}", affectedServices.size(), desiredState);
return true;
}
// Not all of the controller services are in the desired state. Pause for a bit and poll again.
continuePolling = pause.pause();
}
return false;
}
/**
* Updates the affected controller services in the specified updateRequest with the serviceEntities.
*
* @param serviceEntities service entities
* @param affectedServices affected services
*/
private void updateAffectedControllerServices(final Set<ControllerServiceEntity> serviceEntities, final Map<String, AffectedComponentEntity> affectedServices) {
// update the affected components
serviceEntities.stream()
.filter(entity -> affectedServices.containsKey(entity.getId()))
.forEach(entity -> {
final AffectedComponentEntity affectedComponentEntity = affectedServices.get(entity.getId());
affectedComponentEntity.setRevision(entity.getRevision());
// only consider update this component if the user had permissions to it
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
affectedComponent.setState(entity.getComponent().getState());
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
}
}
});
}
public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator;
}
public void setRequestReplicator(final RequestReplicator requestReplicator) {
this.requestReplicator = requestReplicator;
}
public void setDtoFactory(final DtoFactory dtoFactory) {
this.dtoFactory = dtoFactory;
}
public void setServiceFacade(final NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
}