blob: 5de890e68bd165ab3d3ce9fed1f02f538b6d0657 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.repository.NopLogRepository;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.ControllerServiceDAO;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class StandardControllerServiceDAO extends ComponentDAO implements ControllerServiceDAO {
private ControllerServiceProvider serviceProvider;
private ComponentStateDAO componentStateDAO;
private FlowController flowController;
/**
 * Looks up a controller service node by its identifier through the service provider.
 *
 * @param controllerServiceId identifier of the controller service to find
 * @return the matching node; never {@code null}
 * @throws ResourceNotFoundException if the service provider returns no node for the identifier
 */
private ControllerServiceNode locateControllerService(final String controllerServiceId) {
    final ControllerServiceNode serviceNode = serviceProvider.getControllerServiceNode(controllerServiceId);
    if (serviceNode != null) {
        return serviceNode;
    }

    // the provider knows nothing about this id
    final String message = String.format("Unable to locate controller service with id '%s'.", controllerServiceId);
    throw new ResourceNotFoundException(message);
}
/**
 * Checks whether a controller service described by the DTO could be created, by handing the
 * DTO's type and bundle, together with the provider's extension manager, to the
 * three-argument {@code verifyCreate} overload (defined outside this section).
 *
 * @param controllerServiceDTO description of the controller service to create
 */
@Override
public void verifyCreate(final ControllerServiceDTO controllerServiceDTO) {
    final ExtensionManager extensionManager = serviceProvider.getExtensionManager();
    verifyCreate(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
}
/**
 * Creates a controller service from the DTO, applies the DTO's configuration and attaches the
 * service either to the controller level (no parent group id) or to the requested process group.
 *
 * <p>The target process group is resolved and validated before the service node is instantiated,
 * so a request naming an unknown group fails without a service node having been created.</p>
 *
 * @param controllerServiceDTO description of the controller service to create
 * @return the newly created and configured controller service node
 * @throws IllegalArgumentException if the DTO does not specify a type
 * @throws ResourceNotFoundException if a parent group id is given but no such group is found
 * @throws NiFiCoreException if the controller service cannot be instantiated
 */
@Override
public ControllerServiceNode createControllerService(final ControllerServiceDTO controllerServiceDTO) {
    // ensure the type is specified
    if (controllerServiceDTO.getType() == null) {
        throw new IllegalArgumentException("The controller service type must be specified.");
    }

    final FlowManager flowManager = flowController.getFlowManager();

    // resolve the destination first; a null group id means a controller-level (root) service
    final String groupId = controllerServiceDTO.getParentGroupId();
    final ProcessGroup group;
    if (groupId == null) {
        group = null;
    } else if (groupId.equals(FlowManager.ROOT_GROUP_ID_ALIAS)) {
        group = flowManager.getRootGroup();
    } else {
        group = flowManager.getRootGroup().findProcessGroup(groupId);
    }

    if (groupId != null && group == null) {
        throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
    }

    try {
        // create the controller service
        final ExtensionManager extensionManager = serviceProvider.getExtensionManager();
        final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
        final ControllerServiceNode controllerService = flowManager.createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate,
                Collections.emptySet(), true, true);

        // ensure we can perform the update
        verifyUpdate(controllerService, controllerServiceDTO);

        // perform the update
        configureControllerService(controllerService, controllerServiceDTO);

        // attach the configured service to its destination
        if (group == null) {
            flowManager.addRootControllerService(controllerService);
        } else {
            group.addControllerService(controllerService);
        }

        return controllerService;
    } catch (final ControllerServiceInstantiationException csie) {
        throw new NiFiCoreException(csie.getMessage(), csie);
    }
}
/**
 * Returns the controller service node with the given identifier.
 *
 * @param controllerServiceId identifier of the controller service
 * @return the matching controller service node
 * @throws ResourceNotFoundException if no controller service with that identifier is found
 */
@Override
public ControllerServiceNode getControllerService(final String controllerServiceId) {
return locateControllerService(controllerServiceId);
}
/**
 * Reports whether the service provider knows a controller service with the given identifier.
 * Unlike {@link #getControllerService(String)}, this never throws for an unknown identifier.
 *
 * @param controllerServiceId identifier of the controller service
 * @return {@code true} if the provider returns a node for the identifier, {@code false} otherwise
 */
@Override
public boolean hasControllerService(final String controllerServiceId) {
    final ControllerServiceNode serviceNode = serviceProvider.getControllerServiceNode(controllerServiceId);
    return serviceNode != null;
}
/**
 * Returns the controller services visible from a process group, or the controller-level
 * services when no group id is given.
 *
 * @param groupId id of the process group, the root group alias, or {@code null} for the
 *                controller-level (root) services
 * @param includeAncestorGroups passed through to the group's {@code getControllerServices} call
 * @param includeDescendantGroups when {@code true}, the result of the group's
 *                                {@code findAllControllerServices} is added to the returned set
 * @return the set of controller service nodes
 * @throws ResourceNotFoundException if a group id is given but the group cannot be found
 */
@Override
public Set<ControllerServiceNode> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
    final FlowManager flowManager = flowController.getFlowManager();

    // no group means the caller wants the controller-level services
    if (groupId == null) {
        return flowManager.getRootControllerServices();
    }

    // translate the root alias into the real root group id before searching
    final String resolvedGroupId;
    if (groupId.equals(FlowManager.ROOT_GROUP_ID_ALIAS)) {
        resolvedGroupId = flowManager.getRootGroupId();
    } else {
        resolvedGroupId = groupId;
    }

    final ProcessGroup targetGroup = flowManager.getRootGroup().findProcessGroup(resolvedGroupId);
    if (targetGroup == null) {
        throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId);
    }

    final Set<ControllerServiceNode> services = targetGroup.getControllerServices(includeAncestorGroups);
    if (includeDescendantGroups) {
        services.addAll(targetGroup.findAllControllerServices());
    }

    return services;
}
/**
 * Applies the changes described by the DTO to an existing controller service: verifies the
 * request, updates the configuration, switches the bundle if one is supplied, enables or
 * disables the service if a different state is requested, and finally notifies the owning
 * process group (and the groups of referencing components) that a component was modified.
 *
 * @param controllerServiceDTO the requested changes; its id selects the controller service
 * @return the updated controller service node
 * @throws ResourceNotFoundException if no controller service with the DTO's id is found
 */
@Override
public ControllerServiceNode updateControllerService(final ControllerServiceDTO controllerServiceDTO) {
    // get the controller service
    final ControllerServiceNode controllerService = locateControllerService(controllerServiceDTO.getId());

    // ensure we can perform the update
    verifyUpdate(controllerService, controllerServiceDTO);

    // perform the update
    configureControllerService(controllerService, controllerServiceDTO);

    // attempt to change the underlying controller service if an updated bundle is specified
    // updating the bundle must happen after configuring so that any additional classpath resources are set first
    updateBundle(controllerService, controllerServiceDTO);

    // enable or disable as appropriate
    if (isNotNull(controllerServiceDTO.getState())) {
        final ControllerServiceState proposedState = ControllerServiceState.valueOf(controllerServiceDTO.getState());

        // only attempt an action if it is changing
        if (!proposedState.equals(controllerService.getState())) {
            if (ControllerServiceState.ENABLED.equals(proposedState)) {
                serviceProvider.enableControllerService(controllerService);
            } else if (ControllerServiceState.DISABLED.equals(proposedState)) {
                serviceProvider.disableControllerService(controllerService);
            }
        }
    }

    final ProcessGroup group = controllerService.getProcessGroup();
    if (group != null) {
        group.onComponentModified();

        // For any component that references this Controller Service, find the component's Process Group
        // and notify the Process Group that a component has been modified. This way, we know to re-calculate
        // whether or not the Process Group has local modifications.
        // Components that report no Process Group identifier are skipped instead of triggering a
        // NullPointerException in the comparison below.
        controllerService.getReferences().getReferencingComponents().stream()
            .map(ComponentNode::getProcessGroupIdentifier)
            .filter(id -> id != null && !id.equals(group.getIdentifier()))
            .forEach(groupId -> {
                final ProcessGroup descendant = group.findProcessGroup(groupId);
                if (descendant != null) {
                    descendant.onComponentModified();
                }
            });
    }

    return controllerService;
}
/**
 * Reloads the controller service from a different bundle when the DTO names a bundle whose
 * coordinate differs from the one the service currently uses. Does nothing when the DTO has
 * no bundle or the coordinates are equal.
 *
 * @param controllerService the controller service to reload
 * @param controllerServiceDTO the request that may carry a new bundle
 * @throws NiFiCoreException if the service cannot be instantiated from the new bundle
 */
private void updateBundle(final ControllerServiceNode controllerService, final ControllerServiceDTO controllerServiceDTO) {
    final BundleDTO requestedBundle = controllerServiceDTO.getBundle();
    if (requestedBundle == null) {
        return;
    }

    final ExtensionManager extensionManager = serviceProvider.getExtensionManager();
    final BundleCoordinate incomingCoordinate = BundleUtils.getBundle(extensionManager, controllerService.getCanonicalClassName(), requestedBundle);
    final BundleCoordinate existingCoordinate = controllerService.getBundleCoordinate();

    // nothing to do when the requested bundle is the one already in use
    if (existingCoordinate.getCoordinate().equals(incomingCoordinate.getCoordinate())) {
        return;
    }

    try {
        // we need to use the property descriptors from the temp component here in case we are changing from a ghost component to a real component
        final ConfigurableComponent tempComponent = extensionManager.getTempComponent(controllerService.getCanonicalClassName(), incomingCoordinate);
        final Set<URL> additionalUrls = controllerService.getAdditionalClasspathResources(tempComponent.getPropertyDescriptors());
        flowController.getReloadComponent().reload(controllerService, controllerService.getCanonicalClassName(), incomingCoordinate, additionalUrls);
    } catch (ControllerServiceInstantiationException e) {
        final String message = String.format("Unable to update controller service %s from %s to %s due to: %s",
                controllerServiceDTO.getId(), controllerService.getBundleCoordinate().getCoordinate(), incomingCoordinate.getCoordinate(), e.getMessage());
        throw new NiFiCoreException(message, e);
    }
}
/**
 * Changes the state of the components that reference a controller service. A non-null
 * controller service state takes precedence and acts on referencing services; otherwise a
 * non-null scheduled state acts on referencing schedulable components.
 *
 * @param controllerServiceId id of the referenced controller service
 * @param scheduledState {@code RUNNING} to schedule referencing components, any other
 *                       non-null value to unschedule them; ignored if a service state is given
 * @param controllerServiceState {@code ENABLED} to enable referencing services, any other
 *                               non-null value to disable them
 * @return the set returned by the service provider, or an empty set when both states are {@code null}
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public Set<ComponentNode> updateControllerServiceReferencingComponents(
        final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {

    final ControllerServiceNode serviceNode = locateControllerService(controllerServiceId);

    // a requested service state acts upon the referencing services
    if (controllerServiceState != null) {
        if (ControllerServiceState.ENABLED.equals(controllerServiceState)) {
            return serviceProvider.enableReferencingServices(serviceNode);
        }
        return serviceProvider.disableReferencingServices(serviceNode);
    }

    // neither state was supplied, so there is nothing to change
    if (scheduledState == null) {
        return Collections.emptySet();
    }

    // a requested scheduled state acts upon the referencing schedulable components
    if (ScheduledState.RUNNING.equals(scheduledState)) {
        return serviceProvider.scheduleReferencingComponents(serviceNode);
    }
    return serviceProvider.unscheduleReferencingComponents(serviceNode);
}
/**
 * Checks whether the properties proposed in the DTO may be applied to the controller service.
 *
 * @param controllerService the controller service that would receive the properties
 * @param controllerServiceDTO the request carrying the proposed properties
 * @return a list holding the message of the IllegalArgumentException or IllegalStateException
 *         raised by the property check; empty when the DTO has no properties or the check passes
 */
private List<String> validateProposedConfiguration(final ControllerServiceNode controllerService, final ControllerServiceDTO controllerServiceDTO) {
    final List<String> problems = new ArrayList<>();

    final Map<String, String> proposedProperties = controllerServiceDTO.getProperties();
    if (!isNotNull(proposedProperties)) {
        return problems;
    }

    try {
        controllerService.verifyCanUpdateProperties(proposedProperties);
    } catch (final IllegalArgumentException | IllegalStateException e) {
        // report the rejection to the caller instead of propagating it
        problems.add(e.getMessage());
    }

    return problems;
}
/**
 * Verifies that the controller service with the given identifier can be deleted, delegating
 * the check to the node's {@code verifyCanDelete}.
 *
 * @param controllerServiceId identifier of the controller service
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public void verifyDelete(final String controllerServiceId) {
    locateControllerService(controllerServiceId).verifyCanDelete();
}
/**
 * Verifies that the changes described by the DTO can be applied to the controller service
 * identified by the DTO's id.
 *
 * @param controllerServiceDTO the requested changes
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public void verifyUpdate(final ControllerServiceDTO controllerServiceDTO) {
    final String serviceId = controllerServiceDTO.getId();
    verifyUpdate(locateControllerService(serviceId), controllerServiceDTO);
}
/**
 * Verifies that the components referencing a controller service can be moved to the requested
 * state. A non-null controller service state takes precedence over the scheduled state; when
 * both are {@code null} nothing is verified.
 *
 * @param controllerServiceId id of the referenced controller service
 * @param scheduledState {@code RUNNING} to verify scheduling, any other non-null value to verify stopping
 * @param controllerServiceState {@code ENABLED} to verify enabling, any other non-null value to verify disabling
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public void verifyUpdateReferencingComponents(final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
    final ControllerServiceNode serviceNode = locateControllerService(controllerServiceId);

    // a requested service state concerns the referencing services
    if (controllerServiceState != null) {
        if (ControllerServiceState.ENABLED.equals(controllerServiceState)) {
            serviceProvider.verifyCanEnableReferencingServices(serviceNode);
        } else {
            serviceProvider.verifyCanDisableReferencingServices(serviceNode);
        }
        return;
    }

    if (scheduledState == null) {
        return;
    }

    // a requested scheduled state concerns the referencing schedulable components
    if (ScheduledState.RUNNING.equals(scheduledState)) {
        serviceProvider.verifyCanScheduleReferencingComponents(serviceNode);
    } else {
        serviceProvider.verifyCanStopReferencingComponents(serviceNode);
    }
}
/**
 * Verifies that the changes described by the DTO can be applied to the given controller service.
 *
 * <p>Checks, in order: that a requested state parses and is ENABLED or DISABLED and that the
 * transition is allowed; that proposed properties are acceptable; that a requested bundle is
 * available and may replace the current one; and, if any of name, annotation data, comments,
 * properties or bundle is present, that the service can be updated at all.</p>
 *
 * <p>Only a failure to parse the state, or a transitional state, is reported with the
 * "must be one of" message. An IllegalArgumentException raised by the enable/disable
 * verification itself propagates unchanged, and the parse failure is kept as the cause.</p>
 *
 * @param controllerService the controller service that would be updated
 * @param controllerServiceDTO the requested changes
 * @throws IllegalArgumentException if the requested state is not ENABLED or DISABLED
 * @throws ValidationException if the proposed properties are rejected
 */
private void verifyUpdate(final ControllerServiceNode controllerService, final ControllerServiceDTO controllerServiceDTO) {
    // validate the new controller service state if appropriate
    if (isNotNull(controllerServiceDTO.getState())) {
        final String invalidStateMessage = "Controller Service state: Value must be one of [ENABLED, DISABLED]";

        // attempt to parse the service state
        final ControllerServiceState proposedState;
        try {
            proposedState = ControllerServiceState.valueOf(controllerServiceDTO.getState());
        } catch (final IllegalArgumentException iae) {
            throw new IllegalArgumentException(invalidStateMessage, iae);
        }

        // ensure the state is valid; transitional states cannot be requested
        if (ControllerServiceState.ENABLING.equals(proposedState) || ControllerServiceState.DISABLING.equals(proposedState)) {
            throw new IllegalArgumentException(invalidStateMessage);
        }

        // only attempt an action if it is changing
        if (!proposedState.equals(controllerService.getState())) {
            if (ControllerServiceState.ENABLED.equals(proposedState)) {
                controllerService.verifyCanEnable();
            } else if (ControllerServiceState.DISABLED.equals(proposedState)) {
                controllerService.verifyCanDisable();
            }
        }
    }

    boolean modificationRequest = false;
    if (isAnyNotNull(controllerServiceDTO.getName(),
            controllerServiceDTO.getAnnotationData(),
            controllerServiceDTO.getComments(),
            controllerServiceDTO.getProperties(),
            controllerServiceDTO.getBundle())) {
        modificationRequest = true;

        // validate the request
        final List<String> requestValidation = validateProposedConfiguration(controllerService, controllerServiceDTO);

        // ensure there was no validation errors
        if (!requestValidation.isEmpty()) {
            throw new ValidationException(requestValidation);
        }
    }

    final BundleDTO bundleDTO = controllerServiceDTO.getBundle();
    if (bundleDTO != null) {
        // ensures all nodes in a cluster have the bundle, throws exception if bundle not found for the given type
        final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(serviceProvider.getExtensionManager(), controllerService.getCanonicalClassName(), bundleDTO);

        // ensure we are only changing to a bundle with the same group and id, but different version
        controllerService.verifyCanUpdateBundle(bundleCoordinate);
    }

    if (modificationRequest) {
        controllerService.verifyCanUpdate();
    }
}
/**
 * Copies the name, annotation data, comments and properties from the DTO onto the controller
 * service, skipping any value that is absent. The service's validation trigger is paused for
 * the duration and always resumed afterwards, even if a setter throws.
 *
 * @param controllerService the controller service to configure
 * @param controllerServiceDTO the source of the new configuration values
 */
private void configureControllerService(final ControllerServiceNode controllerService, final ControllerServiceDTO controllerServiceDTO) {
    final String requestedName = controllerServiceDTO.getName();
    final String requestedAnnotationData = controllerServiceDTO.getAnnotationData();
    final String requestedComments = controllerServiceDTO.getComments();
    final Map<String, String> requestedProperties = controllerServiceDTO.getProperties();

    // avoid causing validation to be triggered multiple times
    controllerService.pauseValidationTrigger();
    try {
        if (isNotNull(requestedName)) {
            controllerService.setName(requestedName);
        }

        if (isNotNull(requestedAnnotationData)) {
            controllerService.setAnnotationData(requestedAnnotationData);
        }

        if (isNotNull(requestedComments)) {
            controllerService.setComments(requestedComments);
        }

        if (isNotNull(requestedProperties)) {
            controllerService.setProperties(requestedProperties);
        }
    } finally {
        controllerService.resumeValidationTrigger();
    }
}
/**
 * Removes the controller service with the given identifier through the service provider.
 *
 * @param controllerServiceId identifier of the controller service to remove
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public void deleteControllerService(final String controllerServiceId) {
    serviceProvider.removeControllerService(locateControllerService(controllerServiceId));
}
/**
 * Retrieves the stored state of a controller service for the given scope via the component
 * state DAO.
 *
 * @param controllerServiceId identifier of the controller service
 * @param scope the state scope to read
 * @return the state map returned by the component state DAO
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public StateMap getState(final String controllerServiceId, final Scope scope) {
    return componentStateDAO.getState(locateControllerService(controllerServiceId), scope);
}
/**
 * Verifies that the state of the controller service with the given identifier can be cleared,
 * delegating the check to the node's {@code verifyCanClearState}.
 *
 * @param controllerServiceId identifier of the controller service
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public void verifyClearState(final String controllerServiceId) {
    locateControllerService(controllerServiceId).verifyCanClearState();
}
/**
 * Verifies that configuration verification can be performed on the controller service with
 * the given identifier, delegating the check to the node's {@code verifyCanPerformVerification}.
 *
 * @param controllerServiceId identifier of the controller service
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public void verifyConfigVerification(final String controllerServiceId) {
    locateControllerService(controllerServiceId).verifyCanPerformVerification();
}
/**
 * Runs configuration verification for a controller service against the supplied property
 * values and converts each verification result into a DTO.
 *
 * <p>The verification runs with a logger backed by a {@link NopLogRepository} and a
 * configuration context built from the supplied properties and the service's current annotation
 * data. Parameters are looked up in the parameter context of the service's process group, or
 * in {@code ParameterLookup.EMPTY} when the service has no process group.</p>
 *
 * @param controllerServiceId identifier of the controller service to verify
 * @param properties the property values to verify
 * @param variables passed through to the node's {@code verifyConfiguration}
 * @return one DTO per verification result, in the order the results were produced
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public List<ConfigVerificationResultDTO> verifyConfiguration(final String controllerServiceId, final Map<String, String> properties, final Map<String, String> variables) {
    final ControllerServiceNode serviceNode = locateControllerService(controllerServiceId);

    final LogRepository nopRepository = new NopLogRepository();
    final ComponentLog verificationLogger = new SimpleProcessLogger(serviceNode.getControllerServiceImplementation(), nopRepository);
    final ExtensionManager extensionManager = flowController.getExtensionManager();

    // services without a process group have no parameter context to consult
    final ParameterLookup parameterLookup;
    if (serviceNode.getProcessGroup() == null) {
        parameterLookup = ParameterLookup.EMPTY;
    } else {
        parameterLookup = serviceNode.getProcessGroup().getParameterContext();
    }

    final ConfigurationContext verificationContext = new StandardConfigurationContext(serviceNode, properties, serviceNode.getAnnotationData(),
            parameterLookup, flowController.getControllerServiceProvider(), null, flowController.getVariableRegistry());

    final List<ConfigVerificationResult> results = serviceNode.verifyConfiguration(verificationContext, verificationLogger, variables, extensionManager);

    return results.stream()
            .map(this::createConfigVerificationResultDto)
            .collect(Collectors.toList());
}
/**
 * Converts a verification result into its DTO form, copying the verification step name, the
 * outcome (as the enum constant's name) and the explanation.
 *
 * @param result the verification result to convert
 * @return a newly created DTO populated from the result
 */
private ConfigVerificationResultDTO createConfigVerificationResultDto(final ConfigVerificationResult result) {
    final ConfigVerificationResultDTO resultDto = new ConfigVerificationResultDTO();
    resultDto.setVerificationStepName(result.getVerificationStepName());
    resultDto.setOutcome(result.getOutcome().name());
    resultDto.setExplanation(result.getExplanation());
    return resultDto;
}
/**
 * Clears the stored state of the controller service with the given identifier via the
 * component state DAO.
 *
 * @param controllerServiceId identifier of the controller service
 * @throws ResourceNotFoundException if the controller service cannot be found
 */
@Override
public void clearState(final String controllerServiceId) {
    componentStateDAO.clearState(locateControllerService(controllerServiceId));
}
/* setters */
/**
 * Sets the service provider this DAO uses to look up, enable, disable and remove controller
 * services and to obtain the extension manager.
 *
 * @param serviceProvider the controller service provider to use
 */
public void setServiceProvider(final ControllerServiceProvider serviceProvider) {
this.serviceProvider = serviceProvider;
}
/**
 * Sets the DAO this class delegates to when reading or clearing a controller service's state.
 *
 * @param componentStateDAO the component state DAO to use
 */
public void setComponentStateDAO(final ComponentStateDAO componentStateDAO) {
this.componentStateDAO = componentStateDAO;
}
/**
 * Sets the flow controller this DAO uses to reach the flow manager, the component reloader
 * and the resources needed for configuration verification.
 *
 * @param flowController the flow controller to use
 */
public void setFlowController(final FlowController flowController) {
this.flowController = flowController;
}
}