blob: 8e2a024f9ac4ecaefee7f47c76ec158dc22e23ff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.components.validation.StandardValidationTrigger;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.reporting.LogComponentStatuses;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.extensions.ExtensionRepository;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionDefinition;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterDefinition;
import org.apache.nifi.stateless.config.ParameterProvider;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StandardStatelessFlow;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class);
private static final int CONCURRENT_EXTENSION_DOWNLOADS = 4;
// Member Variables injected via Builder
private final ExtensionManager extensionManager;
private final BulletinRepository bulletinRepository;
private final StatelessStateManagerProvider stateManagerProvider;
private final PropertyEncryptor propertyEncryptor;
private final FlowRegistryClient flowRegistryClient;
private final VariableRegistry rootVariableRegistry;
private final ProcessScheduler processScheduler;
private final KerberosConfig kerberosConfig;
private final FlowFileEventRepository flowFileEventRepository;
private final ProvenanceRepository provenanceRepository;
private final ExtensionRepository extensionRepository;
private final CounterRepository counterRepository;
// Member Variables created/managed internally
private final ReloadComponent reloadComponent;
private final ValidationTrigger validationTrigger;
// Member Variables injected via initialization. Effectively final.
private FlowManager flowManager;
private ControllerServiceProvider controllerServiceProvider;
private ProcessContextFactory processContextFactory;
private RepositoryContextFactory repositoryContextFactory;
private boolean initialized = false;
private StandardStatelessEngine(final Builder builder) {
this.extensionManager = requireNonNull(builder.extensionManager, "Extension Manager must be provided");
this.bulletinRepository = requireNonNull(builder.bulletinRepository, "Bulletin Repository must be provided");
this.stateManagerProvider = requireNonNull(builder.stateManagerProvider, "State Manager Provider must be provided");
this.propertyEncryptor = requireNonNull(builder.propertyEncryptor, "Encryptor must be provided");
this.flowRegistryClient = requireNonNull(builder.flowRegistryClient, "Flow Registry Client must be provided");
this.rootVariableRegistry = requireNonNull(builder.variableRegistry, "Variable Registry must be provided");
this.processScheduler = requireNonNull(builder.processScheduler, "Process Scheduler must be provided");
this.kerberosConfig = requireNonNull(builder.kerberosConfig, "Kerberos Configuration must be provided");
this.flowFileEventRepository = requireNonNull(builder.flowFileEventRepository, "FlowFile Event Repository must be provided");
this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
this.reloadComponent = new StatelessReloadComponent(this);
this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
}
@Override
public void initialize(final StatelessEngineInitializationContext initContext) {
this.flowManager = initContext.getFlowManager();
this.controllerServiceProvider = initContext.getControllerServiceProvider();
this.processContextFactory = initContext.getProcessContextFactory();
this.repositoryContextFactory = initContext.getRepositoryContextFactory();
this.initialized = true;
}
@Override
public StatelessDataflow createFlow(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition, final ParameterProvider parameterProvider) {
if (!this.initialized) {
throw new IllegalStateException("Cannot create Flow without first initializing Stateless Engine");
}
final VersionedFlow versionedFlow = dataflowDefinition.getFlowSnapshot().getFlow();
logger.info("Building Dataflow {}", versionedFlow == null ? "" : versionedFlow.getName());
loadNecessaryExtensions(dataflowDefinition);
extensionManager.logClassLoaderDetails();
// Create a child group and add it to the root group. We do this, rather than interacting with the root group directly
// because the flow may well have Local Input/Output ports, and those are not allowed on the Root Group.
final ProcessGroup rootGroup = flowManager.getRootGroup();
final ProcessGroup childGroup = flowManager.createProcessGroup("stateless-flow");
childGroup.setName("Stateless Flow");
rootGroup.addProcessGroup(childGroup);
childGroup.updateFlow(dataflowDefinition.getFlowSnapshot(), "stateless-component-id-seed", false, true, true);
// Map existing parameter contexts by name
final Set<ParameterContext> parameterContexts = flowManager.getParameterContextManager().getParameterContexts();
final Map<String, ParameterContext> parameterContextMap = parameterContexts.stream()
.collect(Collectors.toMap(ParameterContext::getName, context -> context));
// Update Parameters to match those that are provided in the flow configuration, plus those overrides provided
final List<ParameterContextDefinition> parameterContextDefinitions = dataflowDefinition.getParameterContexts();
if (parameterContextDefinitions != null) {
parameterContextDefinitions.forEach(contextDefinition -> registerParameterContext(contextDefinition, parameterContextMap));
}
overrideParameters(parameterContextMap, parameterProvider);
final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler);
final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
dataflow.scheduleBackgroundTask(logComponentStatuses, 1, TimeUnit.MINUTES);
return dataflow;
}
private void loadNecessaryExtensions(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
final VersionedProcessGroup group = dataflowDefinition.getFlowSnapshot().getFlowContents();
final Set<BundleCoordinate> requiredBundles = gatherRequiredBundles(group);
for (final ReportingTaskDefinition reportingTaskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
final BundleCoordinate coordinate = parseBundleCoordinate(reportingTaskDefinition);
if (coordinate == null) {
continue;
}
requiredBundles.add(coordinate);
}
final ExecutorService executor = new FlowEngine(CONCURRENT_EXTENSION_DOWNLOADS, "Download Extensions", true);
final Future<Set<Bundle>> future = extensionRepository.fetch(requiredBundles, executor, CONCURRENT_EXTENSION_DOWNLOADS);
executor.shutdown();
logger.info("Waiting for bundles to complete download...");
final long downloadStart = System.currentTimeMillis();
final Set<Bundle> downloadedBundles;
try {
downloadedBundles = future.get();
} catch (Exception e) {
logger.error("Failed to obtain all necessary extension bundles", e);
throw new RuntimeException(e);
}
final long downloadMillis = System.currentTimeMillis() - downloadStart;
logger.info("Successfully downloaded {} bundles in {} millis", downloadedBundles.size(), downloadMillis);
}
private Set<BundleCoordinate> gatherRequiredBundles(final VersionedProcessGroup group) {
final Set<BundleCoordinate> requiredBundles = new HashSet<>();
gatherRequiredBundles(group, requiredBundles);
return requiredBundles;
}
private void gatherRequiredBundles(final VersionedProcessGroup group, final Set<BundleCoordinate> requiredBundles) {
group.getControllerServices().forEach(cs -> requiredBundles.add(toBundleCoordinate(cs.getBundle())));
group.getProcessors().forEach(processor -> requiredBundles.add(toBundleCoordinate(processor.getBundle())));
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
gatherRequiredBundles(childGroup, requiredBundles);
}
}
private BundleCoordinate toBundleCoordinate(final org.apache.nifi.registry.flow.Bundle bundle) {
return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
}
private List<ReportingTaskNode> createReportingTasks(final DataflowDefinition<?> dataflowDefinition) {
final List<ReportingTaskNode> reportingTaskNodes = new ArrayList<>();
for (final ReportingTaskDefinition taskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
final ReportingTaskNode taskNode = createReportingTask(taskDefinition);
reportingTaskNodes.add(taskNode);
}
return reportingTaskNodes;
}
private ReportingTaskNode createReportingTask(final ReportingTaskDefinition taskDefinition) {
final BundleCoordinate bundleCoordinate = determineBundleCoordinate(taskDefinition);
final ReportingTaskNode taskNode = flowManager.createReportingTask(taskDefinition.getType(), UUID.randomUUID().toString(), bundleCoordinate, Collections.emptySet(), true, true);
taskNode.setProperties(resolveProperties(taskDefinition.getPropertyValues(), taskNode));
taskNode.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN);
taskNode.setSchedulingPeriod(taskDefinition.getSchedulingFrequency());
return taskNode;
}
private Map<String, String> resolveProperties(final Map<String, String> configured, final ReportingTaskNode taskNode) {
// Map property display name to actual names.
final Map<String, String> displayNameToActualName = new HashMap<>();
for (final PropertyDescriptor descriptor : taskNode.getProperties().keySet()) {
displayNameToActualName.put(descriptor.getDisplayName(), descriptor.getName());
}
// Map friendly property name in the task definition to the actual property name that we need in the 'resolved' map
final Map<String, String> resolved = new HashMap<>();
for (final Map.Entry<String, String> entry : configured.entrySet()) {
final String configuredName = entry.getKey();
final String configuredValue = entry.getValue();
final String actual = displayNameToActualName.get(configuredName);
final String resolvedName = actual == null ? configuredName : actual;
// If property has allowable values, resolve the user-friendly name to the name that the Reporting Task needs.
String resolvedValue = configuredValue;
if (actual != null) {
// This is a 'known' / non-dynamic property
final PropertyDescriptor descriptor = taskNode.getPropertyDescriptor(actual);
final List<AllowableValue> allowableValues = descriptor.getAllowableValues();
if (allowableValues != null && !allowableValues.isEmpty()) {
for (final AllowableValue allowableValue : allowableValues) {
if (allowableValue.getDisplayName().equalsIgnoreCase(configuredValue)) {
resolvedValue = allowableValue.getValue();
logger.debug("Resolving property value of {} for {} of {} to {}", configuredValue, configuredName, taskNode, resolvedValue);
break;
}
}
}
}
resolved.put(resolvedName, resolvedValue);
}
// Return map containing the desired names.
return resolved;
}
private BundleCoordinate determineBundleCoordinate(final ReportingTaskDefinition taskDefinition) {
final String explicitCoordinates = taskDefinition.getBundleCoordinates();
if (explicitCoordinates != null && !explicitCoordinates.trim().isEmpty()) {
final String resolvedClassName = resolveReportingTaskClassName(taskDefinition);
taskDefinition.setType(resolvedClassName);
final BundleCoordinate coordinate = parseBundleCoordinate(taskDefinition);
return coordinate;
}
final String specifiedType = taskDefinition.getType();
String resolvedClassName = specifiedType;
if (!specifiedType.contains(".")) {
final List<Bundle> possibleBundles = extensionManager.getBundles(taskDefinition.getType());
if (possibleBundles.isEmpty()) {
logger.debug("Could not find Reporting Task type of <{}>. Will try to find matching Reporting Task type based on class name", specifiedType);
resolvedClassName = resolveReportingTaskClassName(taskDefinition);
taskDefinition.setType(resolvedClassName);
logger.info("Resolved Reporting Task class {} to {}", specifiedType, resolvedClassName);
}
}
final List<Bundle> possibleBundles = extensionManager.getBundles(resolvedClassName);
if (possibleBundles.isEmpty()) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() +
") does not specify a Bundle and no Bundles could be found for type " + taskDefinition.getType());
}
if (possibleBundles.size() > 1) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() +
") does not specify a Bundle and multiple Bundles exist for this type. The reporting task must specify a bundle to use.");
}
final Bundle bundle = possibleBundles.get(0);
final BundleCoordinate coordinate = bundle.getBundleDetails().getCoordinate();
return coordinate;
}
private BundleCoordinate parseBundleCoordinate(final ReportingTaskDefinition taskDefinition) {
final String specifiedCoordinates = taskDefinition.getBundleCoordinates();
if (specifiedCoordinates == null) {
return null;
}
final String[] splits = specifiedCoordinates.split(":", 3);
if (splits.length != 3) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() + ") specifies bundle as '" + specifiedCoordinates + "', but this " +
"is not a valid Bundle format. Format should be <group>:<id>:<version>");
}
return new BundleCoordinate(splits[0], splits[1], splits[2]);
}
private String resolveReportingTaskClassName(final ReportingTaskDefinition taskDefinition) {
final String specifiedType = taskDefinition.getType();
if (specifiedType.contains(".")) {
return specifiedType;
}
final Set<String> possibleResolvedClassNames = new HashSet<>();
final Set<ExtensionDefinition> definitions = extensionManager.getExtensions(ReportingTask.class);
for (final ExtensionDefinition definition : definitions) {
final String implementationClassName = definition.getImplementationClassName();
final String simpleName = implementationClassName.contains(".") ? StringUtils.substringAfterLast(implementationClassName, ".") : implementationClassName;
if (simpleName.equals(specifiedType)) {
logger.debug("Found possible matching class {}", implementationClassName);
possibleResolvedClassNames.add(implementationClassName);
}
}
if (possibleResolvedClassNames.isEmpty()) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() + ") does not specify a Bundle, and no Reporting Task" +
" implementations exist with a class name of " + taskDefinition.getType() + ".");
}
if (possibleResolvedClassNames.size() > 1) {
throw new IllegalArgumentException("Reporting Task '" + taskDefinition.getName() + "' (" + taskDefinition.getType() + ") does not specify a Bundle, and no Reporting Task" +
" implementations exist with a class name of " + taskDefinition.getType() + ". Perhaps you meant one of: " + possibleResolvedClassNames);
}
return possibleResolvedClassNames.iterator().next();
}
private void overrideParameters(final Map<String, ParameterContext> parameterContextMap, final ParameterProvider parameterProvider) {
for (final ParameterContext context : parameterContextMap.values()) {
final String contextName = context.getName();
final Map<ParameterDescriptor, Parameter> parameters = context.getParameters();
final Map<String, Parameter> updatedParameters = new HashMap<>();
for (final Parameter parameter : parameters.values()) {
final String parameterName = parameter.getDescriptor().getName();
if (parameterProvider.isParameterDefined(contextName, parameterName)) {
final String providedValue = parameterProvider.getParameterValue(contextName, parameterName);
final Parameter updatedParameter = new Parameter(parameter.getDescriptor(), providedValue);
updatedParameters.put(parameterName, updatedParameter);
}
}
context.setParameters(updatedParameters);
}
}
private void registerParameterContext(final ParameterContextDefinition parameterContextDefinition, final Map<String, ParameterContext> parameterContextMap) {
final String contextName = parameterContextDefinition.getName();
final ParameterContext existingContext = parameterContextMap.get(contextName);
if (existingContext == null) {
logger.warn("Configuration contains a Parameter Context with name <" + contextName + "> but the flow does not contain any Parameter Context with this name. " +
"These Parameters will be ignored.");
return;
}
final Map<String, Parameter> parameters = new HashMap<>();
final List<ParameterDefinition> parameterDefinitions = parameterContextDefinition.getParameters();
if (parameterDefinitions != null) {
for (final ParameterDefinition parameterDefinition : parameterDefinitions) {
final String parameterName = parameterDefinition.getName();
final Optional<Parameter> optionalParameter = existingContext.getParameter(parameterName);
if (!optionalParameter.isPresent()) {
logger.warn("Configuration contains a Parameter with name <{}> for Parameter Context <{}> but the Parameter Context does not have a Parameter with that name. This Parameter will" +
" be ignored.", parameterName, contextName);
continue;
}
final Parameter existingParameter = optionalParameter.get();
final Parameter updatedParameter = new Parameter(existingParameter.getDescriptor(), parameterDefinition.getValue());
parameters.put(parameterName, updatedParameter);
}
}
existingContext.setParameters(parameters);
logger.info("Registered Parameter Context {}", parameterContextDefinition.getName());
}
@Override
public ExtensionManager getExtensionManager() {
return extensionManager;
}
@Override
public BulletinRepository getBulletinRepository() {
return bulletinRepository;
}
@Override
public StateManagerProvider getStateManagerProvider() {
return stateManagerProvider;
}
@Override
public PropertyEncryptor getPropertyEncryptor() {
return propertyEncryptor;
}
@Override
public FlowRegistryClient getFlowRegistryClient() {
return flowRegistryClient;
}
@Override
public VariableRegistry getRootVariableRegistry() {
return rootVariableRegistry;
}
@Override
public ProcessScheduler getProcessScheduler() {
return processScheduler;
}
@Override
public ReloadComponent getReloadComponent() {
return reloadComponent;
}
@Override
public ControllerServiceProvider getControllerServiceProvider() {
return controllerServiceProvider;
}
@Override
public ProvenanceRepository getProvenanceRepository() {
return provenanceRepository;
}
@Override
public FlowFileEventRepository getFlowFileEventRepository() {
return flowFileEventRepository;
}
@Override
public KerberosConfig getKerberosConfig() {
return kerberosConfig;
}
@Override
public ValidationTrigger getValidationTrigger() {
return validationTrigger;
}
@Override
public FlowManager getFlowManager() {
return flowManager;
}
@Override
public CounterRepository getCounterRepository() {
return counterRepository;
}
public static class Builder {
private ExtensionManager extensionManager = null;
private BulletinRepository bulletinRepository = null;
private StatelessStateManagerProvider stateManagerProvider = null;
private PropertyEncryptor propertyEncryptor = null;
private FlowRegistryClient flowRegistryClient = null;
private VariableRegistry variableRegistry = null;
private ProcessScheduler processScheduler = null;
private KerberosConfig kerberosConfig = null;
private FlowFileEventRepository flowFileEventRepository = null;
private ProvenanceRepository provenanceRepository = null;
private ExtensionRepository extensionRepository = null;
private CounterRepository counterRepository = null;
public Builder extensionManager(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
return this;
}
public Builder bulletinRepository(final BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
return this;
}
public Builder stateManagerProvider(final StatelessStateManagerProvider stateManagerProvider) {
this.stateManagerProvider = stateManagerProvider;
return this;
}
public Builder encryptor(final PropertyEncryptor propertyEncryptor) {
this.propertyEncryptor = propertyEncryptor;
return this;
}
public Builder flowRegistryClient(final FlowRegistryClient flowRegistryClient) {
this.flowRegistryClient = flowRegistryClient;
return this;
}
public Builder variableRegistry(final VariableRegistry variableRegistry) {
this.variableRegistry = variableRegistry;
return this;
}
public Builder processScheduler(final ProcessScheduler processScheduler) {
this.processScheduler = processScheduler;
return this;
}
public Builder kerberosConfiguration(final KerberosConfig kerberosConfig) {
this.kerberosConfig = kerberosConfig;
return this;
}
public Builder flowFileEventRepository(final FlowFileEventRepository flowFileEventRepository) {
this.flowFileEventRepository = flowFileEventRepository;
return this;
}
public Builder provenanceRepository(final ProvenanceRepository provenanceRepository) {
this.provenanceRepository = provenanceRepository;
return this;
}
public Builder extensionRepository(final ExtensionRepository extensionRepository) {
this.extensionRepository = extensionRepository;
return this;
}
public Builder counterRepository(final CounterRepository counterRepository) {
this.counterRepository = counterRepository;
return this;
}
public StandardStatelessEngine build() {
return new StandardStatelessEngine(this);
}
}
}