blob: b7558fdf7aff447ee8c8daf733996762b6904877 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.repository.NopLogRepository;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.quartz.CronExpression;
import java.net.URL;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
public class StandardReportingTaskDAO extends ComponentDAO implements ReportingTaskDAO {
private ReportingTaskProvider reportingTaskProvider;
private ComponentStateDAO componentStateDAO;
private ReloadComponent reloadComponent;
private FlowController flowController;
private ReportingTaskNode locateReportingTask(final String reportingTaskId) {
// get the reporting task
final ReportingTaskNode reportingTask = reportingTaskProvider.getReportingTaskNode(reportingTaskId);
// ensure the reporting task exists
if (reportingTask == null) {
throw new ResourceNotFoundException(String.format("Unable to locate reporting task with id '%s'.", reportingTaskId));
}
return reportingTask;
}
@Override
public void verifyCreate(final ReportingTaskDTO reportingTaskDTO) {
verifyCreate(reportingTaskProvider.getExtensionManager(), reportingTaskDTO.getType(), reportingTaskDTO.getBundle());
}
@Override
public ReportingTaskNode createReportingTask(final ReportingTaskDTO reportingTaskDTO) {
// ensure the type is specified
if (reportingTaskDTO.getType() == null) {
throw new IllegalArgumentException("The reporting task type must be specified.");
}
try {
// create the reporting task
final ExtensionManager extensionManager = reportingTaskProvider.getExtensionManager();
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, reportingTaskDTO.getType(), reportingTaskDTO.getBundle());
final ReportingTaskNode reportingTask = reportingTaskProvider.createReportingTask(
reportingTaskDTO.getType(), reportingTaskDTO.getId(), bundleCoordinate, true);
// ensure we can perform the update
verifyUpdate(reportingTask, reportingTaskDTO);
// perform the update
configureReportingTask(reportingTask, reportingTaskDTO);
return reportingTask;
} catch (ReportingTaskInstantiationException rtie) {
throw new NiFiCoreException(rtie.getMessage(), rtie);
}
}
@Override
public ReportingTaskNode getReportingTask(final String reportingTaskId) {
return locateReportingTask(reportingTaskId);
}
@Override
public boolean hasReportingTask(final String reportingTaskId) {
return reportingTaskProvider.getReportingTaskNode(reportingTaskId) != null;
}
@Override
public Set<ReportingTaskNode> getReportingTasks() {
return reportingTaskProvider.getAllReportingTasks();
}
@Override
public ReportingTaskNode updateReportingTask(final ReportingTaskDTO reportingTaskDTO) {
// get the reporting task
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskDTO.getId());
// ensure we can perform the update
verifyUpdate(reportingTask, reportingTaskDTO);
// perform the update
configureReportingTask(reportingTask, reportingTaskDTO);
// attempt to change the underlying processor if an updated bundle is specified
// updating the bundle must happen after configuring so that any additional classpath resources are set first
updateBundle(reportingTask, reportingTaskDTO);
// configure scheduled state
// see if an update is necessary
if (isNotNull(reportingTaskDTO.getState())) {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(reportingTaskDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(reportingTask.getScheduledState())) {
try {
// perform the appropriate action
switch (purposedScheduledState) {
case RUNNING:
reportingTaskProvider.startReportingTask(reportingTask);
break;
case STOPPED:
switch (reportingTask.getScheduledState()) {
case RUNNING:
reportingTaskProvider.stopReportingTask(reportingTask);
break;
case DISABLED:
reportingTaskProvider.enableReportingTask(reportingTask);
break;
}
break;
case DISABLED:
reportingTaskProvider.disableReportingTask(reportingTask);
break;
}
} catch (IllegalStateException | ComponentLifeCycleException ise) {
throw new NiFiCoreException(ise.getMessage(), ise);
} catch (RejectedExecutionException ree) {
throw new NiFiCoreException("Unable to schedule all tasks for the specified reporting task.", ree);
} catch (NullPointerException npe) {
throw new NiFiCoreException("Unable to update reporting task run state.", npe);
} catch (Exception e) {
throw new NiFiCoreException("Unable to update reporting task run state: " + e, e);
}
}
}
return reportingTask;
}
private void updateBundle(ReportingTaskNode reportingTask, ReportingTaskDTO reportingTaskDTO) {
final BundleDTO bundleDTO = reportingTaskDTO.getBundle();
if (bundleDTO != null) {
final ExtensionManager extensionManager = reportingTaskProvider.getExtensionManager();
final BundleCoordinate incomingCoordinate = BundleUtils.getBundle(extensionManager, reportingTask.getCanonicalClassName(), bundleDTO);
final BundleCoordinate existingCoordinate = reportingTask.getBundleCoordinate();
if (!existingCoordinate.getCoordinate().equals(incomingCoordinate.getCoordinate())) {
try {
// we need to use the property descriptors from the temp component here in case we are changing from a ghost component to a real component
final ConfigurableComponent tempComponent = extensionManager.getTempComponent(reportingTask.getCanonicalClassName(), incomingCoordinate);
final Set<URL> additionalUrls = reportingTask.getAdditionalClasspathResources(tempComponent.getPropertyDescriptors());
reloadComponent.reload(reportingTask, reportingTask.getCanonicalClassName(), incomingCoordinate, additionalUrls);
} catch (ReportingTaskInstantiationException e) {
throw new NiFiCoreException(String.format("Unable to update reporting task %s from %s to %s due to: %s",
reportingTaskDTO.getId(), reportingTask.getBundleCoordinate().getCoordinate(), incomingCoordinate.getCoordinate(), e.getMessage()), e);
}
}
}
}
private List<String> validateProposedConfiguration(final ReportingTaskNode reportingTask, final ReportingTaskDTO reportingTaskDTO) {
final List<String> validationErrors = new ArrayList<>();
// get the current scheduling strategy
SchedulingStrategy schedulingStrategy = reportingTask.getSchedulingStrategy();
// validate the new scheduling strategy if appropriate
if (isNotNull(reportingTaskDTO.getSchedulingStrategy())) {
try {
// this will be the new scheduling strategy so use it
schedulingStrategy = SchedulingStrategy.valueOf(reportingTaskDTO.getSchedulingStrategy());
} catch (IllegalArgumentException iae) {
validationErrors.add(String.format("Scheduling strategy: Value must be one of [%s]", StringUtils.join(SchedulingStrategy.values(), ", ")));
}
}
// validate the scheduling period based on the scheduling strategy
if (isNotNull(reportingTaskDTO.getSchedulingPeriod())) {
switch (schedulingStrategy) {
case TIMER_DRIVEN:
final Matcher schedulingMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(reportingTaskDTO.getSchedulingPeriod());
if (!schedulingMatcher.matches()) {
validationErrors.add("Scheduling period is not a valid time duration (ie 30 sec, 5 min)");
}
break;
case CRON_DRIVEN:
try {
new CronExpression(reportingTaskDTO.getSchedulingPeriod());
} catch (final ParseException pe) {
throw new IllegalArgumentException(String.format("Scheduling Period '%s' is not a valid cron expression: %s", reportingTaskDTO.getSchedulingPeriod(), pe.getMessage()));
} catch (final Exception e) {
throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + reportingTaskDTO.getSchedulingPeriod());
}
break;
}
}
return validationErrors;
}
@Override
public void verifyDelete(final String reportingTaskId) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskId);
reportingTask.verifyCanDelete();
}
@Override
public void verifyUpdate(final ReportingTaskDTO reportingTaskDTO) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskDTO.getId());
verifyUpdate(reportingTask, reportingTaskDTO);
}
@Override
public void verifyConfigVerification(final String reportingTaskId) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskId);
reportingTask.verifyCanPerformVerification();
}
@Override
public List<ConfigVerificationResultDTO> verifyConfiguration(final String reportingTaskId, final ReportingTaskDTO reportingTask) {
final ReportingTaskNode taskNode = locateReportingTask(reportingTaskId);
final LogRepository logRepository = new NopLogRepository();
final ComponentLog configVerificationLog = new SimpleProcessLogger(taskNode.getReportingTask(), logRepository);
final ExtensionManager extensionManager = flowController.getExtensionManager();
final ParameterLookup parameterLookup = ParameterLookup.EMPTY;
final ConfigurationContext configurationContext = new StandardConfigurationContext(taskNode, reportingTask.getProperties(), reportingTask.getAnnotationData(),
parameterLookup, flowController.getControllerServiceProvider(), null, flowController.getVariableRegistry());
final List<ConfigVerificationResult> verificationResults = taskNode.verifyConfiguration(configurationContext, configVerificationLog, extensionManager);
final List<ConfigVerificationResultDTO> resultsDtos = verificationResults.stream()
.map(this::createConfigVerificationResultDto)
.collect(Collectors.toList());
return resultsDtos;
}
private ConfigVerificationResultDTO createConfigVerificationResultDto(final ConfigVerificationResult result) {
final ConfigVerificationResultDTO dto = new ConfigVerificationResultDTO();
dto.setExplanation(result.getExplanation());
dto.setOutcome(result.getOutcome().name());
dto.setVerificationStepName(result.getVerificationStepName());
return dto;
}
private void verifyUpdate(final ReportingTaskNode reportingTask, final ReportingTaskDTO reportingTaskDTO) {
// ensure the state, if specified, is valid
if (isNotNull(reportingTaskDTO.getState())) {
try {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(reportingTaskDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(reportingTask.getScheduledState())) {
// perform the appropriate action
switch (purposedScheduledState) {
case RUNNING:
reportingTask.verifyCanStart();
break;
case STOPPED:
switch (reportingTask.getScheduledState()) {
case RUNNING:
reportingTask.verifyCanStop();
break;
case DISABLED:
reportingTask.verifyCanEnable();
break;
}
break;
case DISABLED:
reportingTask.verifyCanDisable();
break;
}
}
} catch (IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format(
"The specified reporting task state (%s) is not valid. Valid options are 'RUNNING', 'STOPPED', and 'DISABLED'.",
reportingTaskDTO.getState()));
}
}
boolean modificationRequest = false;
if (isAnyNotNull(reportingTaskDTO.getName(),
reportingTaskDTO.getSchedulingStrategy(),
reportingTaskDTO.getSchedulingPeriod(),
reportingTaskDTO.getAnnotationData(),
reportingTaskDTO.getProperties(),
reportingTaskDTO.getBundle())) {
modificationRequest = true;
// validate the request
final List<String> requestValidation = validateProposedConfiguration(reportingTask, reportingTaskDTO);
// ensure there was no validation errors
if (!requestValidation.isEmpty()) {
throw new ValidationException(requestValidation);
}
}
final BundleDTO bundleDTO = reportingTaskDTO.getBundle();
if (bundleDTO != null) {
// ensures all nodes in a cluster have the bundle, throws exception if bundle not found for the given type
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(
reportingTaskProvider.getExtensionManager(), reportingTask.getCanonicalClassName(), bundleDTO);
// ensure we are only changing to a bundle with the same group and id, but different version
reportingTask.verifyCanUpdateBundle(bundleCoordinate);
}
if (modificationRequest) {
reportingTask.verifyCanUpdate();
}
}
private void configureReportingTask(final ReportingTaskNode reportingTask, final ReportingTaskDTO reportingTaskDTO) {
final String name = reportingTaskDTO.getName();
final String schedulingStrategy = reportingTaskDTO.getSchedulingStrategy();
final String schedulingPeriod = reportingTaskDTO.getSchedulingPeriod();
final String annotationData = reportingTaskDTO.getAnnotationData();
final String comments = reportingTaskDTO.getComments();
final Map<String, String> properties = reportingTaskDTO.getProperties();
reportingTask.pauseValidationTrigger(); // avoid triggering validation multiple times
try {
// ensure scheduling strategy is set first
if (isNotNull(schedulingStrategy)) {
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
}
if (isNotNull(name)) {
reportingTask.setName(name);
}
if (isNotNull(schedulingPeriod)) {
reportingTask.setSchedulingPeriod(schedulingPeriod);
}
if (isNotNull(annotationData)) {
reportingTask.setAnnotationData(annotationData);
}
if (isNotNull(comments)) {
reportingTask.setComments(comments);
}
if (isNotNull(properties)) {
reportingTask.setProperties(properties);
}
} finally {
reportingTask.resumeValidationTrigger();
}
}
@Override
public void deleteReportingTask(String reportingTaskId) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskId);
reportingTaskProvider.removeReportingTask(reportingTask);
}
@Override
public StateMap getState(String reportingTaskId, Scope scope) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskId);
return componentStateDAO.getState(reportingTask, scope);
}
@Override
public void verifyClearState(String reportingTaskId) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskId);
reportingTask.verifyCanClearState();
}
@Override
public void clearState(String reportingTaskId) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskId);
componentStateDAO.clearState(reportingTask);
}
/* setters */
public void setReportingTaskProvider(ReportingTaskProvider reportingTaskProvider) {
this.reportingTaskProvider = reportingTaskProvider;
}
public void setComponentStateDAO(ComponentStateDAO componentStateDAO) {
this.componentStateDAO = componentStateDAO;
}
public void setReloadComponent(ReloadComponent reloadComponent) {
this.reloadComponent = reloadComponent;
}
public void setFlowController(FlowController flowController) {
this.flowController = flowController;
}
}