| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.processors.standard; |
| |
| import org.apache.nifi.annotation.behavior.DynamicProperty; |
| import org.apache.nifi.annotation.behavior.InputRequirement; |
| import org.apache.nifi.annotation.behavior.ReadsAttribute; |
| import org.apache.nifi.annotation.behavior.SideEffectFree; |
| import org.apache.nifi.annotation.behavior.SupportsBatching; |
| import org.apache.nifi.annotation.behavior.WritesAttribute; |
| import org.apache.nifi.annotation.behavior.WritesAttributes; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.Tags; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.components.AllowableValue; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.expression.ExpressionLanguageScope; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.logging.LogLevel; |
| import org.apache.nifi.processor.AbstractProcessor; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.ProcessorInitializationContext; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.apache.nifi.util.StringUtils; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| @Tags({"Retry", "FlowFile"}) |
| @CapabilityDescription("FlowFiles passed to this Processor have a 'Retry Attribute' value checked against a " + |
| "configured 'Maximum Retries' value. If the current attribute value is below the configured maximum, the " + |
| "FlowFile is passed to a retry relationship. The FlowFile may or may not be penalized in that condition. " + |
| "If the FlowFile's attribute value exceeds the configured maximum, the FlowFile will be passed to a " + |
| "'retries_exceeded' relationship. WARNING: If the incoming FlowFile has a non-numeric value in the " + |
| "configured 'Retry Attribute' attribute, it will be reset to '1'. You may choose to fail the FlowFile " + |
| "instead of performing the reset. Additional dynamic properties can be defined for any attributes you " + |
| "wish to add to the FlowFiles transferred to 'retries_exceeded'. These attributes support attribute " + |
| "expression language.") |
| @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) |
| @SupportsBatching |
| @SideEffectFree |
| @ReadsAttribute(attribute = "Retry Attribute", |
| description = "Will read the attribute or attribute expression language result as defined in 'Retry Attribute'") |
| @WritesAttributes({ |
| @WritesAttribute(attribute = "Retry Attribute", |
| description = "User defined retry attribute is updated with the current retry count"), |
| @WritesAttribute(attribute = "Retry Attribute .uuid", |
| description = "User defined retry attribute with .uuid that determines what processor " + |
| "retried the FlowFile last") |
| }) |
| @DynamicProperty(name = "Exceeded FlowFile Attribute Key", |
| value = "The value of the attribute added to the FlowFile", |
| description = "One or more dynamic properties can be used to add attributes to FlowFiles passed to " + |
| "the 'retries_exceeded' relationship", |
| expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| public class RetryFlowFile extends AbstractProcessor { |
| private List<PropertyDescriptor> properties; |
| private Set<Relationship> relationships; |
| private String retryAttribute; |
| private Boolean penalizeRetried; |
| private Boolean failOnOverwrite; |
| private String reuseMode; |
| private String lastRetriedBy; |
| |
| public static final PropertyDescriptor RETRY_ATTRIBUTE = new PropertyDescriptor.Builder() |
| .name("retry-attribute") |
| .displayName("Retry Attribute") |
| .description("The name of the attribute that contains the current retry count for the FlowFile. " + |
| "WARNING: If the name matches an attribute already on the FlowFile that does not contain a " + |
| "numerical value, the processor will either overwrite that attribute with '1' or fail " + |
| "based on configuration.") |
| .required(true) |
| .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .defaultValue("flowfile.retries") |
| .build(); |
| public static final PropertyDescriptor MAXIMUM_RETRIES = new PropertyDescriptor.Builder() |
| .name("maximum-retries") |
| .displayName("Maximum Retries") |
| .description("The maximum number of times a FlowFile can be retried before being " + |
| "passed to the 'retries_exceeded' relationship") |
| .required(true) |
| .addValidator(StandardValidators.createLongValidator(1, Integer.MAX_VALUE, true)) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .defaultValue("3") |
| .build(); |
| public static final PropertyDescriptor PENALIZE_RETRIED = new PropertyDescriptor.Builder() |
| .name("penalize-retries") |
| .displayName("Penalize Retries") |
| .description("If set to 'true', this Processor will penalize input FlowFiles before passing them " + |
| "to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.") |
| .required(true) |
| .allowableValues("true", "false") |
| .defaultValue("true") |
| .build(); |
| public static final PropertyDescriptor FAIL_ON_OVERWRITE = new PropertyDescriptor.Builder() |
| .name("Fail on Non-numerical Overwrite") |
| .description("If the FlowFile already has the attribute defined in 'Retry Attribute' that is " + |
| "*not* a number, fail the FlowFile instead of resetting that value to '1'") |
| .required(true) |
| .allowableValues("true", "false") |
| .defaultValue("false") |
| .build(); |
| |
| public static final AllowableValue FAIL_ON_REUSE = new AllowableValue( |
| "fail", |
| "Fail on Reuse", |
| "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, fail the FlowFile" + |
| " regardless of current retry count" |
| ); |
| public static final AllowableValue WARN_ON_REUSE = new AllowableValue( |
| "warn", |
| "Warn on Reuse", |
| "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, log a warning " + |
| "message before resetting the retry attribute and UUID for this instance" |
| ); |
| public static final AllowableValue RESET_ON_REUSE = new AllowableValue( |
| "reset", |
| "Reset Reuse", |
| "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, log a debug " + |
| "message before resetting the retry attribute and UUID for this instance" |
| ); |
| public static final PropertyDescriptor REUSE_MODE = new PropertyDescriptor.Builder() |
| .name("reuse-mode") |
| .displayName("Reuse Mode") |
| .description("Defines how the Processor behaves if the retry FlowFile has a different retry UUID than " + |
| "the instance that received the FlowFile. This generally means that the attribute was not reset " + |
| "after being successfully retried by a previous instance of this processor.") |
| .required(true) |
| .allowableValues(FAIL_ON_REUSE, WARN_ON_REUSE, RESET_ON_REUSE) |
| .defaultValue(FAIL_ON_REUSE.getValue()) |
| .build(); |
| |
| public static final Relationship RETRY = new Relationship.Builder() |
| .name("retry") |
| .description("Input FlowFile has not exceeded the configured maximum retry count, pass this " + |
| "relationship back to the input Processor to create a limited feedback loop.") |
| .build(); |
| public static final Relationship RETRIES_EXCEEDED = new Relationship.Builder() |
| .name("retries_exceeded") |
| .description("Input FlowFile has exceeded the configured maximum retry count, do not pass this " + |
| "relationship back to the input Processor to terminate the limited feedback loop.") |
| .build(); |
| public static final Relationship FAILURE = new Relationship.Builder() |
| .name("failure") |
| .description("The processor is configured such that a non-numerical value on 'Retry Attribute' " + |
| "results in a failure instead of resetting that value to '1'. This will immediately " + |
| "terminate the limited feedback loop. Might also include when 'Maximum Retries' contains " + |
| "attribute expression language that does not resolve to an Integer.") |
| .autoTerminateDefault(true) |
| .build(); |
| |
| @Override |
| public Set<Relationship> getRelationships() { |
| return relationships; |
| } |
| |
| @Override |
| protected void init(ProcessorInitializationContext context) { |
| List<PropertyDescriptor> props = new ArrayList<>(); |
| props.add(RETRY_ATTRIBUTE); |
| props.add(MAXIMUM_RETRIES); |
| props.add(PENALIZE_RETRIED); |
| props.add(FAIL_ON_OVERWRITE); |
| props.add(REUSE_MODE); |
| this.properties = Collections.unmodifiableList(props); |
| |
| Set<Relationship> rels = new HashSet<>(); |
| rels.add(RETRY); |
| rels.add(RETRIES_EXCEEDED); |
| rels.add(FAILURE); |
| this.relationships = Collections.unmodifiableSet(rels); |
| } |
| |
| @Override |
| protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { |
| return new PropertyDescriptor.Builder() |
| .name(propertyDescriptorName) |
| .description("Attribute " + propertyDescriptorName + " will be placed on FlowFiles " + |
| "exceeding the retry count") |
| .required(false) |
| .dynamic(true) |
| .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| } |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return properties; |
| } |
| |
| @OnScheduled |
| @SuppressWarnings("unused") |
| public void onScheduled(final ProcessContext context) { |
| retryAttribute = context.getProperty(RETRY_ATTRIBUTE).evaluateAttributeExpressions().getValue(); |
| penalizeRetried = context.getProperty(PENALIZE_RETRIED).asBoolean(); |
| failOnOverwrite = context.getProperty(FAIL_ON_OVERWRITE).asBoolean(); |
| reuseMode = context.getProperty(REUSE_MODE).getValue(); |
| lastRetriedBy = retryAttribute.concat(".uuid"); |
| } |
| |
| @Override |
| public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { |
| FlowFile flowfile = session.get(); |
| if (null == flowfile) |
| return; |
| |
| String retryAttributeValue = flowfile.getAttribute(retryAttribute); |
| Integer currentRetry; |
| try { |
| currentRetry = (null == retryAttributeValue) |
| ? 1 |
| : Integer.valueOf(retryAttributeValue.trim()) + 1; |
| } catch (NumberFormatException ex) { |
| // Configured to fail if this was not a number |
| if (failOnOverwrite) { |
| session.transfer(flowfile, FAILURE); |
| return; |
| } |
| // reset to '1' if that wasn't the case |
| currentRetry = 1; |
| } |
| |
| String lastRetriedByUUID = flowfile.getAttribute(lastRetriedBy); |
| String currentInstanceUUID = getIdentifier(); |
| if (!StringUtils.isBlank(lastRetriedByUUID) && !currentInstanceUUID.equals(lastRetriedByUUID)) { |
| LogLevel reuseLogLevel = LogLevel.DEBUG; |
| switch (reuseMode) { |
| case "fail": |
| getLogger().error("FlowFile {} was previously retried with the same attribute by a " + |
| "different processor. Route to 'failure'", new Object[]{flowfile}); |
| getLogger().debug("Current Processor: {}, Previous Processor: {}, Previous Retry: {}", |
| new Object[]{currentInstanceUUID, lastRetriedByUUID, currentRetry - 1}); |
| session.transfer(flowfile, FAILURE); |
| return; |
| case "warn": |
| reuseLogLevel = LogLevel.WARN; |
| case "reset": |
| getLogger().log(reuseLogLevel, "FlowFile {} was previously retried with the same attribute " + |
| "by a different processor. Reset the current retry count to '1'. Consider " + |
| "changing the retry attribute for this processor.", |
| new Object[]{flowfile}); |
| getLogger().debug("Current Processor: {}, Previous Processor: {}, Previous Retry: {}", |
| new Object[]{currentInstanceUUID, lastRetriedByUUID, currentRetry}); |
| currentRetry = 1; |
| break; |
| } |
| } |
| |
| Integer maximumRetries; |
| try { |
| maximumRetries = context.getProperty(MAXIMUM_RETRIES) |
| .evaluateAttributeExpressions(flowfile) |
| .asInteger(); |
| |
| if (null == maximumRetries) { |
| getLogger().warn("Could not obtain maximum retries off of FlowFile, route to 'failure'"); |
| session.transfer(flowfile, FAILURE); |
| return; |
| } |
| } catch (NumberFormatException ex) { |
| getLogger().warn("Maximum Retries was not a number for this FlowFile, route to 'failure'"); |
| session.transfer(flowfile, FAILURE); |
| return; |
| } |
| |
| if (currentRetry > maximumRetries) { |
| // Add dynamic properties |
| for (PropertyDescriptor descriptor : context.getProperties().keySet()) { |
| if (!descriptor.isDynamic()) |
| continue; |
| |
| String value = context.getProperty(descriptor) |
| .evaluateAttributeExpressions(flowfile) |
| .getValue(); |
| if (!StringUtils.isBlank(value)) |
| flowfile = session.putAttribute(flowfile, descriptor.getName(), value); |
| } |
| |
| flowfile = session.removeAttribute(flowfile, retryAttribute); |
| flowfile = session.removeAttribute(flowfile, lastRetriedBy); |
| session.transfer(flowfile, RETRIES_EXCEEDED); |
| } else { |
| if (penalizeRetried) |
| session.penalize(flowfile); |
| |
| // Update and transfer |
| flowfile = session.putAttribute(flowfile, retryAttribute, String.valueOf(currentRetry)); |
| flowfile = session.putAttribute(flowfile, lastRetriedBy, getIdentifier()); |
| session.transfer(flowfile, RETRY); |
| } |
| } |
| } |