blob: b2f38fe620312b2f6478f22290b4610264812251 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.StandardExtensionDiscoveringManager;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.MockProvenanceRepository;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
* Validates a Processor's life-cycle operations within the context of
* {@link FlowController} and {@link StandardProcessScheduler}.
*/
public class ProcessorLifecycleIT {
// Class-wide logger; also used by the nested RandomOrFixedDelayedRunnable.
private static final Logger logger = LoggerFactory.getLogger(ProcessorLifecycleIT.class);
// Polling timeouts, in milliseconds, handed to assertCondition(...).
private static final long SHORT_DELAY_TOLERANCE = 10000L;
private static final long MEDIUM_DELAY_TOLERANCE = 15000L;
private static final long LONG_DELAY_TOLERANCE = 20000L;
// Flow manager of the FlowController built for the test that is currently running.
private FlowManager flowManager;
// Processor properties applied by the tests; populated in before().
private Map<String, String> properties = new HashMap<>();
// Path of the NiFi properties file passed to NiFiProperties.createBasicNiFiProperties(...).
private volatile String propsFile = "src/test/resources/lifecycletest.nifi.properties";
// Scheduler of the FlowController built by buildFlowControllerForTest(...); shut down in after().
private ProcessScheduler processScheduler;
/**
 * Seeds the shared property map with a value for property "P" before each test.
 */
@Before
public void before() throws Exception {
    this.properties.put("P", "hello");
}
/**
 * Shuts down the scheduler created by the test (if any) and removes the test's
 * working directory.
 *
 * The cleanup runs in a {@code finally} block so that a failure while shutting the
 * scheduler down neither leaves a stale scheduler reference behind nor skips the
 * deletion of {@code ./target/lifecycletest}.
 *
 * @throws Exception if shutting down the scheduler or deleting the directory fails
 */
@After
public void after() throws Exception {
    try {
        if (processScheduler != null) {
            processScheduler.shutdown();
        }
    } finally {
        processScheduler = null;
        FileUtils.deleteDirectory(new File("./target/lifecycletest"));
    }
}
/**
 * Polls the given condition using the default timeout of {@code SHORT_DELAY_TOLERANCE}
 * milliseconds.
 *
 * @param supplier condition that is expected to become {@code true}
 */
private void assertCondition(final Supplier<Boolean> supplier) {
    final long defaultToleranceMillis = SHORT_DELAY_TOLERANCE;
    assertCondition(supplier, defaultToleranceMillis);
}
/**
 * Polls the given condition every 50 ms until it holds or the timeout elapses, then
 * asserts that it holds.
 *
 * @param supplier condition that is expected to become {@code true}
 * @param delayToleranceMillis maximum time, in milliseconds, to keep polling
 */
private void assertCondition(final Supplier<Boolean> supplier, final long delayToleranceMillis) {
    final long startTime = System.currentTimeMillis();
    while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && !supplier.get()) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException ex) {
            // Restore the interrupt status for the caller; Thread.interrupted() would clear it.
            Thread.currentThread().interrupt();
            break;
        }
    }
    // The condition is evaluated one final time so the result reflects the latest state.
    assertTrue("Condition was not satisfied within " + delayToleranceMillis + " ms", supplier.get());
}
/**
 * Verifies that a newly created processor is STOPPED, that calling {@code enable()}
 * repeatedly leaves it STOPPED, and that {@code disable()} moves both the scheduled and
 * the physical scheduled state to DISABLED.
 */
@Test
public void validateEnableOperation() throws Exception {
    final FlowManagerAndSystemBundle testContext = buildFlowControllerForTest();
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());

    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED);
    assertCondition(() -> procNode.getPhysicalScheduledState() == ScheduledState.STOPPED);

    // Enabling twice validates idempotency.
    procNode.enable();
    procNode.enable();
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED);
    assertCondition(() -> procNode.getPhysicalScheduledState() == ScheduledState.STOPPED);

    procNode.disable();
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.DISABLED);
    assertCondition(() -> procNode.getPhysicalScheduledState() == ScheduledState.DISABLED);
}
/**
 * Verifies that {@code disable()} is idempotent and that a start request issued against
 * a disabled processor leaves its physical scheduled state DISABLED.
 */
@Test
public void validateDisableOperation() throws Exception {
    final FlowManagerAndSystemBundle testContext = buildFlowControllerForTest();
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);

    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED);
    assertCondition(() -> procNode.getPhysicalScheduledState() == ScheduledState.STOPPED);

    // Disabling twice validates idempotency.
    procNode.disable();
    procNode.disable();
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.DISABLED);
    assertCondition(() -> procNode.getPhysicalScheduledState() == ScheduledState.DISABLED);

    // Attempting to start the disabled processor must not change its physical state.
    procNode.performValidation();
    processScheduler.startProcessor(procNode, true);
    assertCondition(() -> procNode.getPhysicalScheduledState() == ScheduledState.DISABLED);
}
/**
 * Validates that the processor start operation is idempotent: issuing it several times
 * in a row results in exactly one recorded {@code @OnScheduled} invocation.
 */
@Test
public void validateIdempotencyOfProcessorStartOperation() throws Exception {
    final FlowManagerAndSystemBundle testContext = buildFlowControllerForTest();
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);
    final TestProcessor processor = (TestProcessor) procNode.getProcessor();

    // Every life-cycle callback is a no-op in this scenario.
    noop(processor);
    procNode.performValidation();

    // Request the start three times in a row.
    for (int attempt = 0; attempt < 3; attempt++) {
        processScheduler.startProcessor(procNode, true);
    }

    Thread.sleep(500);
    assertCondition(() -> processor.operationNames.size() == 1);
    assertEquals("@OnScheduled", processor.operationNames.get(0));
}
/**
 * Validates that stopping a processor that was never started is harmless: the processor
 * stays STOPPED and no life-cycle callback is recorded.
 */
@Test
public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception {
    final FlowManagerAndSystemBundle testContext = buildFlowControllerForTest();
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);
    final TestProcessor processor = (TestProcessor) procNode.getProcessor();
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED);

    // onTrigger is configured with a random delay of up to 3000 ms.
    randomOnTriggerDelay(processor, 3000);

    processScheduler.stopProcessor(procNode);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED);
    assertTrue(processor.operationNames.isEmpty());
}
/**
 * Validates the processor start/stop sequence when the processor runs with several
 * concurrent tasks: exactly three life-cycle operations are expected, in the order
 * {@code @OnScheduled}, {@code @OnUnscheduled}, {@code @OnStopped}. Currently ignored.
 */
@Test
@Ignore
public void validateSuccessfulAndOrderlyShutdown() throws Exception {
    final FlowManagerAndSystemBundle testContext = buildFlowControllerForTest();
    flowManager = testContext.getFlowManager();
    final ProcessGroup group = flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);
    final TestProcessor processor = (TestProcessor) procNode.getProcessor();

    // onTrigger is configured with a random delay bounded by this limit.
    final int maxTriggerDelayMillis = 3000;
    randomOnTriggerDelay(processor, maxTriggerDelayMillis);

    procNode.setMaxConcurrentTasks(4);
    procNode.setScheduldingPeriod("500 millis");
    procNode.setAutoTerminatedRelationships(
            Collections.singleton(new Relationship.Builder().name("success").build()));
    group.addProcessor(procNode);

    flowManager.getGroup(group.getIdentifier()).startProcessing();
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, SHORT_DELAY_TOLERANCE);

    flowManager.getRootGroup().stopProcessing();
    // Wait for the longest possible onTrigger delay; otherwise the next assertion
    // may fail because the processor is still executing.
    Thread.sleep(maxTriggerDelayMillis);

    // Regardless of the number of running tasks, each life-cycle operation must be
    // invoked once ...
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, SHORT_DELAY_TOLERANCE);
    // ... so the list must contain exactly three entries ...
    assertCondition(() -> processor.operationNames.size() == 3, SHORT_DELAY_TOLERANCE);
    // ... ordered as @OnScheduled, @OnUnscheduled, @OnStopped.
    assertEquals("@OnScheduled", processor.operationNames.get(0));
    assertEquals("@OnUnscheduled", processor.operationNames.get(1));
    assertEquals("@OnStopped", processor.operationNames.get(2));
}
/**
 * Concurrency test that hammers on both the stop and the start operation from a pool of
 * 100 threads and then validates the order of the recorded life-cycle operations.
 * Currently ignored.
 *
 * The executor is shut down in a {@code finally} block so its threads do not outlive a
 * failed assertion, and every task counts the latch down even when the scheduler call
 * throws, so a failing task cannot stall the test until the latch timeout.
 */
@Test
@Ignore
public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception {
    final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
    flowManager = fcsb.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
            fcsb.getSystemBundle().getBundleDetails().getCoordinate());
    testProcNode.setProperties(properties);
    final TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();

    // sets the scenario for the processor to run
    this.noop(testProcessor);

    final int startCallsCount = 10000;
    final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount);
    assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());

    final Random random = new Random();
    final ExecutorService executor = Executors.newFixedThreadPool(100);
    try {
        // First half of the calls: stop requests, each after a random pause of up to 9 ms.
        for (int i = 0; i < startCallsCount / 2; i++) {
            executor.execute(() -> {
                try {
                    LockSupport.parkNanos(random.nextInt(9000000));
                    processScheduler.stopProcessor(testProcNode);
                } finally {
                    countDownCounter.countDown();
                }
            });
        }
        // Second half of the calls: start requests, each after a random pause of up to 9 ms.
        for (int i = 0; i < startCallsCount / 2; i++) {
            executor.execute(() -> {
                try {
                    LockSupport.parkNanos(random.nextInt(9000000));
                    processScheduler.startProcessor(testProcNode, true);
                } finally {
                    countDownCounter.countDown();
                }
            });
        }
        assertTrue(countDownCounter.await(1000000, TimeUnit.MILLISECONDS));

        // Each recorded operation must be a legal successor of the previous one.
        String previousOperation = null;
        for (String operationName : testProcessor.operationNames) {
            if (previousOperation == null || previousOperation.equals("@OnStopped")) {
                assertEquals("@OnScheduled", operationName);
            } else if (previousOperation.equals("@OnScheduled")) {
                assertEquals("@OnUnscheduled", operationName);
            } else if (previousOperation.equals("@OnUnscheduled")) {
                assertTrue(operationName.equals("@OnStopped") || operationName.equals("@OnScheduled"));
            }
            previousOperation = operationName;
        }
    } finally {
        executor.shutdownNow();
    }
}
/**
 * Validates that a processor whose {@code @OnScheduled} callback takes a while (200 ms)
 * can be stopped, and that exactly {@code @OnScheduled}, {@code @OnUnscheduled} and
 * {@code @OnStopped} are recorded, in that order.
 */
@Test
public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception {
    final FlowManagerAndSystemBundle testContext = buildFlowControllerForTest();
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);
    final TestProcessor processor = (TestProcessor) procNode.getProcessor();

    // The @OnScheduled callback sleeps for a fixed 200 ms.
    longRunningOnSchedule(processor, 200);
    procNode.performValidation();

    processScheduler.startProcessor(procNode, true);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, MEDIUM_DELAY_TOLERANCE);

    processScheduler.stopProcessor(procNode);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, MEDIUM_DELAY_TOLERANCE);

    assertCondition(() -> processor.operationNames.size() == 3, LONG_DELAY_TOLERANCE);
    assertEquals("@OnScheduled", processor.operationNames.get(0));
    assertEquals("@OnUnscheduled", processor.operationNames.get(1));
    assertEquals("@OnStopped", processor.operationNames.get(2));
}
/**
 * Validates that the processor is eventually reported as RUNNING once the invocation of
 * {@code @OnScheduled} stops throwing exceptions (here: after the first two invocations
 * fail), and that it can be stopped afterwards.
 */
@Test
public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception {
    final FlowManagerAndSystemBundle testContext = buildFlowControllerForTest();
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);
    final TestProcessor processor = (TestProcessor) procNode.getProcessor();

    // All callbacks are no-ops, but @OnScheduled throws on its first two invocations.
    noop(processor);
    processor.generateExceptionOnScheduled = true;
    processor.keepFailingOnScheduledTimes = 2;
    procNode.performValidation();

    processScheduler.startProcessor(procNode, true);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, LONG_DELAY_TOLERANCE);

    processScheduler.stopProcessor(procNode);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, SHORT_DELAY_TOLERANCE);
}
/**
 * Validates that the processor can be stopped while {@code @OnScheduled} keeps failing,
 * i.e. that a user-initiated stop breaks the retry loop.
 */
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception {
    final FlowManagerAndSystemBundle testContext = buildFlowControllerForTest();
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);
    final TestProcessor processor = (TestProcessor) procNode.getProcessor();

    // @OnUnscheduled sleeps 100 ms; @OnScheduled throws on (practically) every invocation.
    longRunningOnUnschedule(processor, 100);
    processor.generateExceptionOnScheduled = true;
    processor.keepFailingOnScheduledTimes = Integer.MAX_VALUE;
    procNode.performValidation();

    processScheduler.startProcessor(procNode, true);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, SHORT_DELAY_TOLERANCE);

    processScheduler.stopProcessor(procNode);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, SHORT_DELAY_TOLERANCE);
}
/**
 * Validates that the processor can be stopped when {@code @OnScheduled} blocks
 * indefinitely but reacts to thread interrupts. The flow controller is built with a
 * processor scheduling timeout of "5 sec".
 */
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception {
    final FlowManagerAndSystemBundle testContext =
            buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);
    final TestProcessor processor = (TestProcessor) procNode.getProcessor();

    // Installs an interruptible, indefinitely blocking @OnScheduled callback.
    blockingInterruptableOnUnschedule(processor);
    procNode.performValidation();

    processScheduler.startProcessor(procNode, true);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, SHORT_DELAY_TOLERANCE);

    processScheduler.stopProcessor(procNode);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, MEDIUM_DELAY_TOLERANCE);
}
/**
 * Validates that the processor can be stopped when {@code @OnScheduled} blocks
 * indefinitely and ignores thread interrupts. The flow controller is built with a
 * processor scheduling timeout of "1 sec".
 */
@Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception {
    final FlowManagerAndSystemBundle testContext =
            buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 sec");
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);
    final TestProcessor processor = (TestProcessor) procNode.getProcessor();

    // Installs an @OnScheduled callback that blocks forever and swallows interrupts.
    blockingUninterruptableOnUnschedule(processor);
    procNode.performValidation();

    processScheduler.startProcessor(procNode, true);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, MEDIUM_DELAY_TOLERANCE);

    processScheduler.stopProcessor(procNode);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, MEDIUM_DELAY_TOLERANCE);
}
/**
 * Validates that the processor can be stopped while {@code onTrigger()} keeps throwing
 * exceptions, and that a disable request issued while it is running leaves the
 * scheduled state at RUNNING.
 */
@Test
public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception {
    final FlowManagerAndSystemBundle testContext = buildFlowControllerForTest();
    flowManager = testContext.getFlowManager();
    // The group is still created; its reference is not needed by this test.
    flowManager.createProcessGroup(UUID.randomUUID().toString());
    final ProcessorNode procNode = flowManager.createProcessor(
            TestProcessor.class.getName(),
            UUID.randomUUID().toString(),
            testContext.getSystemBundle().getBundleDetails().getCoordinate());
    procNode.setProperties(properties);
    final TestProcessor processor = (TestProcessor) procNode.getProcessor();

    // All callbacks are no-ops, but onTrigger throws on every invocation.
    noop(processor);
    processor.generateExceptionOnTrigger = true;
    procNode.performValidation();

    processScheduler.startProcessor(procNode, true);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, LONG_DELAY_TOLERANCE);

    // Disabling a running processor is expected to leave it RUNNING.
    processScheduler.disableProcessor(procNode);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.RUNNING, LONG_DELAY_TOLERANCE);

    processScheduler.stopProcessor(procNode);
    assertCondition(() -> procNode.getScheduledState() == ScheduledState.STOPPED, LONG_DELAY_TOLERANCE);
}
/**
 * Configures a scenario in which onTrigger() sleeps for a random time below
 * {@code delayLimit} milliseconds, so onTrigger() is still guaranteed to return;
 * all other callbacks do nothing.
 *
 * @param testProcessor processor to configure
 * @param delayLimit exclusive upper bound of the random onTrigger() delay, in milliseconds
 */
private void randomOnTriggerDelay(TestProcessor testProcessor, int delayLimit) {
    final RandomOrFixedDelayedRunnable randomSleep = new RandomOrFixedDelayedRunnable(delayLimit, true);
    final EmptyRunnable nothing = new EmptyRunnable();
    testProcessor.setScenario(nothing, nothing, nothing, randomSleep);
}
/**
 * Configures a scenario in which the {@code @OnScheduled} callback sleeps for a fixed
 * {@code delayLimit} milliseconds; all other callbacks do nothing.
 *
 * @param testProcessor processor to configure
 * @param delayLimit duration of the {@code @OnScheduled} delay, in milliseconds
 */
private void longRunningOnSchedule(TestProcessor testProcessor, int delayLimit) {
    final RandomOrFixedDelayedRunnable fixedSleep = new RandomOrFixedDelayedRunnable(delayLimit, false);
    final EmptyRunnable nothing = new EmptyRunnable();
    testProcessor.setScenario(fixedSleep, nothing, nothing, nothing);
}
/**
 * Configures a scenario in which the {@code @OnUnscheduled} callback sleeps for a fixed
 * {@code delayLimit} milliseconds; all other callbacks do nothing.
 *
 * @param testProcessor processor to configure
 * @param delayLimit duration of the {@code @OnUnscheduled} delay, in milliseconds
 */
private void longRunningOnUnschedule(TestProcessor testProcessor, int delayLimit) {
    final RandomOrFixedDelayedRunnable fixedSleep = new RandomOrFixedDelayedRunnable(delayLimit, false);
    final EmptyRunnable nothing = new EmptyRunnable();
    testProcessor.setScenario(nothing, fixedSleep, nothing, nothing);
}
/**
 * Configures a scenario in which the {@code @OnScheduled} callback blocks indefinitely
 * but returns when its thread is interrupted; all other callbacks do nothing.
 *
 * Note: despite the "OnUnschedule" in the method name, the blocking runnable is
 * installed in the {@code @OnScheduled} slot of the scenario.
 *
 * @param testProcessor processor to configure
 */
private void blockingInterruptableOnUnschedule(TestProcessor testProcessor) {
    final BlockingInterruptableRunnable blocker = new BlockingInterruptableRunnable();
    final EmptyRunnable nothing = new EmptyRunnable();
    testProcessor.setScenario(blocker, nothing, nothing, nothing);
}
/**
 * Configures a scenario in which the {@code @OnScheduled} callback blocks indefinitely
 * and ignores thread interrupts; all other callbacks do nothing.
 *
 * Note: despite the "OnUnschedule" in the method name, the blocking runnable is
 * installed in the {@code @OnScheduled} slot of the scenario.
 *
 * @param testProcessor processor to configure
 */
private void blockingUninterruptableOnUnschedule(TestProcessor testProcessor) {
    final BlockingUninterruptableRunnable blocker = new BlockingUninterruptableRunnable();
    final EmptyRunnable nothing = new EmptyRunnable();
    testProcessor.setScenario(blocker, nothing, nothing, nothing);
}
/**
 * Configures a scenario in which every callback (schedule, unschedule, stop and trigger)
 * does nothing.
 *
 * @param testProcessor processor to configure
 */
private void noop(TestProcessor testProcessor) {
    final EmptyRunnable nothing = new EmptyRunnable();
    testProcessor.setScenario(nothing, nothing, nothing, nothing);
}
/**
 * Builds a standalone {@link FlowController} from the test properties file plus a fixed
 * set of overrides, remembers its scheduler in {@code processScheduler} and returns its
 * flow manager together with the system bundle.
 *
 * @param propKey name of one additional property to override; ignored when {@code null}
 * @param propValue value of the additional property; ignored when {@code null}
 * @return the flow manager of the new controller and the system bundle it was built with
 * @throws Exception if the properties, the bundle or the controller cannot be created
 */
private FlowManagerAndSystemBundle buildFlowControllerForTest(final String propKey, final String propValue) throws Exception {
    final Map<String, String> overrides = new HashMap<>();
    overrides.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
    overrides.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
    overrides.put(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider");
    overrides.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceRepository.class.getName());
    overrides.put("nifi.remote.input.socket.port", "");
    overrides.put("nifi.remote.input.secure", "");

    // The caller-supplied property is added last so it wins over the defaults above.
    final boolean hasExtraProperty = propKey != null && propValue != null;
    if (hasExtraProperty) {
        overrides.put(propKey, propValue);
    }

    final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(propsFile, overrides);
    final Bundle bundle = SystemBundle.create(nifiProps);

    final ExtensionDiscoveringManager extensions = new StandardExtensionDiscoveringManager();
    extensions.discoverExtensions(bundle, Collections.emptySet());

    final FlowController controller = FlowController.createStandaloneInstance(
            mock(FlowFileEventRepository.class),
            nifiProps,
            mock(Authorizer.class),
            mock(AuditService.class),
            null,
            new VolatileBulletinRepository(),
            new FileBasedVariableRegistry(nifiProps.getVariableRegistryPropertiesPaths()),
            mock(FlowRegistryClient.class),
            extensions,
            mock(StatusHistoryRepository.class));

    final FlowManager manager = controller.getFlowManager();
    this.processScheduler = controller.getProcessScheduler();
    return new FlowManagerAndSystemBundle(manager, bundle);
}
/**
 * Builds the flow controller for a test without any additional property override.
 *
 * @return the flow manager of the new controller and the system bundle it was built with
 * @throws Exception if the controller cannot be created
 */
private FlowManagerAndSystemBundle buildFlowControllerForTest() throws Exception {
    final String noKey = null;
    final String noValue = null;
    return buildFlowControllerForTest(noKey, noValue);
}
/**
 * Immutable pair holding the {@link FlowManager} of a test flow controller and the
 * system {@link Bundle} it was created with.
 */
private static class FlowManagerAndSystemBundle {

    private final Bundle systemBundle;
    private final FlowManager flowManager;

    public FlowManagerAndSystemBundle(FlowManager flowManager, Bundle systemBundle) {
        this.systemBundle = systemBundle;
        this.flowManager = flowManager;
    }

    /** @return the system bundle passed to the constructor */
    public Bundle getSystemBundle() {
        return this.systemBundle;
    }

    /** @return the flow manager passed to the constructor */
    public FlowManager getFlowManager() {
        return this.flowManager;
    }
}
/**
 * Processor used by the tests in this file. It records the name of every life-cycle
 * callback it receives in {@code operationNames} and delegates each callback (and
 * onTrigger) to a configurable {@link Runnable}. It can also be told to throw from
 * {@code @OnScheduled} a given number of times, or from every onTrigger call.
 */
public static class TestProcessor extends AbstractProcessor {
    private static final Runnable NOP = () -> {
    };

    // Callbacks run by the corresponding life-cycle method; replaced via setScenario(...).
    private Runnable onScheduleCallback = NOP;
    private Runnable onUnscheduleCallback = NOP;
    private Runnable onStopCallback = NOP;
    private Runnable onTriggerCallback = NOP;

    // When true, schedule(...) throws until it has thrown keepFailingOnScheduledTimes times.
    private boolean generateExceptionOnScheduled;
    // When true, every onTrigger(...) call throws.
    private boolean generateExceptionOnTrigger;
    // When true, the controller-service property "S" is part of the supported descriptors.
    private boolean withService;
    private int keepFailingOnScheduledTimes;
    private int onScheduledExceptionCount;

    // Synchronized wrapper: the callbacks that append to this list are invoked by the
    // framework (presumably on scheduler threads - confirm) while the test thread polls
    // size()/get(), so unsynchronized LinkedList access would be a data race.
    private final List<String> operationNames = Collections.synchronizedList(new LinkedList<String>());

    /**
     * Replaces the runnables executed by the life-cycle callbacks and by onTrigger.
     */
    void setScenario(Runnable onScheduleCallback, Runnable onUnscheduleCallback, Runnable onStopCallback, Runnable onTriggerCallback) {
        this.onScheduleCallback = onScheduleCallback;
        this.onUnscheduleCallback = onUnscheduleCallback;
        this.onStopCallback = onStopCallback;
        this.onTriggerCallback = onTriggerCallback;
    }

    /**
     * Records "@OnScheduled", then either throws the configured intentional exception or
     * runs the schedule callback.
     */
    @OnScheduled
    public void schedule(ProcessContext ctx) {
        this.operationNames.add("@OnScheduled");
        // The invocation is recorded even when it fails; the failure counter only
        // advances while failures are requested.
        if (this.generateExceptionOnScheduled
                && this.onScheduledExceptionCount++ < this.keepFailingOnScheduledTimes) {
            throw new RuntimeException("Intentional");
        }
        this.onScheduleCallback.run();
    }

    /** Records "@OnUnscheduled" and runs the unschedule callback. */
    @OnUnscheduled
    public void unschedule() {
        this.operationNames.add("@OnUnscheduled");
        this.onUnscheduleCallback.run();
    }

    /** Records "@OnStopped" and runs the stop callback. */
    @OnStopped
    public void stop() {
        this.operationNames.add("@OnStopped");
        this.onStopCallback.run();
    }

    /**
     * Returns the required property "P" (valid when non-empty) and, only when
     * {@code withService} is set, the required controller-service property "S".
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        PropertyDescriptor PROP = new PropertyDescriptor.Builder()
                .name("P")
                .description("Blah Blah")
                .required(true)
                .addValidator(new Validator() {
                    @Override
                    public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
                        return new ValidationResult.Builder().subject(subject).input(value).valid(value != null && !value.isEmpty()).explanation(subject + " cannot be empty").build();
                    }
                })
                .build();
        PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
                .name("S")
                .description("Blah Blah")
                .required(true)
                .identifiesControllerService(ITestservice.class)
                .build();
        return this.withService ? Arrays.asList(PROP, SERVICE) : Arrays.asList(PROP);
    }

    /**
     * Throws the intentional exception when configured to do so; otherwise runs the
     * trigger callback.
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        if (this.generateExceptionOnTrigger) {
            throw new RuntimeException("Intentional");
        }
        this.onTriggerCallback.run();
    }
}
/**
 * Minimal controller service implementing {@link ITestservice}; it adds no behavior of
 * its own.
 */
public static class TestService extends AbstractControllerService implements ITestservice {
    // intentionally empty
}
/**
 * Marker controller-service interface referenced by the "S" property of
 * {@link TestProcessor}; it declares no methods.
 */
public static interface ITestservice extends ControllerService {
    // marker interface, no members
}
/**
 * Runnable that does nothing; used for callbacks a scenario does not care about.
 */
private static class EmptyRunnable implements Runnable {
    @Override
    public void run() {
        // deliberately a no-op
    }
}
/**
 * Runnable that sleeps for {@code Long.MAX_VALUE} milliseconds and returns early only
 * when its thread is interrupted, in which case the interrupt status is restored.
 */
private static class BlockingInterruptableRunnable implements Runnable {
    @Override
    public void run() {
        final long practicallyForever = Long.MAX_VALUE;
        try {
            Thread.sleep(practicallyForever);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to whoever runs this task.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Runnable that never returns: it sleeps for {@code Long.MAX_VALUE} milliseconds in an
 * endless loop and swallows every interrupt.
 */
private static class BlockingUninterruptableRunnable implements Runnable {
    @Override
    public void run() {
        for (;;) {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException ignored) {
                // Interrupts are deliberately ignored; go straight back to sleep.
            }
        }
    }
}
/**
 * Runnable that sleeps either for a fixed {@code delayLimit} milliseconds or, in random
 * mode, for a random number of milliseconds in {@code [0, delayLimit)}. If the sleep is
 * interrupted, a warning is logged and the interrupt status is restored.
 */
private static class RandomOrFixedDelayedRunnable implements Runnable {
    private final int delayLimit;
    private final boolean randomDelay;
    private final Random random = new Random();

    /**
     * @param delayLimit fixed delay, or exclusive upper bound of the random delay, in milliseconds
     * @param randomDelay {@code true} to sleep a random time below {@code delayLimit}
     * @throws IllegalArgumentException if {@code delayLimit} is negative, or is zero in random mode
     */
    public RandomOrFixedDelayedRunnable(int delayLimit, boolean randomDelay) {
        // Fail fast here: Random.nextInt(bound) rejects non-positive bounds and
        // Thread.sleep rejects negative durations, but only later inside run().
        if (delayLimit < 0 || (randomDelay && delayLimit == 0)) {
            throw new IllegalArgumentException(
                    "delayLimit must be positive for a random delay and non-negative for a fixed delay, but was " + delayLimit);
        }
        this.delayLimit = delayLimit;
        this.randomDelay = randomDelay;
    }

    @Override
    public void run() {
        try {
            if (this.randomDelay) {
                Thread.sleep(random.nextInt(this.delayLimit));
            } else {
                Thread.sleep(this.delayLimit);
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted while sleeping");
            Thread.currentThread().interrupt();
        }
    }
}
}