blob: f61816cf382631b83ddc51572e702f15d1a3db50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
import org.apache.commons.lang3.ClassUtils;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.LoggableComponent;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.TerminationAwareLogger;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.reporting.StatelessReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardControllerServiceInitializationContext;
import org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler;
import org.apache.nifi.controller.service.StandardControllerServiceNode;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Fluent builder that instantiates a single extension component (Processor, Reporting Task or
 * Controller Service) and wraps it in the corresponding node type.
 *
 * <p>The {@link #statelessEngine(StatelessEngine) stateless engine} must be supplied before any
 * {@code build*} method is called, because every build method reads its collaborators from it.
 * The {@link #flowManager(FlowManager) flow manager} is only consulted by
 * {@link #buildReportingTask()}.</p>
 *
 * <p>Instances hold plain mutable fields with no synchronization.</p>
 */
public class ComponentBuilder {
    private static final Logger logger = LoggerFactory.getLogger(ComponentBuilder.class);

    private StatelessEngine<VersionedFlowSnapshot> statelessEngine;
    private FlowManager flowManager;
    private String identifier;
    private String type;
    private BundleCoordinate bundleCoordinate;
    private Set<URL> additionalClassPathUrls;

    /**
     * Sets the engine from which schedulers, registries, providers and the extension manager are obtained.
     *
     * @param statelessEngine the engine to read collaborators from
     * @return this builder
     */
    public ComponentBuilder statelessEngine(final StatelessEngine<VersionedFlowSnapshot> statelessEngine) {
        this.statelessEngine = statelessEngine;
        return this;
    }

    /**
     * Sets the identifier assigned to the component that is built.
     *
     * @param identifier the component identifier
     * @return this builder
     */
    public ComponentBuilder identifier(final String identifier) {
        this.identifier = identifier;
        return this;
    }

    /**
     * Sets the fully qualified class name of the component to instantiate.
     *
     * @param type the class name that is passed to {@link Class#forName(String, boolean, ClassLoader)}
     * @return this builder
     */
    public ComponentBuilder type(final String type) {
        this.type = type;
        return this;
    }

    /**
     * Sets the coordinate of the bundle that is expected to provide the component.
     *
     * @param bundleCoordinate the bundle coordinate used to look up the bundle
     * @return this builder
     */
    public ComponentBuilder bundleCoordinate(final BundleCoordinate bundleCoordinate) {
        this.bundleCoordinate = bundleCoordinate;
        return this;
    }

    /**
     * Adds URLs that are handed to the extension manager when the component's instance class loader is created.
     * Repeated calls accumulate; a {@code null} or empty argument is ignored.
     *
     * @param urls the additional classpath URLs, may be {@code null}
     * @return this builder
     */
    public ComponentBuilder additionalClassPathUrls(final Set<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            return this;
        }

        if (this.additionalClassPathUrls == null) {
            this.additionalClassPathUrls = new HashSet<>();
        }

        this.additionalClassPathUrls.addAll(urls);
        return this;
    }

    /**
     * Sets the flow manager that is passed to the Reporting Task node.
     *
     * @param flowManager the flow manager
     * @return this builder
     */
    public ComponentBuilder flowManager(final FlowManager flowManager) {
        this.flowManager = flowManager;
        return this;
    }

    /**
     * Instantiates and initializes the configured Processor and wraps it in a {@link StandardProcessorNode}.
     *
     * @return the newly created processor node
     * @throws ProcessorInstantiationException if the processor cannot be located, instantiated or initialized
     */
    public ProcessorNode buildProcessor() throws ProcessorInstantiationException {
        final LoggableComponent<Processor> loggableProcessor = createLoggableProcessor();

        final ProcessScheduler processScheduler = statelessEngine.getProcessScheduler();
        final ControllerServiceProvider controllerServiceProvider = statelessEngine.getControllerServiceProvider();
        final ComponentVariableRegistry componentVariableRegistry = new StandardComponentVariableRegistry(statelessEngine.getRootVariableRegistry());
        final ReloadComponent reloadComponent = statelessEngine.getReloadComponent();
        final ExtensionManager extensionManager = statelessEngine.getExtensionManager();
        final ValidationTrigger validationTrigger = statelessEngine.getValidationTrigger();
        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVariableRegistry);

        final ProcessorNode procNode = new StandardProcessorNode(loggableProcessor, identifier, validationContextFactory, processScheduler, controllerServiceProvider,
            componentVariableRegistry, reloadComponent, extensionManager, validationTrigger);

        logger.info("Created Processor of type {} with identifier {}", type, identifier);
        return procNode;
    }

    /**
     * Instantiates and initializes the configured Reporting Task and wraps it in a {@link StatelessReportingTaskNode}.
     *
     * @return the newly created reporting task node
     * @throws ReportingTaskInstantiationException if the task cannot be located, instantiated or initialized
     */
    public ReportingTaskNode buildReportingTask() throws ReportingTaskInstantiationException {
        final LoggableComponent<ReportingTask> reportingTaskComponent = createLoggableReportingTask();

        final ProcessScheduler processScheduler = statelessEngine.getProcessScheduler();
        final ControllerServiceProvider controllerServiceProvider = statelessEngine.getControllerServiceProvider();
        final ComponentVariableRegistry componentVariableRegistry = new StandardComponentVariableRegistry(statelessEngine.getRootVariableRegistry());
        final ReloadComponent reloadComponent = statelessEngine.getReloadComponent();
        final ExtensionManager extensionManager = statelessEngine.getExtensionManager();
        final ValidationTrigger validationTrigger = statelessEngine.getValidationTrigger();
        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVariableRegistry);

        final ReportingTaskNode taskNode = new StatelessReportingTaskNode(reportingTaskComponent, identifier, statelessEngine, flowManager,
            processScheduler, validationContextFactory, componentVariableRegistry, reloadComponent, extensionManager, validationTrigger);

        logger.info("Created Reporting Task of type {} with identifier {}", type, identifier);
        return taskNode;
    }

    /**
     * Creates the Reporting Task instance and calls its {@code initialize} method.
     *
     * @return the initialized task together with its logger
     * @throws ReportingTaskInstantiationException wrapping any failure raised while creating or initializing the task
     */
    private LoggableComponent<ReportingTask> createLoggableReportingTask() throws ReportingTaskInstantiationException {
        try {
            final LoggableComponent<ReportingTask> taskComponent = createLoggableComponent(ReportingTask.class);

            final String taskName = taskComponent.getComponent().getClass().getSimpleName();
            final NodeTypeProvider nodeTypeProvider = new StatelessNodeTypeProvider();

            // The initialization context is always created with a timer-driven, one-minute schedule.
            final ReportingInitializationContext config = new StandardReportingInitializationContext(identifier, taskName,
                SchedulingStrategy.TIMER_DRIVEN, "1 min", taskComponent.getLogger(), statelessEngine.getControllerServiceProvider(),
                statelessEngine.getKerberosConfig(), nodeTypeProvider);

            taskComponent.getComponent().initialize(config);
            return taskComponent;
        } catch (final Exception e) {
            throw new ReportingTaskInstantiationException(type, e);
        }
    }

    /**
     * Instantiates and initializes the configured Controller Service, creates a dynamic proxy that exposes all of
     * the service's interfaces through a {@link StandardControllerServiceInvocationHandler}, and wraps both in a
     * {@link StandardControllerServiceNode}. The node's name is set to the simple name of the service class.
     *
     * <p>The calling thread's context class loader is switched to the component's class loader while the service is
     * created and is restored before this method returns.</p>
     *
     * @return the newly created controller service node
     * @throws ControllerServiceInstantiationException if the service cannot be located, instantiated or initialized
     */
    public ControllerServiceNode buildControllerService() {
        final ExtensionManager extensionManager = statelessEngine.getExtensionManager();
        final StateManagerProvider stateManagerProvider = statelessEngine.getStateManagerProvider();
        final ControllerServiceProvider serviceProvider = statelessEngine.getControllerServiceProvider();
        final KerberosConfig kerberosConfig = statelessEngine.getKerberosConfig();
        final VariableRegistry rootVariableRegistry = statelessEngine.getRootVariableRegistry();
        final ReloadComponent reloadComponent = statelessEngine.getReloadComponent();
        final ValidationTrigger validationTrigger = statelessEngine.getValidationTrigger();
        final NodeTypeProvider nodeTypeProvider = new StatelessNodeTypeProvider();

        final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            final Bundle bundle = resolveBundle(extensionManager, "Controller Service");
            final ClassLoader detectedClassLoader = createInstanceClassLoader(extensionManager, bundle);

            final Class<?> rawClass = Class.forName(type, true, detectedClassLoader);
            Thread.currentThread().setContextClassLoader(detectedClassLoader);

            final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
            // Class.newInstance() is deprecated because it rethrows checked constructor exceptions undeclared.
            final ControllerService serviceImpl = controllerServiceClass.getDeclaredConstructor().newInstance();
            final StandardControllerServiceInvocationHandler invocationHandler = new StandardControllerServiceInvocationHandler(extensionManager, serviceImpl);

            // extract all interfaces... controllerServiceClass is non null so getAllInterfaces is non null
            final List<Class<?>> interfaceList = ClassUtils.getAllInterfaces(controllerServiceClass);
            final Class<?>[] interfaces = interfaceList.toArray(new Class<?>[0]);

            // Fall back to this class's loader if the extension manager did not supply an instance class loader.
            final ClassLoader proxyClassLoader = detectedClassLoader == null ? getClass().getClassLoader() : detectedClassLoader;
            final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(proxyClassLoader, interfaces, invocationHandler);

            logger.info("Created Controller Service of type {} with identifier {}", type, identifier);

            final ComponentLog serviceLogger = new SimpleProcessLogger(identifier, serviceImpl);
            final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(serviceLogger);
            final StateManager stateManager = stateManagerProvider.getStateManager(identifier);

            final ControllerServiceInitializationContext initContext = new StandardControllerServiceInitializationContext(identifier, terminationAwareLogger,
                serviceProvider, stateManager, kerberosConfig, nodeTypeProvider);
            serviceImpl.initialize(initContext);

            final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(serviceImpl, bundleCoordinate, terminationAwareLogger);
            final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, terminationAwareLogger);

            final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(rootVariableRegistry);
            final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, componentVarRegistry);
            final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler,
                identifier, validationContextFactory, serviceProvider, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
            serviceNode.setName(rawClass.getSimpleName());

            invocationHandler.setServiceNode(serviceNode);
            return serviceNode;
        } catch (final Exception e) {
            throw new ControllerServiceInstantiationException("Failed to create Controller Service of type " + type, e);
        } finally {
            // Restore unconditionally: a null context class loader is a legal value, and skipping the restore
            // in that case would leave the component's class loader installed on the calling thread.
            Thread.currentThread().setContextClassLoader(ctxClassLoader);
        }
    }

    /**
     * Creates the Processor instance and calls its {@code initialize} method.
     *
     * @return the initialized processor together with its logger
     * @throws ProcessorInstantiationException wrapping any failure raised while creating or initializing the processor
     */
    private LoggableComponent<Processor> createLoggableProcessor() throws ProcessorInstantiationException {
        try {
            final LoggableComponent<Processor> processorComponent = createLoggableComponent(Processor.class);

            final ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(),
                statelessEngine.getControllerServiceProvider(), new StatelessNodeTypeProvider(), statelessEngine.getKerberosConfig());
            processorComponent.getComponent().initialize(initContext);

            return processorComponent;
        } catch (final Exception e) {
            throw new ProcessorInstantiationException(type, e);
        }
    }

    /**
     * Loads the configured {@link #type} through an instance class loader, instantiates it via its no-argument
     * constructor and pairs it with a {@link TerminationAwareLogger}.
     *
     * <p>The calling thread's context class loader is switched to the component's class loader while the instance is
     * created and is restored before this method returns.</p>
     *
     * @param nodeType the component interface the new instance is cast to
     * @param <T> the component interface type
     * @return the new instance, the configured bundle coordinate and the component's logger
     * @throws ReflectiveOperationException if the class cannot be found or its no-argument constructor cannot be invoked
     * @throws IllegalStateException if no bundle can be determined for the component
     * @throws ClassCastException if the instantiated class is not a {@code nodeType}
     */
    private <T extends ConfigurableComponent> LoggableComponent<T> createLoggableComponent(Class<T> nodeType) throws ReflectiveOperationException {
        final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            final ExtensionManager extensionManager = statelessEngine.getExtensionManager();
            final Bundle bundle = resolveBundle(extensionManager, nodeType.getSimpleName());
            final ClassLoader detectedClassLoader = createInstanceClassLoader(extensionManager, bundle);

            final Class<?> rawClass = Class.forName(type, true, detectedClassLoader);
            Thread.currentThread().setContextClassLoader(detectedClassLoader);

            // Class.newInstance() is deprecated because it rethrows checked constructor exceptions undeclared.
            final Object extensionInstance = rawClass.getDeclaredConstructor().newInstance();
            final ComponentLog componentLog = new SimpleProcessLogger(identifier, extensionInstance);
            final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog);

            final T cast = nodeType.cast(extensionInstance);
            return new LoggableComponent<>(cast, bundleCoordinate, terminationAwareLogger);
        } finally {
            // Restore unconditionally: a null context class loader is a legal value, and skipping the restore
            // in that case would leave the component's class loader installed on the calling thread.
            Thread.currentThread().setContextClassLoader(ctxClassLoader);
        }
    }

    /**
     * Finds the bundle for the configured coordinate. When that bundle is unknown but exactly one bundle provides
     * the configured {@link #type}, that bundle is used instead and a warning is logged.
     *
     * @param extensionManager the extension manager to query
     * @param componentDescription human-readable component kind used in the warning message
     * @return the bundle to load the component from, never {@code null}
     * @throws IllegalStateException if the coordinate is unknown and the type does not map to exactly one bundle
     */
    private Bundle resolveBundle(final ExtensionManager extensionManager, final String componentDescription) {
        final Bundle specifiedBundle = extensionManager.getBundle(bundleCoordinate);
        if (specifiedBundle != null) {
            return specifiedBundle;
        }

        final List<Bundle> possibleBundles = extensionManager.getBundles(type);
        if (possibleBundles != null && possibleBundles.size() == 1) {
            final Bundle fallbackBundle = possibleBundles.get(0);
            logger.warn("Flow specifies bundle coordinates of {} for {} of type {} but could not find that Bundle. Will use {} instead",
                bundleCoordinate, componentDescription, type, fallbackBundle);
            return fallbackBundle;
        }

        // Guard against a missing coordinate so the intended exception is not masked by a NullPointerException.
        final String coordinate = bundleCoordinate == null ? "null" : bundleCoordinate.getCoordinate();
        throw new IllegalStateException("Unable to find bundle for coordinate " + coordinate);
    }

    /**
     * Asks the extension manager for the class loader dedicated to this component instance, passing along any
     * additional classpath URLs that were configured (or an empty set when none were).
     *
     * @param extensionManager the extension manager that creates the class loader
     * @param bundle the bundle that provides the component
     * @return the class loader returned by the extension manager
     */
    private ClassLoader createInstanceClassLoader(final ExtensionManager extensionManager, final Bundle bundle) {
        final Set<URL> classpathUrls = additionalClassPathUrls == null ? Collections.emptySet() : additionalClassPathUrls;
        return extensionManager.createInstanceClassLoader(type, identifier, bundle, classpathUrls);
    }
}