/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.integration;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.StandardConnection;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FileSystemSwapManager;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardFlowSynchronizer;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.flow.StandardFlowManager;
import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FileSystemRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.controller.status.history.VolatileComponentStatusRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.processor.BiConsumerProcessor;
import org.apache.nifi.integration.processors.GenerateProcessor;
import org.apache.nifi.integration.processors.NopProcessor;
import org.apache.nifi.integration.processors.TerminateAll;
import org.apache.nifi.integration.processors.TerminateOnce;
import org.apache.nifi.integration.processors.UsernamePasswordProcessor;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.persistence.FlowConfigurationDAO;
import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.WriteAheadProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.FileUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.revision.RevisionManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class FrameworkIntegrationTest {
private static final Logger logger = LoggerFactory.getLogger(FrameworkIntegrationTest.class);
//@Rule
public Timeout globalTimeout = Timeout.seconds(20);
@Rule
public TestName name = new TestName();
private ResourceClaimManager resourceClaimManager;
private StandardProcessScheduler processScheduler;
private FlowEngine flowEngine;
private FlowController flowController;
private FlowRegistryClient flowRegistryClient = createFlowRegistryClient();
private ProcessorNode nopProcessor;
private ProcessorNode terminateProcessor;
private ProcessorNode terminateAllProcessor;
private FlowFileSwapManager flowFileSwapManager;
private DirectInjectionExtensionManager extensionManager;
private ProcessGroup rootProcessGroup;
private Bundle systemBundle;
private ClusterCoordinator clusterCoordinator;
private NiFiProperties nifiProperties;
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
@Before
public void setup() throws IOException {
StandardStateManagerProvider.resetProvider();
cleanup();
initialize();
flowController.initializeFlow();
createFlow();
}
protected String getNiFiPropertiesFilename() {
if (isClusteredTest()) {
return "src/test/resources/int-tests/clustered-nifi.properties";
} else {
return "src/test/resources/int-tests/default-nifi.properties";
}
}
protected Map<String, String> getNiFiPropertiesOverrides() {
return Collections.emptyMap();
}
protected void injectExtensionTypes(final DirectInjectionExtensionManager extensionManager) {
// Intentionally empty. Subclasses may override this method to register additional extension types; see the illustrative example below.
}
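/*
 * Illustrative example (not part of the original framework): a subclass could register
 * additional component types before the framework is initialized, mirroring the
 * injectExtensionType calls made in initialize(), e.g.:
 *
 *   @Override
 *   protected void injectExtensionTypes(final DirectInjectionExtensionManager extensionManager) {
 *       extensionManager.injectExtensionType(Processor.class, MyCustomProcessor.class);
 *   }
 *
 * MyCustomProcessor is a hypothetical Processor implementation supplied by the subclass.
 */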
protected final void initialize() throws IOException {
final Map<String, String> propertyOverrides = new HashMap<>(getNiFiPropertiesOverrides());
if (isClusteredTest()) {
propertyOverrides.put(NiFiProperties.CLUSTER_IS_NODE, "true");
}
final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(getNiFiPropertiesFilename(), propertyOverrides);
initialize(nifiProperties);
}
/**
 * This method exists so that subclasses can override it and return a different {@link FlowRegistryClient} implementation; an illustrative example follows the method.
*/
protected FlowRegistryClient createFlowRegistryClient() {
return new StandardFlowRegistryClient();
}
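/*
 * Illustrative example (assumed, not taken from an existing subclass): a test that does not
 * exercise flow registry interactions could return a mock client instead, e.g.:
 *
 *   @Override
 *   protected FlowRegistryClient createFlowRegistryClient() {
 *       return Mockito.mock(FlowRegistryClient.class);
 *   }
 */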
protected final void initialize(final NiFiProperties nifiProperties) throws IOException {
this.nifiProperties = nifiProperties;
final FlowFileEventRepository flowFileEventRepository = new RingBufferEventRepository(5);
final BulletinRepository bulletinRepo = new VolatileBulletinRepository();
flowEngine = new FlowEngine(4, "unit test flow engine");
extensionManager = new DirectInjectionExtensionManager();
extensionManager.injectExtensionType(FlowFileRepository.class, WriteAheadFlowFileRepository.class);
extensionManager.injectExtensionType(ContentRepository.class, FileSystemRepository.class);
extensionManager.injectExtensionType(ProvenanceRepository.class, WriteAheadProvenanceRepository.class);
extensionManager.injectExtensionType(StateProvider.class, WriteAheadLocalStateProvider.class);
extensionManager.injectExtensionType(StatusHistoryRepository.class, VolatileComponentStatusRepository.class);
extensionManager.injectExtensionType(FlowFileSwapManager.class, FileSystemSwapManager.class);
extensionManager.injectExtensionType(Processor.class, BiConsumerProcessor.class);
extensionManager.injectExtensionType(Processor.class, GenerateProcessor.class);
extensionManager.injectExtensionType(Processor.class, TerminateOnce.class);
extensionManager.injectExtensionType(Processor.class, TerminateAll.class);
extensionManager.injectExtensionType(Processor.class, NopProcessor.class);
extensionManager.injectExtensionType(Processor.class, UsernamePasswordProcessor.class);
injectExtensionTypes(extensionManager);
systemBundle = SystemBundle.create(nifiProperties);
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
final PropertyEncryptor encryptor = createEncryptor();
final Authorizer authorizer = new AlwaysAuthorizedAuthorizer();
final AuditService auditService = new NopAuditService();
if (isClusteredTest()) {
final File zookeeperDir = new File("target/state/zookeeper");
final File version2Dir = new File(zookeeperDir, "version-2");
if (!version2Dir.exists()) {
assertTrue(version2Dir.mkdirs());
}
final File[] children = version2Dir.listFiles();
if (children != null) {
for (final File file : children) {
FileUtils.deleteFile(file, true);
}
}
clusterCoordinator = Mockito.mock(ClusterCoordinator.class);
final HeartbeatMonitor heartbeatMonitor = Mockito.mock(HeartbeatMonitor.class);
final NodeProtocolSender protocolSender = Mockito.mock(NodeProtocolSender.class);
final LeaderElectionManager leaderElectionManager = new CuratorLeaderElectionManager(2, nifiProperties);
final NodeIdentifier localNodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 8111, "localhost", 8081,
"localhost", 8082, "localhost", 8083, 8084, false, Collections.emptySet());
final NodeIdentifier node2Id = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 8222, "localhost", 8081,
"localhost", 8082, "localhost", 8083, 8084, false, Collections.emptySet());
final Set<NodeIdentifier> nodeIdentifiers = new HashSet<>();
nodeIdentifiers.add(localNodeId);
nodeIdentifiers.add(node2Id);
Mockito.when(clusterCoordinator.getNodeIdentifiers()).thenReturn(nodeIdentifiers);
Mockito.when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeId);
flowController = FlowController.createClusteredInstance(flowFileEventRepository, nifiProperties, authorizer, auditService, encryptor, protocolSender, bulletinRepo, clusterCoordinator,
heartbeatMonitor, leaderElectionManager, VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager, Mockito.mock(RevisionManager.class));
flowController.setClustered(true, UUID.randomUUID().toString());
flowController.setNodeId(localNodeId);
flowController.setConnectionStatus(new NodeConnectionStatus(localNodeId, NodeConnectionState.CONNECTED));
} else {
flowController = FlowController.createStandaloneInstance(flowFileEventRepository, nifiProperties, authorizer, auditService, encryptor, bulletinRepo,
VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager);
}
processScheduler = new StandardProcessScheduler(flowEngine, flowController, encryptor, flowController.getStateManagerProvider(), nifiProperties);
final RepositoryContextFactory repositoryContextFactory = flowController.getRepositoryContextFactory();
final SchedulingAgent timerDrivenSchedulingAgent = new TimerDrivenSchedulingAgent(flowController, flowEngine, repositoryContextFactory, encryptor, nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenSchedulingAgent);
flowFileSwapManager = flowController.createSwapManager();
resourceClaimManager = flowController.getResourceClaimManager();
}
protected void createFlow() {
rootProcessGroup = flowController.getFlowManager().createProcessGroup(UUID.randomUUID().toString());
rootProcessGroup.setName("Integration Test");
((StandardFlowManager) flowController.getFlowManager()).setRootGroup(rootProcessGroup);
nopProcessor = createProcessorNode(NopProcessor.class);
terminateProcessor = createProcessorNode(TerminateOnce.class);
terminateAllProcessor = createProcessorNode(TerminateAll.class);
}
protected boolean isClusteredTest() {
return false;
}
protected ClusterCoordinator getClusterCoordinator() {
return clusterCoordinator;
}
@After
public final void shutdown() {
logger.info("Shutting down...");
if (flowController != null) {
flowController.shutdown(true);
}
if (flowEngine != null) {
flowEngine.shutdownNow();
}
if (processScheduler != null) {
processScheduler.shutdown();
}
}
protected void restart() throws IOException, ExecutionException, InterruptedException {
logger.info("Shutting down for restart....");
// Save Flow to a byte array
final FlowConfigurationDAO flowDao = new StandardXMLFlowConfigurationDAO(Paths.get("target/int-tests/flow.xml.gz"), flowController.getEncryptor(), nifiProperties, getExtensionManager());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
flowDao.save(flowController, baos);
final byte[] flowBytes = baos.toByteArray();
// Shutdown
flowController.shutdown(true);
flowEngine.shutdownNow();
StandardStateManagerProvider.resetProvider();
// Remove all Log Repositories so that we can restart with the same IDs
for (final ProcessorNode procNode : rootProcessGroup.getProcessors()) {
LogRepositoryFactory.removeRepository(procNode.getIdentifier());
}
// Re-initialize the framework components
initialize();
// Reload the flow
final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(flowController.getEncryptor(), nifiProperties, extensionManager);
flowController.synchronize(flowSynchronizer, new StandardDataFlow(flowBytes, null, null, Collections.emptySet()), Mockito.mock(FlowService.class));
// Reload FlowFiles / initialize flow
final ProcessGroup newRootGroup = flowController.getFlowManager().getRootGroup();
rootProcessGroup = newRootGroup;
final QueueProvider queueProvider = new QueueProvider() {
@Override
public Collection<FlowFileQueue> getAllQueues() {
return newRootGroup.findAllConnections().stream()
.map(Connection::getFlowFileQueue)
.collect(Collectors.toList());
}
};
flowController.initializeFlow(queueProvider);
}
@After
public final void cleanup() throws IOException {
deleteDirectory(new File("target/int-tests"));
}
private void deleteDirectory(final File dir) throws IOException {
if (!dir.exists()) {
return;
}
FileUtils.deleteFile(dir, true);
}
protected FlowFileQueue createFlowFileQueue(final String uuid, final ProcessGroup processGroup) {
final RepositoryContext repoContext = getRepositoryContext();
return new StandardFlowFileQueue(uuid, ConnectionEventListener.NOP_EVENT_LISTENER, repoContext.getFlowFileRepository(), repoContext.getProvenanceRepository(),
resourceClaimManager, processScheduler, flowFileSwapManager, flowController.createEventReporter(), 20000,
processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
}
protected final ProcessorNode createProcessorNode(final Class<? extends Processor> processorType) {
return createProcessorNode(processorType.getName());
}
protected final ProcessorNode createProcessorNode(final String processorType) {
final String uuid = getSimpleTypeName(processorType) + "-" + UUID.randomUUID().toString();
final BundleCoordinate bundleCoordinate = SystemBundle.SYSTEM_BUNDLE_COORDINATE;
final ProcessorNode procNode = flowController.getFlowManager().createProcessor(processorType, uuid, bundleCoordinate, Collections.emptySet(), true, true);
rootProcessGroup.addProcessor(procNode);
return procNode;
}
protected final ControllerServiceNode createControllerServiceNode(final Class<? extends ControllerService> controllerServiceType) {
return createControllerServiceNode(controllerServiceType.getName());
}
protected final ControllerServiceNode createControllerServiceNode(final String controllerServiceType) {
final String uuid = getSimpleTypeName(controllerServiceType) + "-" + UUID.randomUUID().toString();
final BundleCoordinate bundleCoordinate = SystemBundle.SYSTEM_BUNDLE_COORDINATE;
final ControllerServiceNode serviceNode = flowController.getFlowManager().createControllerService(controllerServiceType, uuid, bundleCoordinate, Collections.emptySet(), true, true);
rootProcessGroup.addControllerService(serviceNode);
return serviceNode;
}
private String getSimpleTypeName(final String className) {
final int index = className.lastIndexOf(".");
if (index >= 0 && index < className.length()) {
return className.substring(index + 1);
} else {
return "";
}
}
protected ProcessGroup getRootGroup() {
return rootProcessGroup;
}
/**
 * Creates a Processor that generates a FlowFile of the given size and routes it to "success".
*
* @param contentSize the number of bytes for the content
*
* @return the ProcessorNode
*/
protected final ProcessorNode createGenerateProcessor(final int contentSize) {
return createGenerateProcessor(contentSize, null);
}
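/*
 * Illustrative usage (assumed, not taken from an existing test): generate a 1 KB FlowFile,
 * route it to the terminating processor, and trigger each processor exactly once:
 *
 *   final ProcessorNode generate = createGenerateProcessor(1024);
 *   connect(generate, getTerminateProcessor(), REL_SUCCESS);
 *   triggerOnce(generate);
 *   triggerOnce(getTerminateProcessor());
 */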
/**
 * Creates a Processor that generates a FlowFile of the given size and routes it to "success". The generated FlowFile is also stored in the given AtomicReference.
 *
 * @param contentSize the number of bytes for the content
 * @param flowFileReference an AtomicReference in which to store the generated FlowFile
*
* @return the ProcessorNode
*/
protected final ProcessorNode createGenerateProcessor(final int contentSize, final AtomicReference<FlowFileRecord> flowFileReference) {
return createProcessorNode((context, session) -> {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, out -> out.write(new byte[contentSize]));
if (flowFileReference != null) {
flowFileReference.set((FlowFileRecord) flowFile);
}
session.transfer(flowFile, REL_SUCCESS);
}, REL_SUCCESS);
}
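/*
 * Illustrative usage (assumed): capture the generated FlowFile so that the test can make
 * assertions about it after the processor has run:
 *
 *   final AtomicReference<FlowFileRecord> reference = new AtomicReference<>();
 *   final ProcessorNode generate = createGenerateProcessor(1024, reference);
 *   connect(generate, getTerminateProcessor(), REL_SUCCESS);
 *   triggerOnce(generate);
 *   assertEquals(1024, reference.get().getSize());
 */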
protected final ProcessorNode createProcessorNode(final BiConsumer<ProcessContext, ProcessSession> trigger, final Relationship... relationships) {
final Set<Relationship> relationshipSet = new HashSet<>(Arrays.asList(relationships));
final ProcessorNode processorNode = createProcessorNode(BiConsumerProcessor.class.getName());
final BiConsumerProcessor biConsumerProcessor = (BiConsumerProcessor) processorNode.getProcessor();
biConsumerProcessor.setRelationships(relationshipSet);
biConsumerProcessor.setTrigger(trigger);
return processorNode;
}
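/*
 * Illustrative usage (assumed): define an inline processor body with a lambda. This example
 * adds an attribute to each incoming FlowFile and routes it to "success":
 *
 *   final ProcessorNode attributeAdder = createProcessorNode((context, session) -> {
 *       FlowFile flowFile = session.get();
 *       if (flowFile != null) {
 *           flowFile = session.putAttribute(flowFile, "test.attribute", "test.value");
 *           session.transfer(flowFile, REL_SUCCESS);
 *       }
 *   }, REL_SUCCESS);
 */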
protected final Connection connect(final ProcessorNode source, final ProcessorNode destination, final Relationship relationship) {
return connect(source, destination, Collections.singleton(relationship));
}
protected final Connection connect(final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
return connect(rootProcessGroup, source, destination, relationships);
}
protected final Connection connect(ProcessGroup processGroup, final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
final String id = UUID.randomUUID().toString();
final Connection connection = new StandardConnection.Builder(processScheduler)
.source(source)
.destination(destination)
.processGroup(processGroup)
.relationships(relationships)
.id(id)
.clustered(false)
.flowFileQueueFactory((loadBalanceStrategy, partitioningAttribute, eventListener, processGroup1) -> createFlowFileQueue(id, processGroup))
.build();
source.addConnection(connection);
destination.addConnection(connection);
processGroup.addConnection(connection);
return connection;
}
protected final Future<Void> start(final ProcessorNode procNode) {
final ValidationStatus validationStatus = procNode.performValidation();
if (validationStatus != ValidationStatus.VALID) {
throw new IllegalStateException("Processor is invalid: " + procNode + ": " + procNode.getValidationErrors());
}
return procNode.getProcessGroup().startProcessor(procNode, true);
}
protected final Future<Void> stop(final ProcessorNode procNode) {
return procNode.getProcessGroup().stopProcessor(procNode);
}
protected final FlowFileQueue getDestinationQueue(final ProcessorNode procNode, final Relationship relationship) {
return procNode.getConnections(relationship).stream()
.map(Connection::getFlowFileQueue)
.findAny()
.orElseThrow(() -> new IllegalArgumentException("Could not find queue for relationship with name <" + relationship + ">"));
}
protected final FlowFileRepository getFlowFileRepository() {
return getRepositoryContext().getFlowFileRepository();
}
protected Bundle getSystemBundle() {
return systemBundle;
}
protected final ContentRepository getContentRepository() {
return getRepositoryContext().getContentRepository();
}
protected final ProvenanceEventRepository getProvenanceRepository() {
return getRepositoryContext().getProvenanceRepository();
}
protected RepositoryContext getRepositoryContext() {
return flowController.getRepositoryContextFactory().newProcessContext(nopProcessor, new AtomicLong(0L));
}
protected final ProcessorNode getNopProcessor() {
return nopProcessor;
}
protected final ProcessorNode getTerminateProcessor() {
return terminateProcessor;
}
protected final ProcessorNode getTerminateAllProcessor() {
return terminateAllProcessor;
}
protected final FlowController getFlowController() {
return flowController;
}
protected void assertProvenanceEventCount(final ProvenanceEventType eventType, final int count) throws IOException {
int encountered = 0;
for (final ProvenanceEventRecord event : getProvenanceRepository().getEvents(0L, 100_000_000)) {
if (event.getEventType() == eventType) {
encountered++;
}
}
assertEquals("Expected to encounter " + count + " Provenance Events of type " + eventType + " but encountered " + encountered, count, encountered);
}
protected void triggerOnce(final ProcessorNode processor) throws ExecutionException, InterruptedException {
final String schedulingPeriod = processor.getSchedulingPeriod();
final FlowFileEvent initialReport = getStatusReport(processor);
final int initialInvocations = (initialReport == null) ? 0 : initialReport.getInvocations();
processor.setScheduldingPeriod("1 hour");
// The scheduling period is set to 1 hour, so the Processor will be triggered only once after it is
// started. If that single trigger landed while the Processor was yielded it could be skipped, so if
// the yield expiration is in the future, wait for the yield to expire before starting.
while (processor.getYieldExpiration() > System.currentTimeMillis()) {
Thread.sleep(1L);
}
start(processor).get();
// Busy-poll the FlowFile event repository until the Processor has been invoked one more time than before it was started.
int totalInvocations = initialInvocations;
while (totalInvocations < initialInvocations + 1) {
final FlowFileEvent currentReport = getStatusReport(processor);
totalInvocations = currentReport == null ? 0 : currentReport.getInvocations();
}
stop(processor).get();
processor.setScheduldingPeriod(schedulingPeriod);
}
protected FlowFileEvent getStatusReport(final ProcessorNode processor) {
final FlowFileEventRepository repository = getRepositoryContext().getFlowFileEventRepository();
RepositoryStatusReport statusReport = repository.reportTransferEvents(0L);
return statusReport.getReportEntry(processor.getIdentifier());
}
protected void moveProcessor(final ProcessorNode processor, final ProcessGroup destination) {
final StandardSnippet snippet = new StandardSnippet();
snippet.setParentGroupId(processor.getProcessGroupIdentifier());
snippet.addProcessors(Collections.singletonMap(processor.getIdentifier(), null));
processor.getProcessGroup().move(snippet, destination);
}
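/*
 * Illustrative usage (assumed): create a child Process Group and move a processor into it, e.g.:
 *
 *   final ProcessGroup child = getFlowController().getFlowManager().createProcessGroup(UUID.randomUUID().toString());
 *   getRootGroup().addProcessGroup(child);
 *   moveProcessor(getNopProcessor(), child);
 *
 * addProcessGroup is assumed to be available on ProcessGroup; adjust to the framework version in use.
 */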
protected ExtensionManager getExtensionManager() {
return extensionManager;
}
private PropertyEncryptor createEncryptor() {
return new PropertyEncryptor() {
@Override
public String encrypt(String property) {
return property;
}
@Override
public String decrypt(String encryptedProperty) {
return encryptedProperty;
}
};
}
}