| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.controller.reporting; |
| |
| import org.apache.nifi.annotation.configuration.DefaultSchedule; |
| import org.apache.nifi.bundle.Bundle; |
| import org.apache.nifi.bundle.BundleCoordinate; |
| import org.apache.nifi.components.ConfigVerificationResult; |
| import org.apache.nifi.components.ConfigVerificationResult.Outcome; |
| import org.apache.nifi.components.ConfigurableComponent; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.components.validation.ValidationStatus; |
| import org.apache.nifi.components.validation.ValidationTrigger; |
| import org.apache.nifi.controller.AbstractComponentNode; |
| import org.apache.nifi.controller.ConfigurationContext; |
| import org.apache.nifi.controller.ControllerServiceLookup; |
| import org.apache.nifi.controller.LoggableComponent; |
| import org.apache.nifi.controller.ProcessScheduler; |
| import org.apache.nifi.controller.ReloadComponent; |
| import org.apache.nifi.controller.ReportingTaskNode; |
| import org.apache.nifi.controller.ScheduledState; |
| import org.apache.nifi.controller.TerminationAwareLogger; |
| import org.apache.nifi.controller.ValidationContextFactory; |
| import org.apache.nifi.controller.service.ControllerServiceNode; |
| import org.apache.nifi.controller.service.ControllerServiceProvider; |
| import org.apache.nifi.controller.service.StandardConfigurationContext; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.nar.ExtensionManager; |
| import org.apache.nifi.nar.InstanceClassLoader; |
| import org.apache.nifi.nar.NarCloseable; |
| import org.apache.nifi.parameter.ParameterLookup; |
| import org.apache.nifi.registry.ComponentVariableRegistry; |
| import org.apache.nifi.reporting.ReportingTask; |
| import org.apache.nifi.reporting.VerifiableReportingTask; |
| import org.apache.nifi.scheduling.SchedulingStrategy; |
| import org.apache.nifi.util.CharacterFilterUtils; |
| import org.apache.nifi.util.FormatUtils; |
| import org.apache.nifi.util.file.classloader.ClassLoaderUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.net.URL; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| public abstract class AbstractReportingTaskNode extends AbstractComponentNode implements ReportingTaskNode { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class); |
| |
| private final AtomicReference<ReportingTaskDetails> reportingTaskRef; |
| private final ProcessScheduler processScheduler; |
| private final ControllerServiceLookup serviceLookup; |
| |
| private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN); |
| private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins"); |
| |
| private volatile String comment; |
| private volatile ScheduledState scheduledState = ScheduledState.STOPPED; |
| |
| public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, |
| final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, |
| final ValidationContextFactory validationContextFactory, final ComponentVariableRegistry variableRegistry, |
| final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) { |
| |
| this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory, |
| reportingTask.getComponent().getClass().getSimpleName(), reportingTask.getComponent().getClass().getCanonicalName(), |
| variableRegistry, reloadComponent, extensionManager, validationTrigger, false); |
| } |
| |
| |
| public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, |
| final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, |
| final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, |
| final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger, |
| final boolean isExtensionMissing) { |
| |
| super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, |
| extensionManager, validationTrigger, isExtensionMissing); |
| this.reportingTaskRef = new AtomicReference<>(new ReportingTaskDetails(reportingTask)); |
| this.processScheduler = processScheduler; |
| this.serviceLookup = controllerServiceProvider; |
| |
| final Class<?> reportingClass = reportingTask.getComponent().getClass(); |
| |
| final DefaultSchedule dsc = reportingClass.getAnnotation(DefaultSchedule.class); |
| if(dsc != null) { |
| try { |
| this.setSchedulingStrategy(dsc.strategy()); |
| } catch (Throwable ex) { |
| LOG.error(String.format("Error while setting scheduling strategy from DefaultSchedule annotation: %s", ex.getMessage()), ex); |
| } |
| try { |
| this.setSchedulingPeriod(dsc.period()); |
| } catch (Throwable ex) { |
| this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN); |
| LOG.error(String.format("Error while setting scheduling period from DefaultSchedule annotation: %s", ex.getMessage()), ex); |
| } |
| } |
| } |
| |
| @Override |
| public ConfigurableComponent getComponent() { |
| return reportingTaskRef.get().getReportingTask(); |
| } |
| |
| @Override |
| public BundleCoordinate getBundleCoordinate() { |
| return reportingTaskRef.get().getBundleCoordinate(); |
| } |
| |
| @Override |
| public TerminationAwareLogger getLogger() { |
| return reportingTaskRef.get().getComponentLog(); |
| } |
| |
| @Override |
| public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) { |
| this.schedulingStrategy.set(schedulingStrategy); |
| } |
| |
| @Override |
| public SchedulingStrategy getSchedulingStrategy() { |
| return schedulingStrategy.get(); |
| } |
| |
| @Override |
| public String getSchedulingPeriod() { |
| return schedulingPeriod.get(); |
| } |
| |
| @Override |
| public long getSchedulingPeriod(final TimeUnit timeUnit) { |
| return FormatUtils.getTimeDuration(schedulingPeriod.get(), timeUnit); |
| } |
| |
| @Override |
| public void setSchedulingPeriod(final String schedulingPeriod) { |
| this.schedulingPeriod.set(schedulingPeriod); |
| } |
| |
| @Override |
| public ReportingTask getReportingTask() { |
| return reportingTaskRef.get().getReportingTask(); |
| } |
| |
| @Override |
| public void setReportingTask(final LoggableComponent<ReportingTask> reportingTask) { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Reporting Task configuration while Reporting Task is running"); |
| } |
| this.reportingTaskRef.set(new ReportingTaskDetails(reportingTask)); |
| } |
| |
| @Override |
| public void reload(final Set<URL> additionalUrls) throws ReportingTaskInstantiationException { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot reload Reporting Task while Reporting Task is running"); |
| } |
| String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls); |
| setAdditionalResourcesFingerprint(additionalResourcesFingerprint); |
| getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls); |
| } |
| |
| @Override |
| public boolean isRunning() { |
| return processScheduler.isScheduled(this) || processScheduler.getActiveThreadCount(this) > 0; |
| } |
| |
| @Override |
| public boolean isValidationNecessary() { |
| return !processScheduler.isScheduled(this) || getValidationStatus() != ValidationStatus.VALID; |
| } |
| |
| @Override |
| public int getActiveThreadCount() { |
| return processScheduler.getActiveThreadCount(this); |
| } |
| |
| @Override |
| public ConfigurationContext getConfigurationContext() { |
| return new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod(), getVariableRegistry()); |
| } |
| |
| @Override |
| public void verifyModifiable() throws IllegalStateException { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot modify Reporting Task while the Reporting Task is running"); |
| } |
| } |
| |
| @Override |
| public ScheduledState getScheduledState() { |
| return scheduledState; |
| } |
| |
| @Override |
| public void setScheduledState(final ScheduledState state) { |
| this.scheduledState = state; |
| } |
| |
| public boolean isDisabled() { |
| return scheduledState == ScheduledState.DISABLED; |
| } |
| |
| @Override |
| public String getComments() { |
| return comment; |
| } |
| |
| @Override |
| public void setComments(final String comment) { |
| this.comment = CharacterFilterUtils.filterInvalidXmlCharacters(comment); |
| } |
| |
| @Override |
| public void verifyCanDelete() { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot delete " + getReportingTask().getIdentifier() + " because it is currently running"); |
| } |
| } |
| |
| @Override |
| public void verifyCanDisable() { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot disable " + getReportingTask().getIdentifier() + " because it is currently running"); |
| } |
| |
| if (isDisabled()) { |
| throw new IllegalStateException("Cannot disable " + getReportingTask().getIdentifier() + " because it is already disabled"); |
| } |
| } |
| |
| @Override |
| public void verifyCanEnable() { |
| if (!isDisabled()) { |
| throw new IllegalStateException("Cannot enable " + getReportingTask().getIdentifier() + " because it is not disabled"); |
| } |
| } |
| |
| @Override |
| public void verifyCanStart() { |
| if (isDisabled()) { |
| throw new IllegalStateException("Cannot start " + getReportingTask().getIdentifier() + " because it is currently disabled"); |
| } |
| |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot start " + getReportingTask().getIdentifier() + " because it is already running"); |
| } |
| |
| if (getValidationStatus() == ValidationStatus.INVALID) { |
| throw new IllegalStateException("Cannot start " + getReportingTask().getIdentifier() + " because it is in INVALID status"); |
| } |
| } |
| |
| @Override |
| public void verifyCanStop() { |
| if (!isRunning()) { |
| throw new IllegalStateException("Cannot stop " + getReportingTask().getIdentifier() + " because it is not running"); |
| } |
| } |
| |
| @Override |
| public void verifyCanUpdate() { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot update " + getReportingTask().getIdentifier() + " because it is currently running"); |
| } |
| } |
| |
| @Override |
| public void verifyCanClearState() { |
| verifyCanUpdate(); |
| } |
| |
| @Override |
| public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) { |
| switch (getScheduledState()) { |
| case DISABLED: |
| throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is disabled"); |
| case RUNNING: |
| throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is already running"); |
| case STOPPED: |
| break; |
| } |
| final int activeThreadCount = getActiveThreadCount(); |
| if (activeThreadCount > 0) { |
| throw new IllegalStateException(this.getIdentifier() + " cannot be started because it has " + activeThreadCount + " active threads already"); |
| } |
| |
| final Collection<ValidationResult> validationResults = getValidationErrors(ignoredReferences); |
| if (!validationResults.isEmpty()) { |
| throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not currently valid"); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "ReportingTask[id=" + getIdentifier() + "]"; |
| } |
| |
| @Override |
| public String getProcessGroupIdentifier() { |
| return null; |
| } |
| |
| @Override |
| public ParameterLookup getParameterLookup() { |
| return ParameterLookup.EMPTY; |
| } |
| |
| @Override |
| public void verifyCanPerformVerification() { |
| if (isRunning()) { |
| throw new IllegalStateException("Cannot perform verification because Reporting Task is not fully stopped"); |
| } |
| } |
| |
| @Override |
| public List<ConfigVerificationResult> verifyConfiguration(final ConfigurationContext context, final ComponentLog logger, final ExtensionManager extensionManager) { |
| final List<ConfigVerificationResult> results = new ArrayList<>(); |
| |
| try { |
| verifyCanPerformVerification(); |
| |
| final long startNanos = System.nanoTime(); |
| // Call super's verifyConfig, which will perform component validation |
| results.addAll(super.verifyConfig(context.getProperties(), context.getAnnotationData(), null)); |
| final long validationComplete = System.nanoTime(); |
| |
| // If any invalid outcomes from validation, we do not want to perform additional verification, because we only run additional verification when the component is valid. |
| // This is done in order to make it much simpler to develop these verifications, since the developer doesn't have to worry about whether or not the given values are valid. |
| if (!results.isEmpty() && results.stream().anyMatch(result -> result.getOutcome() == Outcome.FAILED)) { |
| return results; |
| } |
| |
| final ReportingTask reportingTask = getReportingTask(); |
| if (reportingTask instanceof VerifiableReportingTask) { |
| logger.debug("{} is a VerifiableReportingTask. Will perform full verification of configuration.", this); |
| final VerifiableReportingTask verifiable = (VerifiableReportingTask) reportingTask; |
| |
| // Check if the given configuration requires a different classloader than the current configuration |
| final boolean classpathDifferent = isClasspathDifferent(context.getProperties()); |
| |
| if (classpathDifferent) { |
| // Create a classloader for the given configuration and use that to verify the component's configuration |
| final Bundle bundle = extensionManager.getBundle(getBundleCoordinate()); |
| final Set<URL> classpathUrls = getAdditionalClasspathResources(context.getProperties().keySet(), descriptor -> context.getProperty(descriptor).getValue()); |
| |
| final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); |
| try (final InstanceClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(getComponentType(), getIdentifier(), bundle, classpathUrls, false)) { |
| Thread.currentThread().setContextClassLoader(detectedClassLoader); |
| results.addAll(verifiable.verify(context, logger)); |
| } finally { |
| Thread.currentThread().setContextClassLoader(currentClassLoader); |
| } |
| } else { |
| // Verify the configuration, using the component's classloader |
| try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, reportingTask.getClass(), getIdentifier())) { |
| results.addAll(verifiable.verify(context, logger)); |
| } |
| } |
| |
| final long validationNanos = validationComplete - startNanos; |
| final long verificationNanos = System.nanoTime() - validationComplete; |
| logger.debug("{} completed full configuration validation in {} plus {} for validation", |
| this, FormatUtils.formatNanos(verificationNanos, false), FormatUtils.formatNanos(validationNanos, false)); |
| } else { |
| logger.debug("{} is not a VerifiableReportingTask, so will not perform full verification of configuration. Validation took {}", this, |
| FormatUtils.formatNanos(validationComplete - startNanos, false)); |
| } |
| } catch (final Throwable t) { |
| logger.error("Failed to perform verification of Reporting Task's configuration for {}", this, t); |
| |
| results.add(new ConfigVerificationResult.Builder() |
| .outcome(Outcome.FAILED) |
| .verificationStepName("Perform Verification") |
| .explanation("Encountered unexpected failure when attempting to perform verification: " + t) |
| .build()); |
| } |
| |
| return results; |
| } |
| } |