NIFI-8939: Ensure that when async/long-running flow updates are made,… (#5240)
* NIFI-8939: Ensure that when async/long-running flow updates are made, referencing controller services that are disabling are waited on but not attempted to be disabled
* NIFI-8939: Ensure that when waiting for Controller Services to reach desired state, we use correct URI for fetch service state. There was a typo that resulted in not getting all controller services' states.
This closes #5240
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
index 63a38f5..becb69c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java
@@ -330,16 +330,27 @@
}
asyncRequest.markStepComplete();
- // Steps 7-8. Disable enabled controller services that are affected
- final Set<AffectedComponentEntity> enabledServices = affectedComponents.stream()
- .filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType()))
- .filter(dto -> "Enabled".equalsIgnoreCase(dto.getComponent().getState()))
- .collect(Collectors.toSet());
+ // Steps 7-8. Disable enabled controller services that are affected.
+ // We don't want to disable services that are already disabling. But we need to wait for their state to transition from Disabling to Disabled.
+ final Set<AffectedComponentEntity> servicesToWaitFor = affectedComponents.stream()
+ .filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType()))
+ .filter(dto -> {
+ final String state = dto.getComponent().getState();
+ return "Enabled".equalsIgnoreCase(state) || "Enabling".equalsIgnoreCase(state) || "Disabling".equalsIgnoreCase(state);
+ })
+ .collect(Collectors.toSet());
+
+ final Set<AffectedComponentEntity> enabledServices = servicesToWaitFor.stream()
+ .filter(dto -> {
+ final String state = dto.getComponent().getState();
+ return "Enabling".equalsIgnoreCase(state) || "Enabled".equalsIgnoreCase(state);
+ })
+ .collect(Collectors.toSet());
logger.info("Disabling {} Controller Services", enabledServices.size());
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(disableServicesPause::cancel);
- componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
+ componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, servicesToWaitFor, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
if (asyncRequest.isCancelled()) {
return;
@@ -413,7 +424,8 @@
logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
try {
- componentLifecycle.activateControllerServices(requestUri, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
+ componentLifecycle.activateControllerServices(requestUri, groupId, servicesToEnable, servicesToEnable,
+ ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
} catch (final IllegalStateException ise) {
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
index 64f4856..532c7bf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ParameterContextResource.java
@@ -851,10 +851,13 @@
.filter(component -> "Running".equalsIgnoreCase(component.getComponent().getState()))
.collect(Collectors.toSet());
- final Set<AffectedComponentEntity> enabledControllerServices = affectedComponents.stream()
+ final Set<AffectedComponentEntity> servicesRequiringDisabledState = affectedComponents.stream()
.filter(entity -> entity.getComponent() != null)
.filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType()))
- .filter(dto -> "Enabling".equalsIgnoreCase(dto.getComponent().getState()) || "Enabled".equalsIgnoreCase(dto.getComponent().getState()))
+ .filter(dto -> {
+ final String state = dto.getComponent().getState();
+ return "Enabling".equalsIgnoreCase(state) || "Enabled".equalsIgnoreCase(state) || "Disabling".equalsIgnoreCase(state);
+ })
.collect(Collectors.toSet());
stopProcessors(runningProcessors, asyncRequest, componentLifecycle, uri);
@@ -862,7 +865,16 @@
return null;
}
- disableControllerServices(enabledControllerServices, asyncRequest, componentLifecycle, uri);
+ // We want to disable only those Controller Services that are currently enabled or enabling, but we need to wait for
+ // services that are currently Disabling to become disabled before we are able to consider this step complete.
+ final Set<AffectedComponentEntity> enabledControllerServices = servicesRequiringDisabledState.stream()
+ .filter(dto -> {
+ final String state = dto.getComponent().getState();
+ return "Enabling".equalsIgnoreCase(state) || "Enabled".equalsIgnoreCase(state);
+ })
+ .collect(Collectors.toSet());
+
+ disableControllerServices(enabledControllerServices, servicesRequiringDisabledState, asyncRequest, componentLifecycle, uri);
if (asyncRequest.isCancelled()) {
return null;
}
@@ -878,7 +890,7 @@
} finally {
// TODO: can almost certainly be refactored so that the same code is shared between VersionsResource and ParameterContextResource.
if (!asyncRequest.isCancelled()) {
- enableControllerServices(enabledControllerServices, asyncRequest, componentLifecycle, uri);
+ enableControllerServices(enabledControllerServices, enabledControllerServices, asyncRequest, componentLifecycle, uri);
}
if (!asyncRequest.isCancelled()) {
@@ -993,17 +1005,19 @@
}
}
- private void disableControllerServices(final Set<AffectedComponentEntity> controllerServices, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle,
- final URI uri) throws LifecycleManagementException {
+ private void disableControllerServices(final Set<AffectedComponentEntity> enabledControllerServices, final Set<AffectedComponentEntity> controllerServicesRequiringDisabledState,
+ final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle, final URI uri) throws LifecycleManagementException {
asyncRequest.markStepComplete();
- logger.info("Disabling {} Controller Services in order to update Parameter Context", controllerServices.size());
+ logger.info("Disabling {} Controller Services in order to update Parameter Context", enabledControllerServices.size());
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(disableServicesPause::cancel);
- componentLifecycle.activateControllerServices(uri, "root", controllerServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.WAIT);
+ componentLifecycle.activateControllerServices(uri, "root", enabledControllerServices, controllerServicesRequiringDisabledState, ControllerServiceState.DISABLED, disableServicesPause,
+ InvalidComponentAction.WAIT);
}
- private void enableControllerServices(final Set<AffectedComponentEntity> controllerServices, final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle,
+ private void enableControllerServices(final Set<AffectedComponentEntity> controllerServices, final Set<AffectedComponentEntity> controllerServicesRequiringDisabledState,
+ final AsynchronousWebRequest<?, ?> asyncRequest, final ComponentLifecycle componentLifecycle,
final URI uri) throws LifecycleManagementException, ResumeFlowException {
if (logger.isDebugEnabled()) {
logger.debug("Re-Enabling {} Controller Services: {}", controllerServices.size(), controllerServices);
@@ -1017,7 +1031,8 @@
final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(controllerServices);
try {
- componentLifecycle.activateControllerServices(uri, "root", servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
+ componentLifecycle.activateControllerServices(uri, "root", servicesToEnable, controllerServicesRequiringDisabledState,
+ ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
asyncRequest.markStepComplete();
} catch (final IllegalStateException ise) {
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterContextDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterContextDAO.java
index 583806e..ff7737a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterContextDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardParameterContextDAO.java
@@ -166,9 +166,13 @@
verifyParameterUpdate(parameterDto, processor, currentContext.getName(), verifyComponentStates, processor.isRunning(), "Processor that is running");
}
- for (final ControllerServiceNode serviceNode : referenceManager.getControllerServicesReferencing(currentContext, parameterName)) {
- verifyParameterUpdate(parameterDto, serviceNode, currentContext.getName(), verifyComponentStates,
- serviceNode.getState() != ControllerServiceState.DISABLED, "Controller Service that is enabled");
+ final Set<ControllerServiceNode> referencingServices = referenceManager.getControllerServicesReferencing(currentContext, parameterName);
+ for (final ControllerServiceNode serviceNode : referencingServices) {
+ final ControllerServiceState serviceState = serviceNode.getState();
+ final boolean serviceActive = serviceState != ControllerServiceState.DISABLED;
+
+ verifyParameterUpdate(parameterDto, serviceNode, currentContext.getName(), verifyComponentStates, serviceActive,
+ "Controller Service [id=" + serviceNode.getIdentifier() + "] with a state of " + serviceState + " (state expected to be DISABLED)");
}
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
index 1dee522..783ce49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java
@@ -402,15 +402,20 @@
@Override
public Set<AffectedComponentEntity> activateControllerServices(final URI originalUri, final String groupId, final Set<AffectedComponentEntity> affectedServices,
- final ControllerServiceState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
+ final Set<AffectedComponentEntity> servicesRequiringDesiredState, final ControllerServiceState desiredState,
+ final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
final Set<String> affectedServiceIds = affectedServices.stream()
.map(ComponentEntity::getId)
.collect(Collectors.toSet());
+ final Set<String> idsOfServicesRequiringDesiredState = servicesRequiringDesiredState.stream()
+ .map(ComponentEntity::getId)
+ .collect(Collectors.toSet());
+
final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
- final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect(
- Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
+ final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
final ActivateControllerServicesEntity activateServicesEntity = new ActivateControllerServicesEntity();
activateServicesEntity.setComponents(serviceRevisionDtoMap);
@@ -431,7 +436,7 @@
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// If enabling services, validation must complete first
- if (desiredState == ControllerServiceState.ENABLED) {
+ if (desiredState == ControllerServiceState.ENABLED && !affectedServiceIds.isEmpty()) {
try {
waitForControllerServiceValidation(user, originalUri, groupId, affectedServiceIds, pause);
} catch (InterruptedException e) {
@@ -442,21 +447,23 @@
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
try {
- final NodeResponse clusterResponse;
- if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
- clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
- } else {
- clusterResponse = getRequestReplicator().forwardToCoordinator(
- getClusterCoordinatorNode(), user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
+ if (!affectedServiceIds.isEmpty()) {
+ final NodeResponse clusterResponse;
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
+ }
+
+ final int disableServicesStatus = clusterResponse.getStatus();
+ if (disableServicesStatus != Status.OK.getStatusCode()) {
+ final String explanation = getResponseEntity(clusterResponse, String.class);
+ throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState + " due to " + explanation);
+ }
}
- final int disableServicesStatus = clusterResponse.getStatus();
- if (disableServicesStatus != Status.OK.getStatusCode()) {
- final String explanation = getResponseEntity(clusterResponse, String.class);
- throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState + " due to " + explanation);
- }
-
- final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, affectedServiceIds, desiredState, pause, invalidComponentAction);
+ final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, idsOfServicesRequiringDesiredState, desiredState, pause, invalidComponentAction);
if (!serviceTransitioned) {
throw new LifecycleManagementException("Failed while waiting for Controller Services to finish transitioning to a state of " + desiredState);
@@ -551,7 +558,7 @@
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
- originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false,includeDescendantGroups=true", originalUri.getFragment());
+ originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java
index dd4ed5b..3d299ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java
@@ -47,7 +47,8 @@
*
* @param exampleUri an URI to use as a base for the REST API
* @param groupId the ID of the process group
- * @param services the controller services to enable or disable
+ * @param servicesToUpdate the controller services to enable or disable
+ * @param servicesRequiringDesiredState the controller services whose state must be transitioned to the desired state before returning
* @param desiredState the desired state of the components
* @param pause a pause that can be used to determine how long to wait between polling for task completion and that can also be used to cancel the operation
* @param invalidComponentAction when waiting for a component to reach the specified desired state, indicates how the deal with a component that is invalid
@@ -56,6 +57,6 @@
*
* @throws IllegalStateException if any of the components given do not have a state that can be transitioned to the given desired state
*/
- Set<AffectedComponentEntity> activateControllerServices(URI exampleUri, String groupId, Set<AffectedComponentEntity> services,
+ Set<AffectedComponentEntity> activateControllerServices(URI exampleUri, String groupId, Set<AffectedComponentEntity> servicesToUpdate, Set<AffectedComponentEntity> servicesRequiringDesiredState,
ControllerServiceState desiredState, Pause pause, InvalidComponentAction invalidComponentAction) throws LifecycleManagementException;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
index 370bc79..7cbaf27 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java
@@ -74,22 +74,26 @@
}
@Override
- public Set<AffectedComponentEntity> activateControllerServices(final URI exampleUri, final String groupId, final Set<AffectedComponentEntity> services,
- final ControllerServiceState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
+ public Set<AffectedComponentEntity> activateControllerServices(final URI exampleUri, final String groupId, final Set<AffectedComponentEntity> servicesToUpdate,
+ final Set<AffectedComponentEntity> servicesRequiringDesiredState, final ControllerServiceState desiredState,
+ final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
- final Map<String, Revision> serviceRevisions = services.stream()
+ final Map<String, Revision> serviceRevisions = servicesToUpdate.stream()
.collect(Collectors.toMap(AffectedComponentEntity::getId, entity -> revisionManager.getRevision(entity.getId())));
- final Map<String, AffectedComponentEntity> affectedServiceMap = services.stream()
+ final Map<String, AffectedComponentEntity> affectedServiceMap = servicesToUpdate.stream()
+ .collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity()));
+
+ final Map<String, AffectedComponentEntity> servicesToWaitFor = servicesRequiringDesiredState.stream()
.collect(Collectors.toMap(AffectedComponentEntity::getId, Function.identity()));
if (desiredState == ControllerServiceState.ENABLED) {
- enableControllerServices(groupId, serviceRevisions, affectedServiceMap, pause, invalidComponentAction);
+ enableControllerServices(groupId, serviceRevisions, affectedServiceMap, servicesToWaitFor, pause, invalidComponentAction);
} else {
- disableControllerServices(groupId, serviceRevisions, affectedServiceMap, pause, invalidComponentAction);
+ disableControllerServices(groupId, serviceRevisions, affectedServiceMap, servicesToWaitFor, pause, invalidComponentAction);
}
- return services.stream()
+ return servicesRequiringDesiredState.stream()
.map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId()))
.map(dtoFactory::createAffectedComponentEntity)
.collect(Collectors.toSet());
@@ -278,7 +282,8 @@
return true;
}
- private void enableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, final Pause pause,
+ private void enableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices,
+ final Map<String, AffectedComponentEntity> servicesRequiringDesiredState, final Pause pause,
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
if (serviceRevisions.isEmpty()) {
@@ -291,21 +296,29 @@
serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, affectedServices.keySet());
serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions);
- waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.ENABLED, pause, invalidComponentAction);
+ waitForControllerServiceState(processGroupId, servicesRequiringDesiredState, ControllerServiceState.ENABLED, pause, invalidComponentAction);
}
- private void disableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices, final Pause pause,
+ private void disableControllerServices(final String processGroupId, final Map<String, Revision> serviceRevisions, final Map<String, AffectedComponentEntity> affectedServices,
+ final Map<String, AffectedComponentEntity> servicesToWaitFor, final Pause pause,
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
- if (serviceRevisions.isEmpty()) {
+ if (serviceRevisions.isEmpty() && servicesToWaitFor.isEmpty()) {
+ logger.debug("No Controller Services to update or wait for state to become DISABLED");
return;
}
logger.debug("Disabling Controller Services with ID's {} from Process Group {}", serviceRevisions.keySet(), processGroupId);
- serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, affectedServices.keySet());
- serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions);
- waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause, invalidComponentAction);
+ if (!affectedServices.isEmpty()) {
+ serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, affectedServices.keySet());
+ }
+
+ if (!serviceRevisions.isEmpty()) {
+ serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions);
+ }
+
+ waitForControllerServiceState(processGroupId, servicesToWaitFor, ControllerServiceState.DISABLED, pause, invalidComponentAction);
}
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 2f7cdc5..3361a0f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -465,7 +465,7 @@
}
}
- public ActivateControllerServicesEntity disableControllerServices(final String groupId) throws NiFiClientException, IOException {
+ public ActivateControllerServicesEntity disableControllerServices(final String groupId, final boolean recurse) throws NiFiClientException, IOException {
final ActivateControllerServicesEntity activateControllerServicesEntity = new ActivateControllerServicesEntity();
activateControllerServicesEntity.setId(groupId);
activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_DISABLED);
@@ -473,6 +473,15 @@
final ActivateControllerServicesEntity activateControllerServices = nifiClient.getFlowClient().activateControllerServices(activateControllerServicesEntity);
waitForControllerSerivcesDisabled(groupId);
+ if (recurse) {
+ final ProcessGroupFlowEntity groupEntity = nifiClient.getFlowClient().getProcessGroup(groupId);
+ final FlowDTO flowDto = groupEntity.getProcessGroupFlow().getFlow();
+ for (final ProcessGroupEntity childGroupEntity : flowDto.getProcessGroups()) {
+ final String childGroupId = childGroupEntity.getId();
+ disableControllerServices(childGroupId, recurse);
+ }
+ }
+
return activateControllerServices;
}
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 08348eb..5be74fb 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -119,7 +119,7 @@
protected void destroyFlow() throws NiFiClientException, IOException {
getClientUtil().stopProcessGroupComponents("root");
- getClientUtil().disableControllerServices("root");
+ getClientUtil().disableControllerServices("root", true);
getClientUtil().stopTransmitting("root");
getClientUtil().deleteAll("root");
}
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java
index e091d47..da35e8c 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java
@@ -426,7 +426,7 @@
final ControllerServiceEntity serviceEntity = createControllerService(TEST_CS_PACKAGE + ".StandardSleepService", "root", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
// Set service's sleep time to the parameter.
- serviceEntity.getComponent().setProperties(Collections.singletonMap("@OnEnablend Sleep Time", "#{sleep}"));
+ serviceEntity.getComponent().setProperties(Collections.singletonMap("@OnEnabled Sleep Time", "#{sleep}"));
getNifiClient().getControllerServicesClient().updateControllerService(serviceEntity);
// Enable the service. It should take 7 seconds for the service to fully enable.
@@ -443,6 +443,51 @@
}
@Test
+ public void testParamChangeWhileReferencingControllerServiceDisabling() throws NiFiClientException, IOException, InterruptedException {
+ testParamChangeWhileReferencingControllerServiceDisabling(true);
+ }
+
+ @Test
+ public void testParamChangeWhileReferencingControllerServiceEnabled() throws NiFiClientException, IOException, InterruptedException {
+ testParamChangeWhileReferencingControllerServiceDisabling(false);
+ }
+
+ private void testParamChangeWhileReferencingControllerServiceDisabling(final boolean disableServiceBeforeUpdate) throws NiFiClientException, IOException, InterruptedException {
+ final ParameterContextEntity createdContextEntity = createParameterContext("sleep", "7 sec");
+
+ // Set the Parameter Context on the root Process Group
+ final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child", "root");
+ setParameterContext(childGroup.getId(), createdContextEntity);
+
+ final ControllerServiceEntity serviceEntity = createControllerService(TEST_CS_PACKAGE + ".StandardSleepService", childGroup.getId(),
+ NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
+
+ // Set service's sleep time to the parameter.
+ serviceEntity.getComponent().setProperties(Collections.singletonMap("@OnDisabled Sleep Time", "#{sleep}"));
+ getNifiClient().getControllerServicesClient().updateControllerService(serviceEntity);
+
+ // Enable the service.
+ getClientUtil().enableControllerService(serviceEntity);
+
+ // Wait for the service to reach of state of ENABLED.
+ getClientUtil().waitForControllerServiceState(serviceEntity.getParentGroupId(), "ENABLED", Collections.emptyList());
+
+ if (disableServiceBeforeUpdate) {
+ // Disable the service.
+ getClientUtil().disableControllerService(serviceEntity);
+
+ // Wait for service to reach state of DISABLING but not DISABLED. We want to change the parameter that it references while it's disabling.
+ getClientUtil().waitForControllerServiceState(serviceEntity.getParentGroupId(), "DISABLING", Collections.emptyList());
+ }
+
+ // Change the parameter
+ final ParameterContextUpdateRequestEntity paramUpdateRequestEntity = updateParameterContext(createdContextEntity, "sleep", "1 sec");
+
+ // Wait for the update to complete
+ getClientUtil().waitForParameterContextRequestToComplete(createdContextEntity.getId(), paramUpdateRequestEntity.getRequest().getRequestId());
+ }
+
+ @Test
public void testParamChangeWhileReferencingProcessorStartingButInvalid() throws NiFiClientException, IOException, InterruptedException {
final ParameterContextEntity contextEntity = createParameterContext("clone", "true");
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml
index b0c4571..524f259 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml
@@ -88,6 +88,7 @@
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
<logger name="org.apache.nifi.connectable.LocalPort" level="DEBUG"/>
+ <logger name="org.apache.nifi.web.util.ClusterReplicationComponentLifecycle" level="DEBUG" />
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml
index 530cc19..7fe99b7 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml
@@ -88,6 +88,7 @@
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
<logger name="org.apache.nifi.connectable.LocalPort" level="DEBUG"/>
+ <logger name="org.apache.nifi.web.util.ClusterReplicationComponentLifecycle" level="DEBUG" />
<logger name="org.apache.nifi.controller.StandardFlowSynchronizer" level="DEBUG" />
<logger name="org.apache.nifi.controller.inheritance" level="DEBUG" />