blob: 41434d3948b273e21d151e1ca16a5175ae93b398 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Matcher;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.quartz.CronExpression;
public class StandardReportingTaskDAO extends ComponentDAO implements ReportingTaskDAO {
private ReportingTaskProvider reportingTaskProvider;
/**
* Locates the specified reporting task.
*
* @param reportingTaskId
* @return
*/
private ReportingTaskNode locateReportingTask(final String reportingTaskId) {
// get the reporting task
final ReportingTaskNode reportingTask = reportingTaskProvider.getReportingTaskNode(reportingTaskId);
// ensure the reporting task exists
if (reportingTask == null) {
throw new ResourceNotFoundException(String.format("Unable to locate reporting task with id '%s'.", reportingTaskId));
}
return reportingTask;
}
/**
* Creates a reporting task.
*
* @param reportingTaskDTO The reporting task DTO
* @return The reporting task
*/
@Override
public ReportingTaskNode createReportingTask(final ReportingTaskDTO reportingTaskDTO) {
try {
// create the reporting task
final ReportingTaskNode reportingTask = reportingTaskProvider.createReportingTask(reportingTaskDTO.getType(), reportingTaskDTO.getId(), true);
// ensure we can perform the update
verifyUpdate(reportingTask, reportingTaskDTO);
// perform the update
configureReportingTask(reportingTask, reportingTaskDTO);
return reportingTask;
} catch (ReportingTaskInstantiationException rtie) {
throw new NiFiCoreException(rtie.getMessage(), rtie);
}
}
/**
* Gets the specified reporting task.
*
* @param reportingTaskId The reporting task id
* @return The reporting task
*/
@Override
public ReportingTaskNode getReportingTask(final String reportingTaskId) {
return locateReportingTask(reportingTaskId);
}
/**
* Determines if the specified reporting task exists.
*
* @param reportingTaskId
* @return
*/
@Override
public boolean hasReportingTask(final String reportingTaskId) {
return reportingTaskProvider.getReportingTaskNode(reportingTaskId) != null;
}
/**
* Gets all of the reporting tasks.
*
* @return The reporting tasks
*/
@Override
public Set<ReportingTaskNode> getReportingTasks() {
return reportingTaskProvider.getAllReportingTasks();
}
/**
* Updates the specified reporting task.
*
* @param reportingTaskDTO The reporting task DTO
* @return The reporting task
*/
@Override
public ReportingTaskNode updateReportingTask(final ReportingTaskDTO reportingTaskDTO) {
// get the reporting task
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskDTO.getId());
// ensure we can perform the update
verifyUpdate(reportingTask, reportingTaskDTO);
// perform the update
configureReportingTask(reportingTask, reportingTaskDTO);
// configure scheduled state
// see if an update is necessary
if (isNotNull(reportingTaskDTO.getState())) {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(reportingTaskDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(reportingTask.getScheduledState())) {
try {
// perform the appropriate action
switch (purposedScheduledState) {
case RUNNING:
reportingTaskProvider.startReportingTask(reportingTask);
break;
case STOPPED:
switch (reportingTask.getScheduledState()) {
case RUNNING:
reportingTaskProvider.stopReportingTask(reportingTask);
break;
case DISABLED:
reportingTaskProvider.enableReportingTask(reportingTask);
break;
}
break;
case DISABLED:
reportingTaskProvider.disableReportingTask(reportingTask);
break;
}
} catch (IllegalStateException | ProcessorLifeCycleException ise) {
throw new NiFiCoreException(ise.getMessage(), ise);
} catch (RejectedExecutionException ree) {
throw new NiFiCoreException("Unable to schedule all tasks for the specified reporting task.", ree);
} catch (NullPointerException npe) {
throw new NiFiCoreException("Unable to update reporting task run state.", npe);
} catch (Exception e) {
throw new NiFiCoreException("Unable to update reporting task run state: " + e, e);
}
}
}
return reportingTask;
}
/**
* Validates the specified configuration for the specified reporting task.
*
* @param reportingTask
* @param reportingTaskDTO
* @return
*/
private List<String> validateProposedConfiguration(final ReportingTaskNode reportingTask, final ReportingTaskDTO reportingTaskDTO) {
final List<String> validationErrors = new ArrayList<>();
// get the current scheduling strategy
SchedulingStrategy schedulingStrategy = reportingTask.getSchedulingStrategy();
// validate the new scheduling strategy if appropriate
if (isNotNull(reportingTaskDTO.getSchedulingStrategy())) {
try {
// this will be the new scheduling strategy so use it
schedulingStrategy = SchedulingStrategy.valueOf(reportingTaskDTO.getSchedulingStrategy());
} catch (IllegalArgumentException iae) {
validationErrors.add(String.format("Scheduling strategy: Value must be one of [%s]", StringUtils.join(SchedulingStrategy.values(), ", ")));
}
}
// validate the scheduling period based on the scheduling strategy
if (isNotNull(reportingTaskDTO.getSchedulingPeriod())) {
switch (schedulingStrategy) {
case TIMER_DRIVEN:
final Matcher schedulingMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(reportingTaskDTO.getSchedulingPeriod());
if (!schedulingMatcher.matches()) {
validationErrors.add("Scheduling period is not a valid time duration (ie 30 sec, 5 min)");
}
break;
case CRON_DRIVEN:
try {
new CronExpression(reportingTaskDTO.getSchedulingPeriod());
} catch (final ParseException pe) {
throw new IllegalArgumentException(String.format("Scheduling Period '%s' is not a valid cron expression: %s", reportingTaskDTO.getSchedulingPeriod(), pe.getMessage()));
} catch (final Exception e) {
throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + reportingTaskDTO.getSchedulingPeriod());
}
break;
}
}
return validationErrors;
}
@Override
public void verifyDelete(final String reportingTaskId) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskId);
reportingTask.verifyCanDelete();
}
@Override
public void verifyUpdate(final ReportingTaskDTO reportingTaskDTO) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskDTO.getId());
verifyUpdate(reportingTask, reportingTaskDTO);
}
/**
* Verifies the reporting task can be updated.
*
* @param reportingTask
* @param reportingTaskDTO
*/
private void verifyUpdate(final ReportingTaskNode reportingTask, final ReportingTaskDTO reportingTaskDTO) {
// ensure the state, if specified, is valid
if (isNotNull(reportingTaskDTO.getState())) {
try {
final ScheduledState purposedScheduledState = ScheduledState.valueOf(reportingTaskDTO.getState());
// only attempt an action if it is changing
if (!purposedScheduledState.equals(reportingTask.getScheduledState())) {
// perform the appropriate action
switch (purposedScheduledState) {
case RUNNING:
reportingTask.verifyCanStart();
break;
case STOPPED:
switch (reportingTask.getScheduledState()) {
case RUNNING:
reportingTask.verifyCanStop();
break;
case DISABLED:
reportingTask.verifyCanEnable();
break;
}
break;
case DISABLED:
reportingTask.verifyCanDisable();
break;
}
}
} catch (IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format(
"The specified reporting task state (%s) is not valid. Valid options are 'RUNNING', 'STOPPED', and 'DISABLED'.",
reportingTaskDTO.getState()));
}
}
boolean modificationRequest = false;
if (isAnyNotNull(reportingTaskDTO.getName(),
reportingTaskDTO.getSchedulingStrategy(),
reportingTaskDTO.getSchedulingPeriod(),
reportingTaskDTO.getAnnotationData(),
reportingTaskDTO.getProperties())) {
modificationRequest = true;
// validate the request
final List<String> requestValidation = validateProposedConfiguration(reportingTask, reportingTaskDTO);
// ensure there was no validation errors
if (!requestValidation.isEmpty()) {
throw new ValidationException(requestValidation);
}
}
if (modificationRequest) {
reportingTask.verifyCanUpdate();
}
}
/**
* Configures the specified reporting task.
*
* @param reportingTask
* @param reportingTaskDTO
*/
private void configureReportingTask(final ReportingTaskNode reportingTask, final ReportingTaskDTO reportingTaskDTO) {
final String name = reportingTaskDTO.getName();
final String schedulingStrategy = reportingTaskDTO.getSchedulingStrategy();
final String schedulingPeriod = reportingTaskDTO.getSchedulingPeriod();
final String annotationData = reportingTaskDTO.getAnnotationData();
final Map<String, String> properties = reportingTaskDTO.getProperties();
// ensure scheduling strategy is set first
if (isNotNull(schedulingStrategy)) {
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
}
if (isNotNull(name)) {
reportingTask.setName(name);
}
if (isNotNull(schedulingPeriod)) {
reportingTask.setScheduldingPeriod(schedulingPeriod);
}
if (isNotNull(annotationData)) {
reportingTask.setAnnotationData(annotationData);
}
if (isNotNull(properties)) {
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final String propName = entry.getKey();
final String propVal = entry.getValue();
if (isNotNull(propName) && propVal == null) {
reportingTask.removeProperty(propName);
} else if (isNotNull(propName)) {
reportingTask.setProperty(propName, propVal);
}
}
}
}
/**
* Deletes the specified reporting task.
*
* @param reportingTaskId The reporting task id
*/
@Override
public void deleteReportingTask(String reportingTaskId) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskId);
reportingTaskProvider.removeReportingTask(reportingTask);
}
/* setters */
public void setReportingTaskProvider(ReportingTaskProvider reportingTaskProvider) {
this.reportingTaskProvider = reportingTaskProvider;
}
}