blob: 277609c241389e626d1b45993e66502335cfa5c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.components.validation.StandardValidationTrigger;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.reporting.LogComponentStatuses;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.extensions.ExtensionRepository;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionDefinition;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.InstanceClassLoader;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stateless.config.ConfigurableExtensionDefinition;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterDefinition;
import org.apache.nifi.stateless.config.ParameterProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StandardStatelessFlow;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.parameter.CompositeParameterProvider;
import org.apache.nifi.stateless.parameter.ParameterProvider;
import org.apache.nifi.stateless.parameter.ParameterProviderInitializationContext;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class);
private static final int CONCURRENT_EXTENSION_DOWNLOADS = 4;
// Member Variables injected via Builder
private final ExtensionManager extensionManager;
private final BulletinRepository bulletinRepository;
private final StatelessStateManagerProvider stateManagerProvider;
private final PropertyEncryptor propertyEncryptor;
private final FlowRegistryClient flowRegistryClient;
private final VariableRegistry rootVariableRegistry;
private final ProcessScheduler processScheduler;
private final KerberosConfig kerberosConfig;
private final FlowFileEventRepository flowFileEventRepository;
private final ProvenanceRepository provenanceRepository;
private final ExtensionRepository extensionRepository;
private final CounterRepository counterRepository;
// Member Variables created/managed internally
private final ReloadComponent reloadComponent;
private final ValidationTrigger validationTrigger;
// Member Variables injected via initialization. Effectively final.
private FlowManager flowManager;
private ControllerServiceProvider controllerServiceProvider;
private ProcessContextFactory processContextFactory;
private RepositoryContextFactory repositoryContextFactory;
private boolean initialized = false;
private StandardStatelessEngine(final Builder builder) {
this.extensionManager = requireNonNull(builder.extensionManager, "Extension Manager must be provided");
this.bulletinRepository = requireNonNull(builder.bulletinRepository, "Bulletin Repository must be provided");
this.stateManagerProvider = requireNonNull(builder.stateManagerProvider, "State Manager Provider must be provided");
this.propertyEncryptor = requireNonNull(builder.propertyEncryptor, "Encryptor must be provided");
this.flowRegistryClient = requireNonNull(builder.flowRegistryClient, "Flow Registry Client must be provided");
this.rootVariableRegistry = requireNonNull(builder.variableRegistry, "Variable Registry must be provided");
this.processScheduler = requireNonNull(builder.processScheduler, "Process Scheduler must be provided");
this.kerberosConfig = requireNonNull(builder.kerberosConfig, "Kerberos Configuration must be provided");
this.flowFileEventRepository = requireNonNull(builder.flowFileEventRepository, "FlowFile Event Repository must be provided");
this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
this.reloadComponent = new StatelessReloadComponent(this);
this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
}
@Override
public void initialize(final StatelessEngineInitializationContext initContext) {
this.flowManager = initContext.getFlowManager();
this.controllerServiceProvider = initContext.getControllerServiceProvider();
this.processContextFactory = initContext.getProcessContextFactory();
this.repositoryContextFactory = initContext.getRepositoryContextFactory();
this.initialized = true;
}
@Override
public StatelessDataflow createFlow(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
if (!this.initialized) {
throw new IllegalStateException("Cannot create Flow without first initializing Stateless Engine");
}
final VersionedFlow versionedFlow = dataflowDefinition.getFlowSnapshot().getFlow();
logger.info("Building Dataflow {}", versionedFlow == null ? "" : versionedFlow.getName());
loadNecessaryExtensions(dataflowDefinition);
extensionManager.logClassLoaderDetails();
// Create a child group and add it to the root group. We do this, rather than interacting with the root group directly
// because the flow may well have Local Input/Output ports, and those are not allowed on the Root Group.
final ProcessGroup rootGroup = flowManager.getRootGroup();
final ProcessGroup childGroup = flowManager.createProcessGroup("stateless-flow");
childGroup.setName("Stateless Flow");
rootGroup.addProcessGroup(childGroup);
childGroup.updateFlow(dataflowDefinition.getFlowSnapshot(), "stateless-component-id-seed", false, true, true);
final ParameterProvider parameterProvider = createParameterProvider(dataflowDefinition);
// Map existing parameter contexts by name
final Set<ParameterContext> parameterContexts = flowManager.getParameterContextManager().getParameterContexts();
final Map<String, ParameterContext> parameterContextMap = parameterContexts.stream()
.collect(Collectors.toMap(ParameterContext::getName, context -> context));
// Update Parameters to match those that are provided in the flow configuration, plus those overrides provided
final List<ParameterContextDefinition> parameterContextDefinitions = dataflowDefinition.getParameterContexts();
if (parameterContextDefinitions != null) {
parameterContextDefinitions.forEach(contextDefinition -> registerParameterContext(contextDefinition, parameterContextMap));
}
overrideParameters(parameterContextMap, parameterProvider);
final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler);
final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
dataflow.scheduleBackgroundTask(logComponentStatuses, 1, TimeUnit.MINUTES);
return dataflow;
}
private ParameterProvider createParameterProvider(final DataflowDefinition<?> dataflowDefinition) {
// Create a Provider for each definition
final List<ParameterProvider> providers = new ArrayList<>();
for (final ParameterProviderDefinition definition : dataflowDefinition.getParameterProviderDefinitions()) {
providers.add(createParameterProvider(definition));
}
// Create a Composite Parameter Provider that wraps all of the others.
final CompositeParameterProvider provider = new CompositeParameterProvider(providers);
final ParameterProviderInitializationContext initializationContext = new StandardParameterProviderInitializationContext(provider, Collections.emptyMap(), UUID.randomUUID().toString());
provider.initialize(initializationContext);
return provider;
}
private ParameterProvider createParameterProvider(final ParameterProviderDefinition definition) {
final BundleCoordinate bundleCoordinate = determineBundleCoordinate(definition, "Parameter Provider");
final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
if (bundle == null) {
throw new IllegalStateException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
}
final String providerType = definition.getType();
final String providerId = UUID.randomUUID().toString();
final InstanceClassLoader classLoader = extensionManager.createInstanceClassLoader(providerType, providerId, bundle, Collections.emptySet());
try {
final Class<?> rawClass = Class.forName(providerType, true, classLoader);
Thread.currentThread().setContextClassLoader(classLoader);
final ParameterProvider parameterProvider = (ParameterProvider) rawClass.newInstance();
// Initialize the provider
final Map<String, String> properties = resolveProperties(definition.getPropertyValues(), parameterProvider, parameterProvider.getPropertyDescriptors());
final ParameterProviderInitializationContext initializationContext = new StandardParameterProviderInitializationContext(parameterProvider, properties, providerId);
parameterProvider.initialize(initializationContext);
// Ensure that the Parameter Provider is valid.
final List<ValidationResult> validationResults = validate(parameterProvider, properties, providerId);
if (!validationResults.isEmpty()) {
throw new IllegalStateException("Parameter Provider with name <" + definition.getName() + "> is not valid: " + validationResults);
}
return parameterProvider;
} catch (final Exception e) {
throw new IllegalStateException("Could not create Parameter Provider " + definition.getName() + " of type " + definition.getType(), e);
}
}
private List<ValidationResult> validate(final ConfigurableComponent component, final Map<String, String> properties, final String componentId) {
final Map<PropertyDescriptor, PropertyConfiguration> explicitlyConfiguredPropertyMap = new HashMap<>();
for (final Map.Entry<String, String> property : properties.entrySet()) {
final String propertyName = property.getKey();
final String propertyValue = property.getValue();
final PropertyDescriptor descriptor = component.getPropertyDescriptor(propertyName);
final PropertyConfiguration propertyConfiguration = new PropertyConfiguration(propertyValue, null, Collections.emptyList());
explicitlyConfiguredPropertyMap.put(descriptor, propertyConfiguration);
}
final Map<PropertyDescriptor, PropertyConfiguration> fullPropertyMap = buildConfiguredAndDefaultPropertyMap(component, explicitlyConfiguredPropertyMap);
final ValidationContext validationContext = new StandardValidationContext(controllerServiceProvider, fullPropertyMap,
null, null, componentId, VariableRegistry.EMPTY_REGISTRY, null);
final Collection<ValidationResult> validationResults = component.validate(validationContext);
return validationResults.stream()
.filter(validationResult -> !validationResult.isValid())
.collect(Collectors.toList());
}
public Map<PropertyDescriptor, PropertyConfiguration> buildConfiguredAndDefaultPropertyMap(final ConfigurableComponent component, final Map<PropertyDescriptor, PropertyConfiguration> properties) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, component.getClass(), component.getIdentifier())) {
final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
if (supported == null || supported.isEmpty()) {
return Collections.unmodifiableMap(properties);
} else {
final Map<PropertyDescriptor, PropertyConfiguration> props = new LinkedHashMap<>();
for (final PropertyDescriptor descriptor : supported) {
props.put(descriptor, null);
}
props.putAll(properties);
return props;
}
}
}
private void loadNecessaryExtensions(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
final VersionedProcessGroup group = dataflowDefinition.getFlowSnapshot().getFlowContents();
final Set<BundleCoordinate> requiredBundles = gatherRequiredBundles(group);
for (final ReportingTaskDefinition reportingTaskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
final BundleCoordinate coordinate = parseBundleCoordinate(reportingTaskDefinition);
if (coordinate == null) {
continue;
}
requiredBundles.add(coordinate);
}
for (final ParameterProviderDefinition parameterProviderDefinition : dataflowDefinition.getParameterProviderDefinitions()) {
final BundleCoordinate coordinate = parseBundleCoordinate(parameterProviderDefinition);
if (coordinate == null) {
continue;
}
requiredBundles.add(coordinate);
}
final ExecutorService executor = new FlowEngine(CONCURRENT_EXTENSION_DOWNLOADS, "Download Extensions", true);
final Future<Set<Bundle>> future = extensionRepository.fetch(requiredBundles, executor, CONCURRENT_EXTENSION_DOWNLOADS);
executor.shutdown();
logger.info("Waiting for bundles to complete download...");
final long downloadStart = System.currentTimeMillis();
final Set<Bundle> downloadedBundles;
try {
downloadedBundles = future.get();
} catch (Exception e) {
logger.error("Failed to obtain all necessary extension bundles", e);
throw new RuntimeException(e);
}
final long downloadMillis = System.currentTimeMillis() - downloadStart;
logger.info("Successfully downloaded {} bundles in {} millis", downloadedBundles.size(), downloadMillis);
}
private Set<BundleCoordinate> gatherRequiredBundles(final VersionedProcessGroup group) {
final Set<BundleCoordinate> requiredBundles = new HashSet<>();
gatherRequiredBundles(group, requiredBundles);
return requiredBundles;
}
private void gatherRequiredBundles(final VersionedProcessGroup group, final Set<BundleCoordinate> requiredBundles) {
group.getControllerServices().forEach(cs -> requiredBundles.add(toBundleCoordinate(cs.getBundle())));
group.getProcessors().forEach(processor -> requiredBundles.add(toBundleCoordinate(processor.getBundle())));
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
gatherRequiredBundles(childGroup, requiredBundles);
}
}
private BundleCoordinate toBundleCoordinate(final org.apache.nifi.registry.flow.Bundle bundle) {
return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
}
private List<ReportingTaskNode> createReportingTasks(final DataflowDefinition<?> dataflowDefinition) {
final List<ReportingTaskNode> reportingTaskNodes = new ArrayList<>();
for (final ReportingTaskDefinition taskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
final ReportingTaskNode taskNode = createReportingTask(taskDefinition);
reportingTaskNodes.add(taskNode);
}
return reportingTaskNodes;
}
private ReportingTaskNode createReportingTask(final ReportingTaskDefinition taskDefinition) {
final BundleCoordinate bundleCoordinate = determineBundleCoordinate(taskDefinition, "Reporting Task");
final ReportingTaskNode taskNode = flowManager.createReportingTask(taskDefinition.getType(), UUID.randomUUID().toString(), bundleCoordinate, Collections.emptySet(), true, true);
final Map<String, String> properties = resolveProperties(taskDefinition.getPropertyValues(), taskNode.getComponent(), taskNode.getProperties().keySet());
taskNode.setProperties(properties);
taskNode.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN);
taskNode.setSchedulingPeriod(taskDefinition.getSchedulingFrequency());
// Ensure that the Parameter Provider is valid.
final List<ValidationResult> validationResults = validate(taskNode.getComponent(), properties, taskNode.getIdentifier());
if (!validationResults.isEmpty()) {
throw new IllegalStateException("Reporting Task with name <" + taskNode.getName() + "> is not valid: " + validationResults);
}
return taskNode;
}
private Map<String, String> resolveProperties(final Map<String, String> configured, final ConfigurableComponent component, final Collection<PropertyDescriptor> componentDescriptors) {
// Map property display name to actual names.
final Map<String, String> displayNameToActualName = new HashMap<>();
for (final PropertyDescriptor descriptor : componentDescriptors) {
displayNameToActualName.put(descriptor.getDisplayName(), descriptor.getName());
}
// Map friendly property name in the task definition to the actual property name that we need in the 'resolved' map
final Map<String, String> resolved = new HashMap<>();
for (final Map.Entry<String, String> entry : configured.entrySet()) {
final String configuredName = entry.getKey();
final String configuredValue = entry.getValue();
final String actual = displayNameToActualName.get(configuredName);
final String resolvedName = actual == null ? configuredName : actual;
// If property has allowable values, resolve the user-friendly name to the name that the Reporting Task needs.
String resolvedValue = configuredValue;
if (actual != null) {
// This is a 'known' / non-dynamic property
final PropertyDescriptor descriptor = component.getPropertyDescriptor(actual);
final List<AllowableValue> allowableValues = descriptor.getAllowableValues();
if (allowableValues != null && !allowableValues.isEmpty()) {
for (final AllowableValue allowableValue : allowableValues) {
if (allowableValue.getDisplayName().equalsIgnoreCase(configuredValue)) {
resolvedValue = allowableValue.getValue();
logger.debug("Resolving property value of {} for {} of {} to {}", configuredValue, configuredName, component, resolvedValue);
break;
}
}
}
}
resolved.put(resolvedName, resolvedValue);
}
// Return map containing the desired names.
return resolved;
}
private BundleCoordinate determineBundleCoordinate(final ConfigurableExtensionDefinition extensionDefinition, final String extensionType) {
final String explicitCoordinates = extensionDefinition.getBundleCoordinates();
if (explicitCoordinates != null && !explicitCoordinates.trim().isEmpty()) {
final String resolvedClassName = resolveExtensionClassName(extensionDefinition, extensionType);
extensionDefinition.setType(resolvedClassName);
final BundleCoordinate coordinate = parseBundleCoordinate(extensionDefinition);
return coordinate;
}
final String specifiedType = extensionDefinition.getType();
String resolvedClassName = specifiedType;
if (!specifiedType.contains(".")) {
final List<Bundle> possibleBundles = extensionManager.getBundles(extensionDefinition.getType());
if (possibleBundles.isEmpty()) {
logger.debug("Could not find extension type of <{}>. Will try to find matching Reporting Task type based on class name", specifiedType);
resolvedClassName = resolveExtensionClassName(extensionDefinition, extensionType);
extensionDefinition.setType(resolvedClassName);
logger.info("Resolved extension class {} to {}", specifiedType, resolvedClassName);
}
}
final List<Bundle> possibleBundles = extensionManager.getBundles(resolvedClassName);
if (possibleBundles.isEmpty()) {
throw new IllegalArgumentException("Extension '" + extensionDefinition.getName() + "' (" + extensionDefinition.getType() +
") does not specify a Bundle and no Bundles could be found for type " + extensionDefinition.getType());
}
if (possibleBundles.size() > 1) {
throw new IllegalArgumentException("Extension '" + extensionDefinition.getName() + "' (" + extensionDefinition.getType() +
") does not specify a Bundle and multiple Bundles exist for this type. The extension must specify a bundle to use.");
}
final Bundle bundle = possibleBundles.get(0);
final BundleCoordinate coordinate = bundle.getBundleDetails().getCoordinate();
return coordinate;
}
private BundleCoordinate parseBundleCoordinate(final ConfigurableExtensionDefinition extensionDefinition) {
final String specifiedCoordinates = extensionDefinition.getBundleCoordinates();
if (specifiedCoordinates == null) {
return null;
}
final String[] splits = specifiedCoordinates.split(":", 3);
if (splits.length != 3) {
throw new IllegalArgumentException("Reporting Task '" + extensionDefinition.getName() + "' (" + extensionDefinition.getType() + ") specifies bundle as '" +
specifiedCoordinates + "', but this " + "is not a valid Bundle format. Format should be <group>:<id>:<version>");
}
return new BundleCoordinate(splits[0], splits[1], splits[2]);
}
private String resolveExtensionClassName(final ConfigurableExtensionDefinition extensionDefinition, final String extensionType) {
final String specifiedType = extensionDefinition.getType();
if (specifiedType.contains(".")) {
return specifiedType;
}
final Set<String> possibleResolvedClassNames = new HashSet<>();
final Set<ExtensionDefinition> definitions = extensionManager.getExtensions(ReportingTask.class);
for (final ExtensionDefinition definition : definitions) {
final String implementationClassName = definition.getImplementationClassName();
final String simpleName = implementationClassName.contains(".") ? StringUtils.substringAfterLast(implementationClassName, ".") : implementationClassName;
if (simpleName.equals(specifiedType)) {
logger.debug("Found possible matching class {}", implementationClassName);
possibleResolvedClassNames.add(implementationClassName);
}
}
if (possibleResolvedClassNames.isEmpty()) {
throw new IllegalArgumentException(String.format("%s '%s' (%s) does not specify a Bundle, and no %s implementations exist with a class name of %s.",
extensionType, extensionDefinition.getName(), extensionDefinition.getType(), extensionType, extensionDefinition.getType()));
}
if (possibleResolvedClassNames.size() > 1) {
throw new IllegalArgumentException(String.format("%s '%s' (%s) does not specify a Bundle, and no %s implementations exist with a class name of %s. Perhaps you meant one of: %s",
extensionType, extensionDefinition.getName(), extensionDefinition.getType(), extensionType, extensionDefinition.getType(), possibleResolvedClassNames));
}
return possibleResolvedClassNames.iterator().next();
}
private void overrideParameters(final Map<String, ParameterContext> parameterContextMap, final ParameterProvider parameterProvider) {
for (final ParameterContext context : parameterContextMap.values()) {
final String contextName = context.getName();
final Map<ParameterDescriptor, Parameter> parameters = context.getParameters();
final Map<String, Parameter> updatedParameters = new HashMap<>();
for (final Parameter parameter : parameters.values()) {
final String parameterName = parameter.getDescriptor().getName();
if (parameterProvider.isParameterDefined(contextName, parameterName)) {
final String providedValue = parameterProvider.getParameterValue(contextName, parameterName);
final Parameter updatedParameter = new Parameter(parameter.getDescriptor(), providedValue);
updatedParameters.put(parameterName, updatedParameter);
}
}
context.setParameters(updatedParameters);
}
}
private void registerParameterContext(final ParameterContextDefinition parameterContextDefinition, final Map<String, ParameterContext> parameterContextMap) {
final String contextName = parameterContextDefinition.getName();
final ParameterContext existingContext = parameterContextMap.get(contextName);
if (existingContext == null) {
logger.warn("Configuration contains a Parameter Context with name <" + contextName + "> but the flow does not contain any Parameter Context with this name. " +
"These Parameters will be ignored.");
return;
}
final Map<String, Parameter> parameters = new HashMap<>();
final List<ParameterDefinition> parameterDefinitions = parameterContextDefinition.getParameters();
if (parameterDefinitions != null) {
for (final ParameterDefinition parameterDefinition : parameterDefinitions) {
final String parameterName = parameterDefinition.getName();
final Optional<Parameter> optionalParameter = existingContext.getParameter(parameterName);
if (!optionalParameter.isPresent()) {
logger.warn("Configuration contains a Parameter with name <{}> for Parameter Context <{}> but the Parameter Context does not have a Parameter with that name. This Parameter will" +
" be ignored.", parameterName, contextName);
continue;
}
final Parameter existingParameter = optionalParameter.get();
final Parameter updatedParameter = new Parameter(existingParameter.getDescriptor(), parameterDefinition.getValue());
parameters.put(parameterName, updatedParameter);
}
}
existingContext.setParameters(parameters);
logger.info("Registered Parameter Context {}", parameterContextDefinition.getName());
}
@Override
public ExtensionManager getExtensionManager() {
return extensionManager;
}
@Override
public BulletinRepository getBulletinRepository() {
return bulletinRepository;
}
@Override
public StateManagerProvider getStateManagerProvider() {
return stateManagerProvider;
}
@Override
public PropertyEncryptor getPropertyEncryptor() {
return propertyEncryptor;
}
@Override
public FlowRegistryClient getFlowRegistryClient() {
return flowRegistryClient;
}
@Override
public VariableRegistry getRootVariableRegistry() {
return rootVariableRegistry;
}
@Override
public ProcessScheduler getProcessScheduler() {
return processScheduler;
}
@Override
public ReloadComponent getReloadComponent() {
return reloadComponent;
}
@Override
public ControllerServiceProvider getControllerServiceProvider() {
return controllerServiceProvider;
}
@Override
public ProvenanceRepository getProvenanceRepository() {
return provenanceRepository;
}
@Override
public FlowFileEventRepository getFlowFileEventRepository() {
return flowFileEventRepository;
}
@Override
public KerberosConfig getKerberosConfig() {
return kerberosConfig;
}
@Override
public ValidationTrigger getValidationTrigger() {
return validationTrigger;
}
@Override
public FlowManager getFlowManager() {
return flowManager;
}
@Override
public CounterRepository getCounterRepository() {
return counterRepository;
}
public static class Builder {
private ExtensionManager extensionManager = null;
private BulletinRepository bulletinRepository = null;
private StatelessStateManagerProvider stateManagerProvider = null;
private PropertyEncryptor propertyEncryptor = null;
private FlowRegistryClient flowRegistryClient = null;
private VariableRegistry variableRegistry = null;
private ProcessScheduler processScheduler = null;
private KerberosConfig kerberosConfig = null;
private FlowFileEventRepository flowFileEventRepository = null;
private ProvenanceRepository provenanceRepository = null;
private ExtensionRepository extensionRepository = null;
private CounterRepository counterRepository = null;
public Builder extensionManager(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
return this;
}
public Builder bulletinRepository(final BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
return this;
}
public Builder stateManagerProvider(final StatelessStateManagerProvider stateManagerProvider) {
this.stateManagerProvider = stateManagerProvider;
return this;
}
public Builder encryptor(final PropertyEncryptor propertyEncryptor) {
this.propertyEncryptor = propertyEncryptor;
return this;
}
public Builder flowRegistryClient(final FlowRegistryClient flowRegistryClient) {
this.flowRegistryClient = flowRegistryClient;
return this;
}
public Builder variableRegistry(final VariableRegistry variableRegistry) {
this.variableRegistry = variableRegistry;
return this;
}
public Builder processScheduler(final ProcessScheduler processScheduler) {
this.processScheduler = processScheduler;
return this;
}
public Builder kerberosConfiguration(final KerberosConfig kerberosConfig) {
this.kerberosConfig = kerberosConfig;
return this;
}
public Builder flowFileEventRepository(final FlowFileEventRepository flowFileEventRepository) {
this.flowFileEventRepository = flowFileEventRepository;
return this;
}
public Builder provenanceRepository(final ProvenanceRepository provenanceRepository) {
this.provenanceRepository = provenanceRepository;
return this;
}
public Builder extensionRepository(final ExtensionRepository extensionRepository) {
this.extensionRepository = extensionRepository;
return this;
}
public Builder counterRepository(final CounterRepository counterRepository) {
this.counterRepository = counterRepository;
return this;
}
public StandardStatelessEngine build() {
return new StandardStatelessEngine(this);
}
}
}