blob: 2d4d7681db3a7818f68f204c75af4f8aa4d9197b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.pattern;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.PartialFunctions.AdjustRoute;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
/**
* <p>RollbackOnFailure can be used as a function context for process patterns such as {@link Put} to provide a configurable error handling.
*
* <p>
* RollbackOnFailure can add following characteristics to a processor:
* <li>When disabled, input FlowFiles caused an error will be routed to 'failure' or 'retry' relationship, based on the type of error.</li>
* <li>When enabled, input FlowFiles are kept in the input queue. A ProcessException is thrown to rollback the process session.</li>
* <li>It assumes anything happened during a processors onTrigger can rollback, if this is marked as transactional.</li>
* <li>If transactional and enabled, even if some FlowFiles are already processed, it rollbacks the session when error occurs.</li>
* <li>If not transactional and enabled, it only rollbacks the session when error occurs only if there was no progress.</li>
* </p>
*
* <p>There are two approaches to apply RollbackOnFailure. One is using {@link ExceptionHandler#adjustError(BiFunction)},
* and the other is implementing processor onTrigger using process patterns such as {@link Put#adjustRoute(AdjustRoute)}. </p>
*
* <p>It's also possible to use both approaches. ExceptionHandler can apply when an Exception is thrown immediately, while AdjustRoute respond later but requires less code.</p>
*/
public class RollbackOnFailure {
private final boolean rollbackOnFailure;
private final boolean transactional;
private boolean discontinue;
private int processedCount = 0;
/**
* Constructor.
* @param rollbackOnFailure Should be set by user via processor configuration.
* @param transactional Specify whether a processor is transactional.
* If not, it is important to call {@link #proceed()} after successful execution of processors task,
* that indicates processor made an operation that can not be undone.
*/
public RollbackOnFailure(boolean rollbackOnFailure, boolean transactional) {
this.rollbackOnFailure = rollbackOnFailure;
this.transactional = transactional;
}
public static final PropertyDescriptor ROLLBACK_ON_FAILURE = createRollbackOnFailureProperty("");
public static PropertyDescriptor createRollbackOnFailureProperty(String additionalDescription) {
return new PropertyDescriptor.Builder()
.name("rollback-on-failure")
.displayName("Rollback On Failure")
.description("Specify how to handle error." +
" By default (false), if an error occurs while processing a FlowFile, the FlowFile will be routed to" +
" 'failure' or 'retry' relationship based on error type, and processor can continue with next FlowFile." +
" Instead, you may want to rollback currently processed FlowFiles and stop further processing immediately." +
" In that case, you can do so by enabling this 'Rollback On Failure' property. " +
" If enabled, failed FlowFiles will stay in the input relationship without penalizing it and being processed repeatedly" +
" until it gets processed successfully or removed by other means." +
" It is important to set adequate 'Yield Duration' to avoid retrying too frequently." + additionalDescription)
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.required(true)
.build();
}
/**
* Create a function to use with {@link ExceptionHandler} that adjust error type based on functional context.
*/
public static <FCT extends RollbackOnFailure> BiFunction<FCT, ErrorTypes, ErrorTypes.Result> createAdjustError(final ComponentLog logger) {
return (c, t) -> {
ErrorTypes.Result adjusted = null;
switch (t.destination()) {
case ProcessException:
// If this process can rollback, then rollback it.
if (!c.canRollback()) {
// If an exception is thrown but the processor is not transactional and processed count > 0, adjust it to self,
// in order to stop any further processing until this input is processed successfully.
// If we throw an Exception in this state, the already succeeded FlowFiles will be rolled back, too.
// In case the progress was made by other preceding inputs,
// those successful inputs should be sent to 'success' and this input stays in incoming queue.
// In case this input made some progress to external system, the partial update will be replayed again,
// can cause duplicated data.
c.discontinue();
// We should not penalize a FlowFile, if we did, other FlowFiles can be fetched first.
// We need to block others to be processed until this one finishes.
adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield);
}
break;
case Failure:
case Retry:
if (c.isRollbackOnFailure()) {
c.discontinue();
if (c.canRollback()) {
// If this process can rollback, then throw ProcessException instead, in order to rollback.
adjusted = new ErrorTypes.Result(ErrorTypes.Destination.ProcessException, ErrorTypes.Penalty.Yield);
} else {
// If not,
adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield);
}
}
break;
}
if (adjusted != null) {
if (logger.isDebugEnabled()) {
logger.debug("Adjusted {} to {} based on context rollbackOnFailure={}, processedCount={}, transactional={}",
new Object[]{t, adjusted, c.isRollbackOnFailure(), c.getProcessedCount(), c.isTransactional()});
}
return adjusted;
}
return t.result();
};
}
/**
* Create an {@link AdjustRoute} function to use with process pattern such as {@link Put} that adjust routed FlowFiles based on context.
* This function works as a safety net by covering cases that Processor implementation did not use ExceptionHandler and transfer FlowFiles
* without considering RollbackOnFailure context.
*/
public static <FCT extends RollbackOnFailure> AdjustRoute<FCT> createAdjustRoute(Relationship ... failureRelationships) {
return (context, session, fc, result) -> {
if (fc.isRollbackOnFailure()) {
// Check if route contains failure relationship.
for (Relationship failureRelationship : failureRelationships) {
if (!result.contains(failureRelationship)) {
continue;
}
if (fc.canRollback()) {
throw new ProcessException(String.format(
"A FlowFile is routed to %s. Rollback session based on context rollbackOnFailure=%s, processedCount=%d, transactional=%s",
failureRelationship.getName(), fc.isRollbackOnFailure(), fc.getProcessedCount(), fc.isTransactional()));
} else {
// Send failed FlowFiles to self.
final Map<Relationship, List<FlowFile>> routedFlowFiles = result.getRoutedFlowFiles();
final List<FlowFile> failedFlowFiles = routedFlowFiles.remove(failureRelationship);
result.routeTo(failedFlowFiles, Relationship.SELF);
}
}
}
};
}
public static <FCT extends RollbackOnFailure, I> ExceptionHandler.OnError<FCT, I> createOnError(ExceptionHandler.OnError<FCT, I> onError) {
return onError.andThen((context, input, result, e) -> {
if (context.shouldDiscontinue()) {
throw new DiscontinuedException("Discontinue processing due to " + e, e);
}
});
}
public static <FCT extends RollbackOnFailure> void onTrigger(
ProcessContext context, ProcessSessionFactory sessionFactory, FCT functionContext, ComponentLog logger,
PartialFunctions.OnTrigger onTrigger) throws ProcessException {
PartialFunctions.onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> {
// If RollbackOnFailure is enabled, do not penalize processing FlowFiles when rollback,
// in order to keep those in the incoming relationship to be processed again.
final boolean shouldPenalize = !functionContext.isRollbackOnFailure();
session.rollback(shouldPenalize);
// However, keeping failed FlowFile in the incoming relationship would retry it too often.
// So, administratively yield the process.
if (functionContext.isRollbackOnFailure()) {
logger.warn("Administratively yielding {} after rolling back due to {}", new Object[]{context.getName(), t}, t);
context.yield();
}
});
}
public int proceed() {
return ++processedCount;
}
public int getProcessedCount() {
return processedCount;
}
public boolean isRollbackOnFailure() {
return rollbackOnFailure;
}
public boolean isTransactional() {
return transactional;
}
public boolean canRollback() {
return transactional || processedCount == 0;
}
public boolean shouldDiscontinue() {
return discontinue;
}
public void discontinue() {
this.discontinue = true;
}
}