blob: d0b9c13c9756d29c81c7d8075257865215e21b91 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
import org.apache.nifi.authorization.AccessPolicy;
import org.apache.nifi.authorization.Group;
import org.apache.nifi.authorization.MockPolicyBasedAuthorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.User;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.DummyReportingTask;
import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.InstanceClassLoader;
import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.MockProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestFlowController {
private FlowController controller;
private AbstractPolicyBasedAuthorizer authorizer;
private FlowFileEventRepository flowFileEventRepo;
private AuditService auditService;
private PropertyEncryptor encryptor;
private NiFiProperties nifiProperties;
private Bundle systemBundle;
private BulletinRepository bulletinRepo;
private VariableRegistry variableRegistry;
private ExtensionDiscoveringManager extensionManager;
@Before
public void setup() {
flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
auditService = Mockito.mock(AuditService.class);
final Map<String, String> otherProps = new HashMap<>();
otherProps.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceRepository.class.getName());
otherProps.put("nifi.remote.input.socket.port", "");
otherProps.put("nifi.remote.input.secure", "");
final String propsFile = "src/test/resources/flowcontrollertest.nifi.properties";
nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, otherProps);
encryptor = PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties);
// use the system bundle
systemBundle = SystemBundle.create(nifiProperties);
extensionManager = new StandardExtensionDiscoveringManager();
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
User user1 = new User.Builder().identifier("user-id-1").identity("user-1").build();
User user2 = new User.Builder().identifier("user-id-2").identity("user-2").build();
Group group1 = new Group.Builder().identifier("group-id-1").name("group-1").addUser(user1.getIdentifier()).build();
Group group2 = new Group.Builder().identifier("group-id-2").name("group-2").build();
AccessPolicy policy1 = new AccessPolicy.Builder()
.identifier("policy-id-1")
.resource("resource1")
.action(RequestAction.READ)
.addUser(user1.getIdentifier())
.addUser(user2.getIdentifier())
.build();
AccessPolicy policy2 = new AccessPolicy.Builder()
.identifier("policy-id-2")
.resource("resource2")
.action(RequestAction.READ)
.addGroup(group1.getIdentifier())
.addGroup(group2.getIdentifier())
.addUser(user1.getIdentifier())
.addUser(user2.getIdentifier())
.build();
Set<Group> groups1 = new LinkedHashSet<>();
groups1.add(group1);
groups1.add(group2);
Set<User> users1 = new LinkedHashSet<>();
users1.add(user1);
users1.add(user2);
Set<AccessPolicy> policies1 = new LinkedHashSet<>();
policies1.add(policy1);
policies1.add(policy2);
authorizer = new MockPolicyBasedAuthorizer(groups1, users1, policies1);
variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths());
bulletinRepo = Mockito.mock(BulletinRepository.class);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class), extensionManager);
}
@After
public void cleanup() throws Exception {
controller.shutdown(true);
FileUtils.deleteDirectory(new File("./target/flowcontrollertest"));
}
@Test
public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final File flowFile = new File("src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
// should be two controller services
final Set<ControllerServiceNode> controllerServiceNodes = controller.getFlowManager().getAllControllerServices();
assertNotNull(controllerServiceNodes);
assertEquals(2, controllerServiceNodes.size());
// find the controller service that was moved to the root group
final ControllerServiceNode rootGroupCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != null).findFirst().get();
assertNotNull(rootGroupCs);
// find the controller service that was not moved to the root group
final ControllerServiceNode controllerCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() == null).findFirst().get();
assertNotNull(controllerCs);
// should be same class (not Ghost), different ids, and same properties
assertEquals(rootGroupCs.getCanonicalClassName(), controllerCs.getCanonicalClassName());
assertFalse(rootGroupCs.getCanonicalClassName().contains("Ghost"));
assertNotEquals(rootGroupCs.getIdentifier(), controllerCs.getIdentifier());
assertEquals(rootGroupCs.getProperties(), controllerCs.getProperties());
// should be one processor
final Collection<ProcessorNode> processorNodes = controller.getFlowManager().getGroup(controller.getFlowManager().getRootGroupId()).getProcessors();
assertNotNull(processorNodes);
assertEquals(1, processorNodes.size());
// verify the processor is still pointing at the controller service that got moved to the root group
final ProcessorNode processorNode = processorNodes.stream().findFirst().get();
final PropertyDescriptor procControllerServiceProp = processorNode.getEffectivePropertyValues().entrySet().stream()
.filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
.map(Map.Entry::getKey)
.findFirst()
.get();
assertNotNull(procControllerServiceProp);
// should be one reporting task
final Set<ReportingTaskNode> reportingTaskNodes = controller.getAllReportingTasks();
assertNotNull(reportingTaskNodes);
assertEquals(1, reportingTaskNodes.size());
// verify that the reporting task is pointing at the controller service at the controller level
final ReportingTaskNode reportingTaskNode = reportingTaskNodes.stream().findFirst().get();
final PropertyDescriptor reportingTaskControllerServiceProp = reportingTaskNode.getEffectivePropertyValues().entrySet().stream()
.filter(e -> e.getValue().equals(controllerCs.getIdentifier()))
.map(Map.Entry::getKey)
.findFirst()
.get();
assertNotNull(reportingTaskControllerServiceProp);
}
@Test
public void testSynchronizeFlowWithProcessorReferencingControllerService() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
try {
// should be two controller services
final Set<ControllerServiceNode> controllerServiceNodes = controller.getFlowManager().getAllControllerServices();
assertNotNull(controllerServiceNodes);
assertEquals(1, controllerServiceNodes.size());
// find the controller service that was moved to the root group
final ControllerServiceNode rootGroupCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != null).findFirst().get();
assertNotNull(rootGroupCs);
// should be one processor
final Collection<ProcessorNode> processorNodes = controller.getFlowManager().getRootGroup().getProcessors();
assertNotNull(processorNodes);
assertEquals(1, processorNodes.size());
// verify the processor is still pointing at the controller service that got moved to the root group
final ProcessorNode processorNode = processorNodes.stream().findFirst().get();
final PropertyDescriptor procControllerServiceProp = processorNode.getRawPropertyValues().entrySet().stream()
.filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
.map(Map.Entry::getKey)
.findFirst()
.get();
assertNotNull(procControllerServiceProp);
} finally {
purgeFlow();
}
}
@Test
public void testSynchronizeFlowWhenAuthorizationsAreEqual() {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
assertEquals(authFingerprint, authorizer.getFingerprint());
}
@Test(expected = UninheritableFlowException.class)
public void testSynchronizeFlowWhenAuthorizationsAreDifferent() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
controller.initializeFlow();
try {
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
assertNotEquals(authFingerprint, authorizer.getFingerprint());
} finally {
purgeFlow();
}
}
private void purgeFlow() {
final ProcessGroup processGroup = controller.getFlowManager().getRootGroup();
for (final ProcessorNode procNode : processGroup.getProcessors()) {
processGroup.removeProcessor(procNode);
}
for (final ControllerServiceNode serviceNode : controller.getFlowManager().getAllControllerServices()) {
controller.getControllerServiceProvider().removeControllerService(serviceNode);
}
}
@Test
public void testSynchronizeFlowWhenAuthorizationsAreDifferentAndFlowEmpty() {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
// create a mock proposed data flow with different auth fingerprint as the current authorizer
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
assertNotEquals(authFingerprint, authorizer.getFingerprint());
assertTrue(authorizer.getGroups().isEmpty());
assertTrue(authorizer.getUsers().isEmpty());
assertTrue(authorizer.getAccessPolicies().isEmpty());
}
@Test
public void testSynchronizeFlowWhenProposedAuthorizationsAreNull() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, authFingerprint.getBytes(StandardCharsets.UTF_8), Collections.emptySet());
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
controller.initializeFlow();
final DataFlow dataflowWithNullAuthorizations = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8), null, null, Collections.emptySet());
try {
controller.synchronize(standardFlowSynchronizer, dataflowWithNullAuthorizations, Mockito.mock(FlowService.class));
Assert.fail("Was able to synchronize controller with null authorizations but dataflow wasn't empty");
} catch (final UninheritableFlowException expected) {
// expected
} finally {
purgeFlow();
}
}
@Test
public void testSynchronizeFlowWhenProposedAuthorizationsAreNullAndEmptyFlow() {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(null);
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
assertTrue(authorizer.getGroups().isEmpty());
assertTrue(authorizer.getUsers().isEmpty());
assertTrue(authorizer.getAccessPolicies().isEmpty());
}
/**
* StandardProcessScheduler is created by FlowController. The StandardProcessScheduler needs access to the Controller Service Provider,
* but the Controller Service Provider needs the ProcessScheduler in its constructor. So the StandardProcessScheduler obtains the Controller Service
* Provider by making a call back to FlowController.getControllerServiceProvider. This test exists to ensure that we always have access to the
* Controller Service Provider in the Process Scheduler, and that we don't inadvertently start storing away the result of calling
* FlowController.getControllerServiceProvider() before the service provider has been fully initialized.
*/
@Test
public void testProcessSchedulerHasAccessToControllerServiceProvider() {
final StandardProcessScheduler scheduler = controller.getProcessScheduler();
assertNotNull(scheduler);
final ControllerServiceProvider serviceProvider = scheduler.getControllerServiceProvider();
assertNotNull(serviceProvider);
assertSame(serviceProvider, controller.getControllerServiceProvider());
}
@Test
public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
authorizer = new MockPolicyBasedAuthorizer();
assertNotEquals(authFingerprint, authorizer.getFingerprint());
controller.shutdown(true);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class), extensionManager);
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
assertEquals(authFingerprint, authorizer.getFingerprint());
}
@Test
public void testSynchronizeFlowWhenProposedMissingComponentsAreDifferent() {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final Set<String> missingComponents = new HashSet<>();
missingComponents.add("1");
missingComponents.add("2");
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getMissingComponents()).thenReturn(missingComponents);
try {
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, Mockito.mock(FlowService.class));
Assert.fail("Should have thrown exception");
} catch (UninheritableFlowException e) {
assertTrue(e.getMessage().contains("Proposed flow has missing components that are not considered missing in the current flow (1,2)"));
}
}
@Test
public void testSynchronizeFlowWhenExistingMissingComponentsAreDifferent() throws IOException {
final PropertyEncryptor encryptor = PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties);
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(encryptor, nifiProperties, extensionManager);
final ProcessorNode mockProcessorNode = mock(ProcessorNode.class);
when(mockProcessorNode.getIdentifier()).thenReturn("1");
when(mockProcessorNode.isExtensionMissing()).thenReturn(true);
final ControllerServiceNode mockControllerServiceNode = mock(ControllerServiceNode.class);
when(mockControllerServiceNode.getIdentifier()).thenReturn("2");
when(mockControllerServiceNode.isExtensionMissing()).thenReturn(true);
final ReportingTaskNode mockReportingTaskNode = mock(ReportingTaskNode.class);
when(mockReportingTaskNode.getIdentifier()).thenReturn("3");
when(mockReportingTaskNode.isExtensionMissing()).thenReturn(true);
final ProcessGroup mockRootGroup = mock(ProcessGroup.class);
when(mockRootGroup.findAllProcessors()).thenReturn(Collections.singletonList(mockProcessorNode));
final SnippetManager mockSnippetManager = mock(SnippetManager.class);
when(mockSnippetManager.export()).thenReturn(new byte[0]);
final FlowManager flowManager = Mockito.mock(FlowManager.class);
final FlowController mockFlowController = mock(FlowController.class);
when(mockFlowController.getFlowManager()).thenReturn(flowManager);
when(flowManager.getRootGroup()).thenReturn(mockRootGroup);
when(flowManager.getAllControllerServices()).thenReturn(new HashSet<>(Arrays.asList(mockControllerServiceNode)));
when(flowManager.getAllReportingTasks()).thenReturn(new HashSet<>(Arrays.asList(mockReportingTaskNode)));
when(mockFlowController.getAuthorizer()).thenReturn(authorizer);
when(mockFlowController.getSnippetManager()).thenReturn(mockSnippetManager);
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
when(proposedDataFlow.getMissingComponents()).thenReturn(new HashSet<>());
try {
standardFlowSynchronizer.sync(mockFlowController, proposedDataFlow, encryptor, Mockito.mock(FlowService.class));
Assert.fail("Should have thrown exception");
} catch (UninheritableFlowException e) {
assertTrue(e.getMessage(), e.getMessage().contains("Current flow has missing components that are not considered missing in the proposed flow (1,2,3)"));
}
}
@Test
public void testSynchronizeFlowWhenBundlesAreSame() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
logRepository.removeAllObservers();
syncFlow("src/test/resources/nifi/fingerprint/flow4.xml", standardFlowSynchronizer);
syncFlow("src/test/resources/nifi/fingerprint/flow4.xml", standardFlowSynchronizer);
}
@Test
public void testSynchronizeFlowWhenBundlesAreDifferent() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(
PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties), nifiProperties, extensionManager);
final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
logRepository.removeAllObservers();
// first sync should work because we are syncing to an empty flow controller
syncFlow("src/test/resources/nifi/fingerprint/flow4.xml", standardFlowSynchronizer);
controller.initializeFlow();
// second sync should fail because the bundle of the processor is different
try {
syncFlow("src/test/resources/nifi/fingerprint/flow4-with-different-bundle.xml", standardFlowSynchronizer);
Assert.fail("Should have thrown UninheritableFlowException");
} catch (UninheritableFlowException e) {
//e.printStackTrace();
}
}
private void syncFlow(String flowXmlFile, FlowSynchronizer standardFlowSynchronizer) throws IOException {
String flowString = null;
try (final InputStream in = new FileInputStream(flowXmlFile)) {
flowString = IOUtils.toString(in, StandardCharsets.UTF_8);
}
assertNotNull(flowString);
final byte[] flowBytes = flowString.getBytes(StandardCharsets.UTF_8);
final String authFingerprint = authorizer.getFingerprint();
final byte[] authFingerprintBytes = authFingerprint.getBytes(StandardCharsets.UTF_8);
final DataFlow proposedDataFlow1 = new StandardDataFlow(flowBytes, null, authFingerprintBytes, Collections.emptySet());
controller.synchronize(standardFlowSynchronizer, proposedDataFlow1, Mockito.mock(FlowService.class));
}
@Test
public void testCreateMissingProcessor() {
final ProcessorNode procNode = controller.getFlowManager().createProcessor("org.apache.nifi.NonExistingProcessor", "1234-Processor",
systemBundle.getBundleDetails().getCoordinate());
assertNotNull(procNode);
assertEquals("org.apache.nifi.NonExistingProcessor", procNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingProcessor", procNode.getComponentType());
final PropertyDescriptor descriptor = procNode.getPropertyDescriptor("my descriptor");
assertNotNull(descriptor);
assertEquals("my descriptor", descriptor.getName());
assertTrue(descriptor.isRequired());
assertTrue(descriptor.isSensitive());
final Relationship relationship = procNode.getRelationship("my relationship");
assertEquals("my relationship", relationship.getName());
}
@Test
public void testCreateMissingReportingTask() throws ReportingTaskInstantiationException {
final ReportingTaskNode taskNode = controller.createReportingTask("org.apache.nifi.NonExistingReportingTask", "1234-Reporting-Task",
systemBundle.getBundleDetails().getCoordinate(), true);
assertNotNull(taskNode);
assertEquals("org.apache.nifi.NonExistingReportingTask", taskNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingReportingTask", taskNode.getComponentType());
final PropertyDescriptor descriptor = taskNode.getReportingTask().getPropertyDescriptor("my descriptor");
assertNotNull(descriptor);
assertEquals("my descriptor", descriptor.getName());
assertTrue(descriptor.isRequired());
assertTrue(descriptor.isSensitive());
}
@Test
public void testCreateMissingControllerService() throws ProcessorInstantiationException {
final ControllerServiceNode serviceNode = controller.getFlowManager().createControllerService("org.apache.nifi.NonExistingControllerService", "1234-Controller-Service",
systemBundle.getBundleDetails().getCoordinate(), null, false, true);
assertNotNull(serviceNode);
assertEquals("org.apache.nifi.NonExistingControllerService", serviceNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingControllerService", serviceNode.getComponentType());
final ControllerService service = serviceNode.getControllerServiceImplementation();
final PropertyDescriptor descriptor = service.getPropertyDescriptor("my descriptor");
assertNotNull(descriptor);
assertEquals("my descriptor", descriptor.getName());
assertTrue(descriptor.isRequired());
assertTrue(descriptor.isSensitive());
assertEquals("GhostControllerService[id=1234-Controller-Service, type=org.apache.nifi.NonExistingControllerService]", service.toString());
service.hashCode(); // just make sure that an Exception is not thrown
assertTrue(service.equals(service));
assertFalse(service.equals(serviceNode));
}
@Test
public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException, ClassNotFoundException, InstantiationException, IllegalAccessException {
ProcessorNode p_scheduled = controller.getFlowManager().createProcessor(DummyScheduledProcessor.class.getName(), "1234-ScheduledProcessor",
systemBundle.getBundleDetails().getCoordinate());
assertEquals(5, p_scheduled.getMaxConcurrentTasks());
assertEquals(SchedulingStrategy.CRON_DRIVEN, p_scheduled.getSchedulingStrategy());
assertEquals("0 0 0 1/1 * ?", p_scheduled.getSchedulingPeriod());
assertEquals("1 sec", p_scheduled.getYieldPeriod());
assertEquals("30 sec", p_scheduled.getPenalizationPeriod());
assertEquals(LogLevel.WARN, p_scheduled.getBulletinLevel());
}
@Test
public void testReportingTaskDefaultScheduleAnnotation() throws ReportingTaskInstantiationException {
ReportingTaskNode p_scheduled = controller.getFlowManager().createReportingTask(DummyScheduledReportingTask.class.getName(), systemBundle.getBundleDetails().getCoordinate());
assertEquals(SchedulingStrategy.CRON_DRIVEN, p_scheduled.getSchedulingStrategy());
assertEquals("0 0 0 1/1 * ?", p_scheduled.getSchedulingPeriod());
}
@Test
public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException, ClassNotFoundException {
ProcessorNode p_settings = controller.getFlowManager().createProcessor(DummySettingsProcessor.class.getName(), "1234-SettingsProcessor", systemBundle.getBundleDetails().getCoordinate());
assertEquals("5 sec", p_settings.getYieldPeriod());
assertEquals("1 min", p_settings.getPenalizationPeriod());
assertEquals(LogLevel.DEBUG, p_settings.getBulletinLevel());
assertEquals(1, p_settings.getMaxConcurrentTasks());
assertEquals(SchedulingStrategy.TIMER_DRIVEN, p_settings.getSchedulingStrategy());
assertEquals("0 sec", p_settings.getSchedulingPeriod());
}
@Test
public void testPrimaryNodeOnlyAnnotation() throws ProcessorInstantiationException {
String id = UUID.randomUUID().toString();
ProcessorNode processorNode = controller.getFlowManager().createProcessor(DummyPrimaryNodeOnlyProcessor.class.getName(), id, systemBundle.getBundleDetails().getCoordinate());
assertEquals(ExecutionNode.PRIMARY, processorNode.getExecutionNode());
}
@Test
public void testDeleteProcessGroup() {
ProcessGroup pg = controller.getFlowManager().createProcessGroup("my-process-group");
pg.setName("my-process-group");
ControllerServiceNode cs = controller.getFlowManager().createControllerService("org.apache.nifi.NonExistingControllerService", "my-controller-service",
systemBundle.getBundleDetails().getCoordinate(), null, false, true);
pg.addControllerService(cs);
controller.getFlowManager().getRootGroup().addProcessGroup(pg);
controller.getFlowManager().getRootGroup().removeProcessGroup(pg);
pg.getControllerServices(true);
assertTrue(pg.getControllerServices(true).isEmpty());
}
@Test
public void testReloadProcessor() throws ProcessorInstantiationException {
final String id = "1234-ScheduledProcessor" + System.currentTimeMillis();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ProcessorNode processorNode = controller.getFlowManager().createProcessor(DummyScheduledProcessor.class.getName(), id, coordinate);
final String originalName = processorNode.getName();
assertEquals(id, processorNode.getIdentifier());
assertEquals(id, processorNode.getComponent().getIdentifier());
assertEquals(coordinate.getCoordinate(), processorNode.getBundleCoordinate().getCoordinate());
assertEquals(DummyScheduledProcessor.class.getCanonicalName(), processorNode.getCanonicalClassName());
assertEquals(DummyScheduledProcessor.class.getSimpleName(), processorNode.getComponentType());
assertEquals(DummyScheduledProcessor.class.getCanonicalName(), processorNode.getComponent().getClass().getCanonicalName());
assertEquals(5, processorNode.getMaxConcurrentTasks());
assertEquals(SchedulingStrategy.CRON_DRIVEN, processorNode.getSchedulingStrategy());
assertEquals("0 0 0 1/1 * ?", processorNode.getSchedulingPeriod());
assertEquals("1 sec", processorNode.getYieldPeriod());
assertEquals("30 sec", processorNode.getPenalizationPeriod());
assertEquals(LogLevel.WARN, processorNode.getBulletinLevel());
// now change the type of the processor from DummyScheduledProcessor to DummySettingsProcessor
controller.getReloadComponent().reload(processorNode, DummySettingsProcessor.class.getName(), coordinate, Collections.emptySet());
// ids and coordinate should stay the same
assertEquals(id, processorNode.getIdentifier());
assertEquals(id, processorNode.getComponent().getIdentifier());
assertEquals(coordinate.getCoordinate(), processorNode.getBundleCoordinate().getCoordinate());
// in this test we happened to change between two processors that have different canonical class names
// but in the running application the DAO layer would call verifyCanUpdateBundle and would prevent this so
// for the sake of this test it is ok that the canonical class name hasn't changed
assertEquals(originalName, processorNode.getName());
assertEquals(DummyScheduledProcessor.class.getCanonicalName(), processorNode.getCanonicalClassName());
assertEquals(DummyScheduledProcessor.class.getSimpleName(), processorNode.getComponentType());
assertEquals(DummySettingsProcessor.class.getCanonicalName(), processorNode.getComponent().getClass().getCanonicalName());
// all these settings should have stayed the same
assertEquals(5, processorNode.getMaxConcurrentTasks());
assertEquals(SchedulingStrategy.CRON_DRIVEN, processorNode.getSchedulingStrategy());
assertEquals("0 0 0 1/1 * ?", processorNode.getSchedulingPeriod());
assertEquals("1 sec", processorNode.getYieldPeriod());
assertEquals("30 sec", processorNode.getPenalizationPeriod());
assertEquals(LogLevel.WARN, processorNode.getBulletinLevel());
}
@Test
public void testReloadProcessorWithAdditionalResources() throws ProcessorInstantiationException, MalformedURLException {
final URL resource1 = new File("src/test/resources/TestClasspathResources/resource1.txt").toURI().toURL();
final URL resource2 = new File("src/test/resources/TestClasspathResources/resource2.txt").toURI().toURL();
final URL resource3 = new File("src/test/resources/TestClasspathResources/resource3.txt").toURI().toURL();
final Set<URL> additionalUrls = new LinkedHashSet<>(Arrays.asList(resource1, resource2, resource3));
final String id = "1234-ScheduledProcessor" + System.currentTimeMillis();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ProcessorNode processorNode = controller.getFlowManager().createProcessor(DummyScheduledProcessor.class.getName(), id, coordinate);
final String originalName = processorNode.getName();
// the instance class loader shouldn't have any of the resources yet
InstanceClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(id);
assertNotNull(instanceClassLoader);
assertFalse(containsResource(instanceClassLoader.getURLs(), resource1));
assertFalse(containsResource(instanceClassLoader.getURLs(), resource2));
assertFalse(containsResource(instanceClassLoader.getURLs(), resource3));
assertTrue(instanceClassLoader.getAdditionalResourceUrls().isEmpty());
// now change the type of the processor from DummyScheduledProcessor to DummySettingsProcessor
controller.getReloadComponent().reload(processorNode, DummySettingsProcessor.class.getName(), coordinate, additionalUrls);
// the instance class loader shouldn't have any of the resources yet
instanceClassLoader = extensionManager.getInstanceClassLoader(id);
assertNotNull(instanceClassLoader);
assertTrue(containsResource(instanceClassLoader.getURLs(), resource1));
assertTrue(containsResource(instanceClassLoader.getURLs(), resource2));
assertTrue(containsResource(instanceClassLoader.getURLs(), resource3));
assertEquals(3, instanceClassLoader.getAdditionalResourceUrls().size());
}
@Test
public void testReloadControllerService() {
final String id = "ServiceA" + System.currentTimeMillis();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true);
final String originalName = controllerServiceNode.getName();
assertEquals(id, controllerServiceNode.getIdentifier());
assertEquals(id, controllerServiceNode.getComponent().getIdentifier());
assertEquals(coordinate.getCoordinate(), controllerServiceNode.getBundleCoordinate().getCoordinate());
assertEquals(ServiceA.class.getCanonicalName(), controllerServiceNode.getCanonicalClassName());
assertEquals(ServiceA.class.getSimpleName(), controllerServiceNode.getComponentType());
assertEquals(ServiceA.class.getCanonicalName(), controllerServiceNode.getComponent().getClass().getCanonicalName());
controller.getReloadComponent().reload(controllerServiceNode, ServiceB.class.getName(), coordinate, Collections.emptySet());
// ids and coordinate should stay the same
assertEquals(id, controllerServiceNode.getIdentifier());
assertEquals(id, controllerServiceNode.getComponent().getIdentifier());
assertEquals(coordinate.getCoordinate(), controllerServiceNode.getBundleCoordinate().getCoordinate());
// in this test we happened to change between two services that have different canonical class names
// but in the running application the DAO layer would call verifyCanUpdateBundle and would prevent this so
// for the sake of this test it is ok that the canonical class name hasn't changed
assertEquals(originalName, controllerServiceNode.getName());
assertEquals(ServiceA.class.getCanonicalName(), controllerServiceNode.getCanonicalClassName());
assertEquals(ServiceA.class.getSimpleName(), controllerServiceNode.getComponentType());
assertEquals(ServiceB.class.getCanonicalName(), controllerServiceNode.getComponent().getClass().getCanonicalName());
}
@Test
public void testReloadControllerServiceWithAdditionalResources() throws MalformedURLException {
final URL resource1 = new File("src/test/resources/TestClasspathResources/resource1.txt").toURI().toURL();
final URL resource2 = new File("src/test/resources/TestClasspathResources/resource2.txt").toURI().toURL();
final URL resource3 = new File("src/test/resources/TestClasspathResources/resource3.txt").toURI().toURL();
final Set<URL> additionalUrls = new LinkedHashSet<>(Arrays.asList(resource1, resource2, resource3));
final String id = "ServiceA" + System.currentTimeMillis();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true);
// the instance class loader shouldn't have any of the resources yet
URLClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(id);
assertNotNull(instanceClassLoader);
assertFalse(containsResource(instanceClassLoader.getURLs(), resource1));
assertFalse(containsResource(instanceClassLoader.getURLs(), resource2));
assertFalse(containsResource(instanceClassLoader.getURLs(), resource3));
assertTrue(instanceClassLoader instanceof InstanceClassLoader);
assertTrue(((InstanceClassLoader) instanceClassLoader).getAdditionalResourceUrls().isEmpty());
controller.getReloadComponent().reload(controllerServiceNode, ServiceB.class.getName(), coordinate, additionalUrls);
// the instance class loader shouldn't have any of the resources yet
instanceClassLoader = extensionManager.getInstanceClassLoader(id);
assertNotNull(instanceClassLoader);
assertTrue(containsResource(instanceClassLoader.getURLs(), resource1));
assertTrue(containsResource(instanceClassLoader.getURLs(), resource2));
assertTrue(containsResource(instanceClassLoader.getURLs(), resource3));
assertTrue(instanceClassLoader instanceof InstanceClassLoader);
assertEquals(3, ((InstanceClassLoader) instanceClassLoader).getAdditionalResourceUrls().size());
}
@Test
public void testReloadReportingTask() throws ReportingTaskInstantiationException {
final String id = "ReportingTask" + System.currentTimeMillis();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ReportingTaskNode node = controller.createReportingTask(DummyReportingTask.class.getName(), id, coordinate, true);
final String originalName = node.getName();
assertEquals(id, node.getIdentifier());
assertEquals(id, node.getComponent().getIdentifier());
assertEquals(coordinate.getCoordinate(), node.getBundleCoordinate().getCoordinate());
assertEquals(DummyReportingTask.class.getCanonicalName(), node.getCanonicalClassName());
assertEquals(DummyReportingTask.class.getSimpleName(), node.getComponentType());
assertEquals(DummyReportingTask.class.getCanonicalName(), node.getComponent().getClass().getCanonicalName());
controller.getReloadComponent().reload(node, DummyScheduledReportingTask.class.getName(), coordinate, Collections.emptySet());
// ids and coordinate should stay the same
assertEquals(id, node.getIdentifier());
assertEquals(id, node.getComponent().getIdentifier());
assertEquals(coordinate.getCoordinate(), node.getBundleCoordinate().getCoordinate());
// in this test we happened to change between two services that have different canonical class names
// but in the running application the DAO layer would call verifyCanUpdateBundle and would prevent this so
// for the sake of this test it is ok that the canonical class name hasn't changed
assertEquals(originalName, node.getName());
assertEquals(DummyReportingTask.class.getCanonicalName(), node.getCanonicalClassName());
assertEquals(DummyReportingTask.class.getSimpleName(), node.getComponentType());
assertEquals(DummyScheduledReportingTask.class.getCanonicalName(), node.getComponent().getClass().getCanonicalName());
}
@Test
public void testReloadReportingTaskWithAdditionalResources() throws ReportingTaskInstantiationException, MalformedURLException {
final URL resource1 = new File("src/test/resources/TestClasspathResources/resource1.txt").toURI().toURL();
final URL resource2 = new File("src/test/resources/TestClasspathResources/resource2.txt").toURI().toURL();
final URL resource3 = new File("src/test/resources/TestClasspathResources/resource3.txt").toURI().toURL();
final Set<URL> additionalUrls = new LinkedHashSet<>(Arrays.asList(resource1, resource2, resource3));
final String id = "ReportingTask" + System.currentTimeMillis();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ReportingTaskNode node = controller.createReportingTask(DummyReportingTask.class.getName(), id, coordinate, true);
// the instance class loader shouldn't have any of the resources yet
InstanceClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(id);
assertNotNull(instanceClassLoader);
assertFalse(containsResource(instanceClassLoader.getURLs(), resource1));
assertFalse(containsResource(instanceClassLoader.getURLs(), resource2));
assertFalse(containsResource(instanceClassLoader.getURLs(), resource3));
assertTrue(instanceClassLoader.getAdditionalResourceUrls().isEmpty());
controller.getReloadComponent().reload(node, DummyScheduledReportingTask.class.getName(), coordinate, additionalUrls);
// the instance class loader shouldn't have any of the resources yet
instanceClassLoader = extensionManager.getInstanceClassLoader(id);
assertNotNull(instanceClassLoader);
assertTrue(containsResource(instanceClassLoader.getURLs(), resource1));
assertTrue(containsResource(instanceClassLoader.getURLs(), resource2));
assertTrue(containsResource(instanceClassLoader.getURLs(), resource3));
assertEquals(3, instanceClassLoader.getAdditionalResourceUrls().size());
}
private boolean containsResource(URL[] resources, URL resourceToFind) {
for (URL resource : resources) {
if (resourceToFind.getPath().equals(resource.getPath())) {
return true;
}
}
return false;
}
@Test(expected = IllegalArgumentException.class)
public void testInstantiateSnippetWhenProcessorMissingBundle() throws Exception {
final String id = UUID.randomUUID().toString();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ProcessorNode processorNode = controller.getFlowManager().createProcessor(DummyProcessor.class.getName(), id, coordinate);
// create a processor dto
final ProcessorDTO processorDTO = new ProcessorDTO();
processorDTO.setId(UUID.randomUUID().toString()); // use a different id here
processorDTO.setPosition(new PositionDTO(new Double(0), new Double(0)));
processorDTO.setStyle(processorNode.getStyle());
processorDTO.setParentGroupId("1234");
processorDTO.setInputRequirement(processorNode.getInputRequirement().name());
processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class));
processorDTO.setRestricted(processorNode.isRestricted());
processorDTO.setExecutionNodeRestricted(processorNode.isExecutionNodeRestricted());
processorDTO.setExtensionMissing(processorNode.isExtensionMissing());
processorDTO.setType(processorNode.getCanonicalClassName());
processorDTO.setBundle(null); // missing bundle
processorDTO.setName(processorNode.getName());
processorDTO.setState(processorNode.getScheduledState().toString());
processorDTO.setRelationships(new ArrayList<>());
processorDTO.setDescription("description");
processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially());
processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported());
processorDTO.setSupportsBatching(processorNode.isSessionBatchingSupported());
ProcessorConfigDTO configDTO = new ProcessorConfigDTO();
configDTO.setSchedulingPeriod(processorNode.getSchedulingPeriod());
configDTO.setPenaltyDuration(processorNode.getPenalizationPeriod());
configDTO.setYieldDuration(processorNode.getYieldPeriod());
configDTO.setRunDurationMillis(processorNode.getRunDuration(TimeUnit.MILLISECONDS));
configDTO.setConcurrentlySchedulableTaskCount(processorNode.getMaxConcurrentTasks());
configDTO.setLossTolerant(processorNode.isLossTolerant());
configDTO.setComments(processorNode.getComments());
configDTO.setBulletinLevel(processorNode.getBulletinLevel().name());
configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name());
configDTO.setExecutionNode(processorNode.getExecutionNode().name());
configDTO.setAnnotationData(processorNode.getAnnotationData());
processorDTO.setConfig(configDTO);
// create the snippet with the processor
final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO();
flowSnippetDTO.setProcessors(Collections.singleton(processorDTO));
// instantiate the snippet
assertEquals(0, controller.getFlowManager().getRootGroup().getProcessors().size());
controller.getFlowManager().instantiateSnippet(controller.getFlowManager().getRootGroup(), flowSnippetDTO);
}
@Test
public void testInstantiateSnippetWithProcessor() throws ProcessorInstantiationException {
final String id = UUID.randomUUID().toString();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ProcessorNode processorNode = controller.getFlowManager().createProcessor(DummyProcessor.class.getName(), id, coordinate);
// create a processor dto
final ProcessorDTO processorDTO = new ProcessorDTO();
processorDTO.setId(UUID.randomUUID().toString()); // use a different id here
processorDTO.setPosition(new PositionDTO(new Double(0), new Double(0)));
processorDTO.setStyle(processorNode.getStyle());
processorDTO.setParentGroupId("1234");
processorDTO.setInputRequirement(processorNode.getInputRequirement().name());
processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class));
processorDTO.setRestricted(processorNode.isRestricted());
processorDTO.setExecutionNodeRestricted(processorNode.isExecutionNodeRestricted());
processorDTO.setExtensionMissing(processorNode.isExtensionMissing());
processorDTO.setType(processorNode.getCanonicalClassName());
processorDTO.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion()));
processorDTO.setName(processorNode.getName());
processorDTO.setState(processorNode.getScheduledState().toString());
processorDTO.setRelationships(new ArrayList<>());
processorDTO.setDescription("description");
processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially());
processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported());
processorDTO.setSupportsBatching(processorNode.isSessionBatchingSupported());
ProcessorConfigDTO configDTO = new ProcessorConfigDTO();
configDTO.setSchedulingPeriod(processorNode.getSchedulingPeriod());
configDTO.setPenaltyDuration(processorNode.getPenalizationPeriod());
configDTO.setYieldDuration(processorNode.getYieldPeriod());
configDTO.setRunDurationMillis(processorNode.getRunDuration(TimeUnit.MILLISECONDS));
configDTO.setConcurrentlySchedulableTaskCount(processorNode.getMaxConcurrentTasks());
configDTO.setLossTolerant(processorNode.isLossTolerant());
configDTO.setComments(processorNode.getComments());
configDTO.setBulletinLevel(processorNode.getBulletinLevel().name());
configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name());
configDTO.setExecutionNode(processorNode.getExecutionNode().name());
configDTO.setAnnotationData(processorNode.getAnnotationData());
processorDTO.setConfig(configDTO);
// create the snippet with the processor
final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO();
flowSnippetDTO.setProcessors(Collections.singleton(processorDTO));
// instantiate the snippet
assertEquals(0, controller.getFlowManager().getRootGroup().getProcessors().size());
controller.getFlowManager().instantiateSnippet(controller.getFlowManager().getRootGroup(), flowSnippetDTO);
assertEquals(1, controller.getFlowManager().getRootGroup().getProcessors().size());
}
@Test
public void testInstantiateSnippetWithDisabledProcessor() throws ProcessorInstantiationException {
final String id = UUID.randomUUID().toString();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ProcessorNode processorNode = controller.getFlowManager().createProcessor(DummyProcessor.class.getName(), id, coordinate);
processorNode.disable();
// create a processor dto
final ProcessorDTO processorDTO = new ProcessorDTO();
processorDTO.setId(UUID.randomUUID().toString()); // use a different id here
processorDTO.setPosition(new PositionDTO(new Double(0), new Double(0)));
processorDTO.setStyle(processorNode.getStyle());
processorDTO.setParentGroupId("1234");
processorDTO.setInputRequirement(processorNode.getInputRequirement().name());
processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class));
processorDTO.setRestricted(processorNode.isRestricted());
processorDTO.setExecutionNodeRestricted(processorNode.isExecutionNodeRestricted());
processorDTO.setExtensionMissing(processorNode.isExtensionMissing());
processorDTO.setType(processorNode.getCanonicalClassName());
processorDTO.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion()));
processorDTO.setName(processorNode.getName());
processorDTO.setState(processorNode.getScheduledState().toString());
processorDTO.setRelationships(new ArrayList<>());
processorDTO.setDescription("description");
processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially());
processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported());
processorDTO.setSupportsBatching(processorNode.isSessionBatchingSupported());
ProcessorConfigDTO configDTO = new ProcessorConfigDTO();
configDTO.setSchedulingPeriod(processorNode.getSchedulingPeriod());
configDTO.setPenaltyDuration(processorNode.getPenalizationPeriod());
configDTO.setYieldDuration(processorNode.getYieldPeriod());
configDTO.setRunDurationMillis(processorNode.getRunDuration(TimeUnit.MILLISECONDS));
configDTO.setConcurrentlySchedulableTaskCount(processorNode.getMaxConcurrentTasks());
configDTO.setLossTolerant(processorNode.isLossTolerant());
configDTO.setComments(processorNode.getComments());
configDTO.setBulletinLevel(processorNode.getBulletinLevel().name());
configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name());
configDTO.setExecutionNode(processorNode.getExecutionNode().name());
configDTO.setAnnotationData(processorNode.getAnnotationData());
processorDTO.setConfig(configDTO);
// create the snippet with the processor
final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO();
flowSnippetDTO.setProcessors(Collections.singleton(processorDTO));
// instantiate the snippet
assertEquals(0, controller.getFlowManager().getRootGroup().getProcessors().size());
controller.getFlowManager().instantiateSnippet(controller.getFlowManager().getRootGroup(), flowSnippetDTO);
assertEquals(1, controller.getFlowManager().getRootGroup().getProcessors().size());
assertTrue(controller.getFlowManager().getRootGroup().getProcessors().iterator().next().getScheduledState().equals(ScheduledState.DISABLED));
}
@Test(expected = IllegalArgumentException.class)
public void testInstantiateSnippetWhenControllerServiceMissingBundle() throws ProcessorInstantiationException {
final String id = UUID.randomUUID().toString();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true);
// create the controller service dto
final ControllerServiceDTO csDto = new ControllerServiceDTO();
csDto.setId(UUID.randomUUID().toString()); // use a different id
csDto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier());
csDto.setName(controllerServiceNode.getName());
csDto.setType(controllerServiceNode.getCanonicalClassName());
csDto.setBundle(null); // missing bundle
csDto.setState(controllerServiceNode.getState().name());
csDto.setAnnotationData(controllerServiceNode.getAnnotationData());
csDto.setComments(controllerServiceNode.getComments());
csDto.setPersistsState(controllerServiceNode.getControllerServiceImplementation().getClass().isAnnotationPresent(Stateful.class));
csDto.setRestricted(controllerServiceNode.isRestricted());
csDto.setExtensionMissing(controllerServiceNode.isExtensionMissing());
csDto.setDescriptors(new LinkedHashMap<>());
csDto.setProperties(new LinkedHashMap<>());
// create the snippet with the controller service
final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO();
flowSnippetDTO.setControllerServices(Collections.singleton(csDto));
// instantiate the snippet
assertEquals(0, controller.getFlowManager().getRootGroup().getControllerServices(false).size());
controller.getFlowManager().instantiateSnippet(controller.getFlowManager().getRootGroup(), flowSnippetDTO);
}
@Test
public void testInstantiateSnippetWithControllerService() throws ProcessorInstantiationException {
final String id = UUID.randomUUID().toString();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true);
// create the controller service dto
final ControllerServiceDTO csDto = new ControllerServiceDTO();
csDto.setId(UUID.randomUUID().toString()); // use a different id
csDto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier());
csDto.setName(controllerServiceNode.getName());
csDto.setType(controllerServiceNode.getCanonicalClassName());
csDto.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion()));
csDto.setState(controllerServiceNode.getState().name());
csDto.setAnnotationData(controllerServiceNode.getAnnotationData());
csDto.setComments(controllerServiceNode.getComments());
csDto.setPersistsState(controllerServiceNode.getControllerServiceImplementation().getClass().isAnnotationPresent(Stateful.class));
csDto.setRestricted(controllerServiceNode.isRestricted());
csDto.setExtensionMissing(controllerServiceNode.isExtensionMissing());
csDto.setDescriptors(new LinkedHashMap<>());
csDto.setProperties(new LinkedHashMap<>());
// create the snippet with the controller service
final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO();
flowSnippetDTO.setControllerServices(Collections.singleton(csDto));
// instantiate the snippet
assertEquals(0, controller.getFlowManager().getRootGroup().getControllerServices(false).size());
controller.getFlowManager().instantiateSnippet(controller.getFlowManager().getRootGroup(), flowSnippetDTO);
assertEquals(1, controller.getFlowManager().getRootGroup().getControllerServices(false).size());
}
}