blob: dba192c8e4f2056367f01493644c2db902fb6132 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@EventDriven()
@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
@CapabilityDescription("The DebugFlow processor aids testing and debugging the FlowFile framework by allowing various "
+ "responses to be explicitly triggered in response to the receipt of a FlowFile or a timer event without a "
+ "FlowFile if using timer or cron based scheduling. It can force responses needed to exercise or test "
+ "various failure modes that can occur when a processor runs.")
public class DebugFlow extends AbstractProcessor {
private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles processed successfully.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to process.")
.build();
private final AtomicReference<List<PropertyDescriptor>> propertyDescriptors = new AtomicReference<>();
static final PropertyDescriptor FF_SUCCESS_ITERATIONS = new PropertyDescriptor.Builder()
.name("FlowFile Success Iterations")
.description("Number of FlowFiles to forward to success relationship.")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor FF_FAILURE_ITERATIONS = new PropertyDescriptor.Builder()
.name("FlowFile Failure Iterations")
.description("Number of FlowFiles to forward to failure relationship.")
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor FF_ROLLBACK_ITERATIONS = new PropertyDescriptor.Builder()
.name("FlowFile Rollback Iterations")
.description("Number of FlowFiles to roll back (without penalty).")
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor FF_ROLLBACK_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
.name("FlowFile Rollback Yield Iterations")
.description("Number of FlowFiles to roll back and yield.")
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor FF_ROLLBACK_PENALTY_ITERATIONS = new PropertyDescriptor.Builder()
.name("FlowFile Rollback Penalty Iterations")
.description("Number of FlowFiles to roll back with penalty.")
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
.name("FlowFile Exception Iterations")
.description("Number of FlowFiles to throw exception.")
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
.name("FlowFile Exception Class")
.description("Exception class to be thrown (must extend java.lang.RuntimeException).")
.required(true)
.defaultValue("java.lang.RuntimeException")
.addValidator(new RuntimeExceptionValidator())
.build();
static final PropertyDescriptor NO_FF_SKIP_ITERATIONS = new PropertyDescriptor.Builder()
.name("No FlowFile Skip Iterations")
.description("Number of times to skip onTrigger if no FlowFile.")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor NO_FF_EXCEPTION_ITERATIONS = new PropertyDescriptor.Builder()
.name("No FlowFile Exception Iterations")
.description("Number of times to throw NPE exception if no FlowFile.")
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor NO_FF_YIELD_ITERATIONS = new PropertyDescriptor.Builder()
.name("No FlowFile Yield Iterations")
.description("Number of times to yield if no FlowFile.")
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor NO_FF_EXCEPTION_CLASS = new PropertyDescriptor.Builder()
.name("No FlowFile Exception Class")
.description("Exception class to be thrown if no FlowFile (must extend java.lang.RuntimeException).")
.required(true)
.defaultValue("java.lang.RuntimeException")
.addValidator(new RuntimeExceptionValidator())
.build();
static final PropertyDescriptor WRITE_ITERATIONS = new PropertyDescriptor.Builder()
.name("Write Iterations")
.description("Number of times to write to the FlowFile")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.required(true)
.defaultValue("0")
.build();
static final PropertyDescriptor CONTENT_SIZE = new PropertyDescriptor.Builder()
.name("Content Size")
.description("The number of bytes to write each time that the FlowFile is written to")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.required(true)
.defaultValue("1 KB")
.build();
static final PropertyDescriptor ON_SCHEDULED_SLEEP_TIME = new PropertyDescriptor.Builder()
.name("@OnScheduled Pause Time")
.description("Specifies how long the processor should sleep in the @OnScheduled method, so that the processor can be forced to take a long time to start up")
.required(true)
.defaultValue("0 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor ON_SCHEDULED_FAIL = new PropertyDescriptor.Builder()
.name("Fail When @OnScheduled called")
.description("Specifies whether or not the Processor should throw an Exception when the methods annotated with @OnScheduled are called")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final PropertyDescriptor ON_UNSCHEDULED_SLEEP_TIME = new PropertyDescriptor.Builder()
.name("@OnUnscheduled Pause Time")
.description("Specifies how long the processor should sleep in the @OnUnscheduled method, so that the processor can be forced to take a long time to respond when user clicks stop")
.required(true)
.defaultValue("0 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor ON_UNSCHEDULED_FAIL = new PropertyDescriptor.Builder()
.name("Fail When @OnUnscheduled called")
.description("Specifies whether or not the Processor should throw an Exception when the methods annotated with @OnUnscheduled are called")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final PropertyDescriptor ON_STOPPED_SLEEP_TIME = new PropertyDescriptor.Builder()
.name("@OnStopped Pause Time")
.description("Specifies how long the processor should sleep in the @OnStopped method, so that the processor can be forced to take a long time to shutdown")
.required(true)
.defaultValue("0 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor ON_STOPPED_FAIL = new PropertyDescriptor.Builder()
.name("Fail When @OnStopped called")
.description("Specifies whether or not the Processor should throw an Exception when the methods annotated with @OnStopped are called")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final PropertyDescriptor ON_TRIGGER_SLEEP_TIME = new PropertyDescriptor.Builder()
.name("OnTrigger Pause Time")
.description("Specifies how long the processor should sleep in the onTrigger() method, so that the processor can be forced to take a long time to perform its task")
.required(true)
.defaultValue("0 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor CUSTOM_VALIDATE_SLEEP_TIME = new PropertyDescriptor.Builder()
.name("CustomValidate Pause Time")
.displayName("CustomValidate Pause Time")
.description("Specifies how long the processor should sleep in the customValidate() method")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")
.required(true)
.build();
static final PropertyDescriptor IGNORE_INTERRUPTS = new PropertyDescriptor.Builder()
.name("Ignore Interrupts When Paused")
.description("If the Processor's thread(s) are sleeping (due to one of the \"Pause Time\" properties above), and the thread is interrupted, "
+ "this indicates whether the Processor should ignore the interrupt and continue sleeping or if it should allow itself to be interrupted.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
private volatile Integer flowFileMaxSuccess = 0;
private volatile Integer flowFileMaxFailure = 0;
private volatile Integer flowFileMaxRollback = 0;
private volatile Integer flowFileMaxYield = 0;
private volatile Integer flowFileMaxPenalty = 0;
private volatile Integer flowFileMaxException = 0;
private volatile Integer noFlowFileMaxSkip = 0;
private volatile Integer noFlowFileMaxException = 0;
private volatile Integer noFlowFileMaxYield = 0;
private volatile Integer flowFileCurrSuccess = 0;
private volatile Integer flowFileCurrFailure = 0;
private volatile Integer flowFileCurrRollback = 0;
private volatile Integer flowFileCurrYield = 0;
private volatile Integer flowFileCurrPenalty = 0;
private volatile Integer flowFileCurrException = 0;
private volatile Integer noFlowFileCurrSkip = 0;
private volatile Integer noFlowFileCurrException = 0;
private volatile Integer noFlowFileCurrYield = 0;
private volatile Class<? extends RuntimeException> flowFileExceptionClass = null;
private volatile Class<? extends RuntimeException> noFlowFileExceptionClass= null;
private final FlowFileResponse curr_ff_resp = new FlowFileResponse();
private final NoFlowFileResponse curr_noff_resp = new NoFlowFileResponse();
@Override
public Set<Relationship> getRelationships() {
synchronized (relationships) {
if (relationships.get() == null) {
HashSet<Relationship> relSet = new HashSet<>();
relSet.add(REL_SUCCESS);
relSet.add(REL_FAILURE);
relationships.compareAndSet(null, Collections.unmodifiableSet(relSet));
}
return relationships.get();
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
synchronized (propertyDescriptors) {
if (propertyDescriptors.get() == null) {
ArrayList<PropertyDescriptor> propList = new ArrayList<>();
propList.add(FF_SUCCESS_ITERATIONS);
propList.add(FF_FAILURE_ITERATIONS);
propList.add(FF_ROLLBACK_ITERATIONS);
propList.add(FF_ROLLBACK_YIELD_ITERATIONS);
propList.add(FF_ROLLBACK_PENALTY_ITERATIONS);
propList.add(FF_EXCEPTION_ITERATIONS);
propList.add(FF_EXCEPTION_CLASS);
propList.add(NO_FF_SKIP_ITERATIONS);
propList.add(NO_FF_EXCEPTION_ITERATIONS);
propList.add(NO_FF_YIELD_ITERATIONS);
propList.add(NO_FF_EXCEPTION_CLASS);
propList.add(WRITE_ITERATIONS);
propList.add(CONTENT_SIZE);
propList.add(ON_SCHEDULED_SLEEP_TIME);
propList.add(ON_SCHEDULED_FAIL);
propList.add(ON_UNSCHEDULED_SLEEP_TIME);
propList.add(ON_UNSCHEDULED_FAIL);
propList.add(ON_STOPPED_SLEEP_TIME);
propList.add(ON_STOPPED_FAIL);
propList.add(ON_TRIGGER_SLEEP_TIME);
propList.add(CUSTOM_VALIDATE_SLEEP_TIME);
propList.add(IGNORE_INTERRUPTS);
propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList));
}
return propertyDescriptors.get();
}
}
@OnScheduled
@SuppressWarnings("unchecked")
public void onScheduled(ProcessContext context) throws ClassNotFoundException, InterruptedException {
flowFileMaxSuccess = context.getProperty(FF_SUCCESS_ITERATIONS).asInteger();
flowFileMaxFailure = context.getProperty(FF_FAILURE_ITERATIONS).asInteger();
flowFileMaxYield = context.getProperty(FF_ROLLBACK_YIELD_ITERATIONS).asInteger();
flowFileMaxRollback = context.getProperty(FF_ROLLBACK_ITERATIONS).asInteger();
flowFileMaxPenalty = context.getProperty(FF_ROLLBACK_PENALTY_ITERATIONS).asInteger();
flowFileMaxException = context.getProperty(FF_EXCEPTION_ITERATIONS).asInteger();
noFlowFileMaxException = context.getProperty(NO_FF_EXCEPTION_ITERATIONS).asInteger();
noFlowFileMaxYield = context.getProperty(NO_FF_YIELD_ITERATIONS).asInteger();
noFlowFileMaxSkip = context.getProperty(NO_FF_SKIP_ITERATIONS).asInteger();
curr_ff_resp.reset();
curr_noff_resp.reset();
flowFileExceptionClass = (Class<? extends RuntimeException>) Class.forName(context.getProperty(FF_EXCEPTION_CLASS).toString());
noFlowFileExceptionClass = (Class<? extends RuntimeException>) Class.forName(context.getProperty(NO_FF_EXCEPTION_CLASS).toString());
sleep(context.getProperty(ON_SCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
context.getProperty(IGNORE_INTERRUPTS).asBoolean());
fail(context.getProperty(ON_SCHEDULED_FAIL).asBoolean(), OnScheduled.class);
}
@OnUnscheduled
public void onUnscheduled(final ProcessContext context) throws InterruptedException {
sleep(context.getProperty(ON_UNSCHEDULED_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
context.getProperty(IGNORE_INTERRUPTS).asBoolean());
fail(context.getProperty(ON_UNSCHEDULED_FAIL).asBoolean(), OnUnscheduled.class);
}
@OnStopped
public void onStopped(final ProcessContext context) throws InterruptedException {
sleep(context.getProperty(ON_STOPPED_SLEEP_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS),
context.getProperty(IGNORE_INTERRUPTS).asBoolean());
fail(context.getProperty(ON_STOPPED_FAIL).asBoolean(), OnStopped.class);
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
try {
sleep(validationContext.getProperty(CUSTOM_VALIDATE_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS),
validationContext.getProperty(IGNORE_INTERRUPTS).asBoolean());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Collections.singleton(new ValidationResult.Builder()
.valid(false)
.subject("Validation")
.explanation("Processor Interrupted while performing validation").build());
}
return super.customValidate(validationContext);
}
private void sleep(final long millis, final boolean ignoreInterrupts) throws InterruptedException {
if (millis > 0L) {
final long endSleep = System.currentTimeMillis() + millis;
while (System.currentTimeMillis() < endSleep) {
// Use Math.max(1, <sleep duration>) here in case
// System.currentTimeMillis() has changed since the check above
// and subtracting it from endSleep would now result in a value <= 0.
try {
Thread.sleep(Math.max(1L, endSleep - System.currentTimeMillis()));
} catch (final InterruptedException ie) {
if (!ignoreInterrupts) {
Thread.currentThread().interrupt();
throw ie;
}
}
}
}
}
private void fail(final boolean isAppropriate, final Class<?> annotationClass) {
if (isAppropriate) {
throw new RuntimeException("Failure configured for " + annotationClass.getSimpleName() + " methods");
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
FlowFile ff = session.get();
try {
// Make up to 2 passes to allow rollover from last cycle to first.
// (This could be "while(true)" since responses should break out if selected, but this
// prevents endless loops in the event of unexpected errors or future changes.)
for (int pass = 2; pass > 0; pass--) {
if (ff == null) {
if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_SKIP_RESPONSE) {
if (noFlowFileCurrSkip < noFlowFileMaxSkip) {
noFlowFileCurrSkip += 1;
logger.info("DebugFlow skipping with no flow file");
return;
} else {
noFlowFileCurrSkip = 0;
curr_noff_resp.getNextCycle();
}
}
if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_EXCEPTION_RESPONSE) {
if (noFlowFileCurrException < noFlowFileMaxException) {
noFlowFileCurrException += 1;
logger.info("DebugFlow throwing NPE with no flow file");
String message = "forced by " + this.getClass().getName();
RuntimeException rte;
try {
rte = noFlowFileExceptionClass.getConstructor(String.class).newInstance(message);
throw rte;
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
if (logger.isErrorEnabled()) {
logger.error("{} unexpected exception throwing DebugFlow exception: {}",
new Object[] {this, e});
}
}
} else {
noFlowFileCurrException = 0;
curr_noff_resp.getNextCycle();
}
}
if (curr_noff_resp.state() == NoFlowFileResponseState.NO_FF_YIELD_RESPONSE) {
if (noFlowFileCurrYield < noFlowFileMaxYield) {
noFlowFileCurrYield += 1;
logger.info("DebugFlow yielding with no flow file");
context.yield();
break;
} else {
noFlowFileCurrYield = 0;
curr_noff_resp.getNextCycle();
}
}
return;
} else {
final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger();
if (writeIterations > 0 && pass == 1) {
final Random random = new Random();
for (int i = 0; i < writeIterations; i++) {
final byte[] data = new byte[context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue()];
random.nextBytes(data);
ff = session.write(ff, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(data);
}
});
}
}
if (curr_ff_resp.state() == FlowFileResponseState.FF_SUCCESS_RESPONSE) {
if (flowFileCurrSuccess < flowFileMaxSuccess) {
flowFileCurrSuccess += 1;
logger.info("DebugFlow transferring to success file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
session.transfer(ff, REL_SUCCESS);
break;
} else {
flowFileCurrSuccess = 0;
curr_ff_resp.getNextCycle();
}
}
if (curr_ff_resp.state() == FlowFileResponseState.FF_FAILURE_RESPONSE) {
if (flowFileCurrFailure < flowFileMaxFailure) {
flowFileCurrFailure += 1;
logger.info("DebugFlow transferring to failure file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
session.transfer(ff, REL_FAILURE);
break;
} else {
flowFileCurrFailure = 0;
curr_ff_resp.getNextCycle();
}
}
if (curr_ff_resp.state() == FlowFileResponseState.FF_ROLLBACK_RESPONSE) {
if (flowFileCurrRollback < flowFileMaxRollback) {
flowFileCurrRollback += 1;
logger.info("DebugFlow rolling back (no penalty) file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
session.rollback();
break;
} else {
flowFileCurrRollback = 0;
curr_ff_resp.getNextCycle();
}
}
if (curr_ff_resp.state() == FlowFileResponseState.FF_YIELD_RESPONSE) {
if (flowFileCurrYield < flowFileMaxYield) {
flowFileCurrYield += 1;
logger.info("DebugFlow yielding file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
session.rollback();
context.yield();
return;
} else {
flowFileCurrYield = 0;
curr_ff_resp.getNextCycle();
}
}
if (curr_ff_resp.state() == FlowFileResponseState.FF_PENALTY_RESPONSE) {
if (flowFileCurrPenalty < flowFileMaxPenalty) {
flowFileCurrPenalty += 1;
logger.info("DebugFlow rolling back (with penalty) file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
session.rollback(true);
break;
} else {
flowFileCurrPenalty = 0;
curr_ff_resp.getNextCycle();
}
}
if (curr_ff_resp.state() == FlowFileResponseState.FF_EXCEPTION_RESPONSE) {
if (flowFileCurrException < flowFileMaxException) {
flowFileCurrException += 1;
String message = "forced by " + this.getClass().getName();
logger.info("DebugFlow throwing NPE file={} UUID={}",
new Object[] {ff.getAttribute(CoreAttributes.FILENAME.key()),
ff.getAttribute(CoreAttributes.UUID.key())});
RuntimeException rte;
try {
rte = flowFileExceptionClass.getConstructor(String.class).newInstance(message);
throw rte;
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
if (logger.isErrorEnabled()) {
logger.error("{} unexpected exception throwing DebugFlow exception: {}",
new Object[] {this, e});
}
}
} else {
flowFileCurrException = 0;
curr_ff_resp.getNextCycle();
}
}
}
}
} finally {
final long sleepMillis = context.getProperty(ON_TRIGGER_SLEEP_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
try {
if (sleepMillis > 0) {
sleep(sleepMillis, context.getProperty(IGNORE_INTERRUPTS).asBoolean());
getLogger().info("DebugFlow finishes sleeping at completion of its onTrigger() method");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private static class RuntimeExceptionValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final ValidationResult.Builder resultBuilder = new ValidationResult.Builder()
.subject(subject)
.input(input);
try {
final Class<?> exceptionClass = Class.forName(input);
if (RuntimeException.class.isAssignableFrom(exceptionClass)) {
resultBuilder.valid(true);
} else {
resultBuilder.valid(false).explanation("Class " + input + " is a Checked Exception, not a RuntimeException");
}
} catch (ClassNotFoundException e) {
resultBuilder.valid(false).explanation("Class " + input + " cannot be found");
}
return resultBuilder.build();
}
}
private enum FlowFileResponseState {
FF_SUCCESS_RESPONSE,
FF_FAILURE_RESPONSE,
FF_ROLLBACK_RESPONSE,
FF_YIELD_RESPONSE,
FF_PENALTY_RESPONSE,
FF_EXCEPTION_RESPONSE;
private FlowFileResponseState nextState;
static {
FF_SUCCESS_RESPONSE.nextState = FF_FAILURE_RESPONSE;
FF_FAILURE_RESPONSE.nextState = FF_ROLLBACK_RESPONSE;
FF_ROLLBACK_RESPONSE.nextState = FF_YIELD_RESPONSE;
FF_YIELD_RESPONSE.nextState = FF_PENALTY_RESPONSE;
FF_PENALTY_RESPONSE.nextState = FF_EXCEPTION_RESPONSE;
FF_EXCEPTION_RESPONSE.nextState = FF_SUCCESS_RESPONSE;
}
FlowFileResponseState next() {
return nextState;
}
}
private class FlowFileResponse {
private final AtomicReference<FlowFileResponseState> current = new AtomicReference<>();
FlowFileResponse() {
current.set(FlowFileResponseState.FF_SUCCESS_RESPONSE);
}
synchronized FlowFileResponseState state() {
return current.get();
}
synchronized void getNextCycle() {
current.set(current.get().next());
}
synchronized void reset() {
current.set(FlowFileResponseState.FF_SUCCESS_RESPONSE);
}
}
private enum NoFlowFileResponseState {
NO_FF_SKIP_RESPONSE,
NO_FF_EXCEPTION_RESPONSE,
NO_FF_YIELD_RESPONSE;
private NoFlowFileResponseState nextState;
static {
NO_FF_SKIP_RESPONSE.nextState = NO_FF_EXCEPTION_RESPONSE;
NO_FF_EXCEPTION_RESPONSE.nextState = NO_FF_YIELD_RESPONSE;
NO_FF_YIELD_RESPONSE.nextState = NO_FF_SKIP_RESPONSE;
}
NoFlowFileResponseState next() {
return nextState;
}
}
private class NoFlowFileResponse {
private final AtomicReference<NoFlowFileResponseState> current = new AtomicReference<>();
NoFlowFileResponse() {
current.set(NoFlowFileResponseState.NO_FF_SKIP_RESPONSE);
}
synchronized NoFlowFileResponseState state() {
return current.get();
}
synchronized void getNextCycle() {
current.set(current.get().next());
}
synchronized void reset() {
current.set(NoFlowFileResponseState.NO_FF_SKIP_RESPONSE);
}
}
}