blob: c9f7dd5f001db4af3bccc6b56a3f55bbd97ca23a [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RetryFlowFile.h"
#include "core/PropertyValidation.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
core::Property RetryFlowFile::RetryAttribute(core::PropertyBuilder::createProperty("Retry Attribute")
->withDescription(
"The name of the attribute that contains the current retry count for the FlowFile."
"WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, "
"the processor will either overwrite that attribute with '1' or fail based on configuration.")
->withDefaultValue("flowfile.retries", core::StandardValidators::NON_BLANK_VALIDATOR())
->supportsExpressionLanguage(true)
->isRequired(true)
->build());
core::Property RetryFlowFile::MaximumRetries(core::PropertyBuilder::createProperty("Maximum Retries")
->withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.")
->withDefaultValue<uint64_t>(3)
->supportsExpressionLanguage(true)
->isRequired(true)
->build());
core::Property RetryFlowFile::PenalizeRetries(core::PropertyBuilder::createProperty("Penalize Retries")
->withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
->withDefaultValue<bool>(true)
->isRequired(true)
->build());
core::Property RetryFlowFile::FailOnNonNumericalOverwrite(core::PropertyBuilder::createProperty("Fail on Non-numerical Overwrite")
->withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'")
->withDefaultValue<bool>(false)
->isRequired(true)
->build());
core::Property RetryFlowFile::ReuseMode(core::PropertyBuilder::createProperty("Reuse Mode")
->withDescription(
"Defines how the Processor behaves if the retry FlowFile has a different retry UUID than "
"the instance that received the FlowFile. This generally means that the attribute was "
"not reset after being successfully retried by a previous instance of this processor.")
->withAllowableValues<std::string>({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE})
->withDefaultValue(FAIL_ON_REUSE)
->isRequired(true)
->build());
core::Relationship RetryFlowFile::Retry("retry",
"Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop.");
core::Relationship RetryFlowFile::RetriesExceeded("retries_exceeded",
"Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop.");
core::Relationship RetryFlowFile::Failure("failure",
"The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting "
"that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains "
" attribute expression language that does not resolve to an Integer.");
void RetryFlowFile::initialize() {
setSupportedProperties({
RetryAttribute,
MaximumRetries,
PenalizeRetries,
FailOnNonNumericalOverwrite,
ReuseMode,
});
setSupportedRelationships({
Retry,
RetriesExceeded,
Failure,
});
}
void RetryFlowFile::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
context->getProperty(RetryAttribute.getName(), retry_attribute_);
context->getProperty(MaximumRetries.getName(), maximum_retries_);
context->getProperty(PenalizeRetries.getName(), penalize_retries_);
context->getProperty(FailOnNonNumericalOverwrite.getName(), fail_on_non_numerical_overwrite_);
context->getProperty(ReuseMode.getName(), reuse_mode_);
readDynamicPropertyKeys(context);
}
void RetryFlowFile::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord> (session->get());
if (!flow_file) {
return;
}
utils::optional<uint64_t> maybe_retry_property_value = getRetryPropertyValue(flow_file);
if (!maybe_retry_property_value) {
session->transfer(flow_file, Failure);
return;
}
uint64_t retry_property_value = maybe_retry_property_value.value();
const std::string last_retried_by_property_name = retry_attribute_ + ".uuid";
const std::string current_processor_uuid = getUUIDStr();
std::string last_retried_by_uuid;
if (flow_file->getAttribute(last_retried_by_property_name, last_retried_by_uuid)) {
if (last_retried_by_uuid != current_processor_uuid) {
if (reuse_mode_ == FAIL_ON_REUSE) {
logger_->log_error("FlowFile %s was previously retried with the same attribute by a different "
"processor (uuid: %s, current uuid: %s). Transfering flowfile to 'failure'...",
flow_file->getUUIDStr(), last_retried_by_uuid, current_processor_uuid);
session->transfer(flow_file, Failure);
return;
}
// Assuming reuse_mode_ == WARN_ON_REUSE || reuse_mode_ == RESET_REUSE
core::logging::LOG_LEVEL reuse_mode_log_level(core::logging::LOG_LEVEL::debug);
if (reuse_mode_ == WARN_ON_REUSE) {
reuse_mode_log_level = core::logging::LOG_LEVEL::warn;
}
logger_->log_string(reuse_mode_log_level, "Reusing retry attribute that belongs to different processor. Resetting value to 0.");
retry_property_value = 0;
}
}
if (retry_property_value < maximum_retries_) {
flow_file->setAttribute(retry_attribute_, std::to_string(retry_property_value + 1));
if (penalize_retries_) {
session->penalize(flow_file);
}
session->transfer(flow_file, Retry);
return;
}
setRetriesExceededAttributesOnFlowFile(context, flow_file);
session->transfer(flow_file, RetriesExceeded);
}
void RetryFlowFile::readDynamicPropertyKeys(core::ProcessContext* context) {
exceeded_flowfile_attribute_keys_.clear();
const std::vector<std::string> dynamic_prop_keys = context->getDynamicPropertyKeys();
logger_->log_info("RetryFlowFile registering %d keys", dynamic_prop_keys.size());
for (const auto& key : dynamic_prop_keys) {
exceeded_flowfile_attribute_keys_.emplace_back(core::PropertyBuilder::createProperty(key)->withDescription("auto generated")->supportsExpressionLanguage(true)->build());
logger_->log_info("RetryFlowFile registered attribute '%s'", key);
}
}
utils::optional<uint64_t> RetryFlowFile::getRetryPropertyValue(const std::shared_ptr<FlowFileRecord>& flow_file) const {
std::string value_as_string;
flow_file->getAttribute(retry_attribute_, value_as_string);
uint64_t value;
try {
utils::internal::ValueParser(value_as_string).parse(value).parseEnd();
return utils::make_optional<uint64_t>(value);
}
catch(const utils::internal::ParseException&) {
if (fail_on_non_numerical_overwrite_) {
logger_->log_info("Non-numerical retry property in RetryFlowFile (value: %s). Sending flowfile to failure...", value_as_string);
return {};
}
}
logger_->log_info("Non-numerical retry property in RetryFlowFile: overwriting %s with 0.", value_as_string);
return utils::make_optional<uint64_t>(0);
}
void RetryFlowFile::setRetriesExceededAttributesOnFlowFile(core::ProcessContext* context, const std::shared_ptr<FlowFileRecord>& flow_file) const {
for (const auto& attribute : exceeded_flowfile_attribute_keys_) {
std::string value;
context->getDynamicProperty(attribute, value, flow_file);
flow_file->setAttribute(attribute.getName(), value);
logger_->log_info("Set attribute '%s' of flow file '%s' with value '%s'", attribute.getName(), flow_file->getUUIDStr(), value);
}
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */