blob: d4734454d1ec6c6e0ae91db3061e971451cb34ac [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DefragmentText.h"

#include <regex>
#include <string>
#include <utility>
#include <vector>

#include "core/Resource.h"
#include "serialization/PayloadSerializer.h"
#include "TextFragmentUtils.h"
#include "utils/gsl.h"
#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::processors {
// Relationships used by DefragmentText.
// Self is not advertised in initialize(); getOutGoingConnections() maps it back onto this processor,
// and Buffer::store()/append() route the buffered flow file to it.
const core::Relationship DefragmentText::Success{"success", "Flowfiles that have been successfully defragmented"};
const core::Relationship DefragmentText::Failure{"failure", "Flowfiles that failed the defragmentation process"};
const core::Relationship DefragmentText::Self{"__self__", "Marks the FlowFile to be owned by this processor"};
// Regular expression that delimits messages. Required; onSchedule() additionally rejects an empty value.
const core::Property DefragmentText::Pattern{
    core::PropertyBuilder::createProperty("Pattern")
        ->withDescription("A regular expression to match at the start or end of messages.")
        ->isRequired(true)
        ->build()};

// Whether Pattern marks the beginning or the end of a message; defaults to START_OF_MESSAGE.
const core::Property DefragmentText::PatternLoc{
    core::PropertyBuilder::createProperty("Pattern Location")
        ->withDescription("Whether the pattern is located at the start or at the end of the messages.")
        ->withAllowableValues(PatternLocation::values())
        ->withDefaultValue(toString(PatternLocation::START_OF_MESSAGE))
        ->build()};

// Optional upper bound on the buffered content size, validated as a data size. No default, so unlimited unless set.
const core::Property DefragmentText::MaxBufferSize{
    core::PropertyBuilder::createProperty("Max Buffer Size")
        ->withDescription("The maximum buffer size, if the buffer exceeds this, it will be transferred to failure. Expected format is <size> <data unit>")
        ->withType(core::StandardValidators::get().DATA_SIZE_VALIDATOR)
        ->build()};

// How long the buffer may be held before it is flushed; defaults to "10 min".
const core::Property DefragmentText::MaxBufferAge{
    core::PropertyBuilder::createProperty("Max Buffer Age")
        ->withDescription("The maximum age of the buffer after which it will be transferred to success when matching Start of Message patterns or to failure when matching End of Message patterns. "
                          "Expected format is <duration> <time unit>")
        ->withDefaultValue("10 min")
        ->build()};
/**
 * Declares the properties and the externally visible relationships of the processor.
 * The internal Self relationship is not registered here; it is resolved by getOutGoingConnections().
 */
void DefragmentText::initialize() {
  setSupportedProperties({Pattern, PatternLoc, MaxBufferAge, MaxBufferSize});
  setSupportedRelationships({Success, Failure});
}
/**
 * Reads the processor configuration into the buffer limits, the pattern location and the compiled pattern.
 *
 * @param context processing context holding the property values; must not be null
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if Pattern is missing, empty or not a valid regular expression
 */
void DefragmentText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) {
  gsl_Expects(context);

  std::string max_buffer_age_str;
  if (context->getProperty(MaxBufferAge.getName(), max_buffer_age_str)) {
    core::TimeUnit unit{};
    uint64_t max_buffer_age{0};
    if (core::Property::StringToTime(max_buffer_age_str, max_buffer_age, unit) && core::Property::ConvertTimeUnitToMS(max_buffer_age, unit, max_buffer_age)) {
      buffer_.setMaxAge(std::chrono::milliseconds(max_buffer_age));
      // The age limit must be enforced even when no new input arrives, so the processor must also run on empty input.
      setTriggerWhenEmpty(true);
      logger_->log_trace("The Buffer maximum age is configured to be %" PRIu64 " ms", max_buffer_age);
    } else {
      // Previously an unparsable value was dropped without a trace; make the misconfiguration visible.
      logger_->log_warn("Ignoring invalid %s value: %s", MaxBufferAge.getName(), max_buffer_age_str);
    }
  }

  std::string max_buffer_size_str;
  if (context->getProperty(MaxBufferSize.getName(), max_buffer_size_str)) {
    uint64_t max_buffer_size = core::DataSizeValue(max_buffer_size_str).getValue();
    if (max_buffer_size > 0) {
      buffer_.setMaxSize(max_buffer_size);
      logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 " B", max_buffer_size);
    }
  }

  context->getProperty(PatternLoc.getName(), pattern_location_);

  std::string pattern_str;
  if (!context->getProperty(Pattern.getName(), pattern_str) || pattern_str.empty()) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Pattern property missing or invalid");
  }
  try {
    pattern_ = std::regex(pattern_str);
  } catch (const std::regex_error& ex) {
    // Report a malformed regex through the same scheduling-exception path as a missing pattern,
    // instead of letting a raw std::regex_error escape onSchedule.
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, std::string("Pattern property missing or invalid: ") + ex.what());
  }
  logger_->log_trace("The Pattern is configured to be %s", pattern_str);
}
/**
 * Processes all flow files handed back through restore(), then at most one newly queued flow file.
 * Afterwards it flushes the buffer if it has outgrown Max Buffer Size (to Failure) or,
 * failing that, if it has outlived Max Buffer Age.
 */
void DefragmentText::onTrigger(core::ProcessContext*, core::ProcessSession* session) {
  gsl_Expects(session);

  auto restored_flow_files = flow_file_store_.getNewFlowFiles();
  for (auto& restored : restored_flow_files) {
    if (!restored)
      continue;
    processNextFragment(session, gsl::not_null(std::move(restored)));
  }

  if (auto incoming = session->get()) {
    processNextFragment(session, gsl::not_null(std::move(incoming)));
  }

  if (buffer_.maxSizeReached()) {
    buffer_.flushAndReplace(session, Failure, nullptr);
  } else if (buffer_.maxAgeReached()) {
    // As described on Max Buffer Age: Success for start-of-message patterns, Failure for end-of-message patterns.
    const core::Relationship& destination = (pattern_location_ == PatternLocation::START_OF_MESSAGE) ? Success : Failure;
    buffer_.flushAndReplace(session, destination, nullptr);
  }
}
/**
 * Feeds one fragment into the buffer.
 *
 * A fragment that is not compatible with the buffer (different file or non-contiguous offset)
 * causes both the current buffer and the fragment to be sent to Failure. Otherwise the fragment
 * is split at the last pattern match. The part before the split is appended to the buffer. When a
 * match was found, the buffer is sent to Success and the part after the split becomes the new buffer.
 * The original fragment is always removed from the session afterwards.
 */
void DefragmentText::processNextFragment(core::ProcessSession *session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& next_fragment) {
  if (!buffer_.isCompatible(*next_fragment)) {
    buffer_.flushAndReplace(session, Failure, nullptr);
    session->transfer(next_fragment, Failure);
    return;
  }

  std::shared_ptr<core::FlowFile> head;  // content before the split point (or the whole fragment if no match)
  std::shared_ptr<core::FlowFile> tail;  // content from the split point onward, if any
  const bool pattern_found = splitFlowFileAtLastPattern(session, next_fragment, head, tail);

  if (head) {
    buffer_.append(session, gsl::not_null(std::move(head)));
  }
  if (pattern_found) {
    buffer_.flushAndReplace(session, Success, tail);
  }
  session->remove(next_fragment);
}
/**
 * Renames the two halves of a split fragment so that their filenames reflect their own offset and size,
 * and records the new offset on the second half.
 *
 * Does nothing unless the original carries the base name, post name and offset attributes.
 *
 * @param original_flow_file the fragment that was split
 * @param split_before_last_pattern first half (may be null)
 * @param split_after_last_pattern second half (may be null)
 * @param split_position byte position of the split within the original fragment
 */
void DefragmentText::updateAttributesForSplitFiles(const core::FlowFile& original_flow_file,
                                                   const std::shared_ptr<core::FlowFile>& split_before_last_pattern,
                                                   const std::shared_ptr<core::FlowFile>& split_after_last_pattern,
                                                   const size_t split_position) const {
  std::string base_name;
  std::string post_name;
  std::string offset_str;
  if (!original_flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name)
      || !original_flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, post_name)
      || !original_flow_file.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, offset_str)) {
    return;
  }

  // std::stoull rather than std::stoi: offsets into files larger than 2 GiB exceed INT_MAX,
  // which makes std::stoi throw std::out_of_range.
  const auto fragment_offset = static_cast<size_t>(std::stoull(offset_str));

  if (split_before_last_pattern) {
    std::string first_part_name = textfragmentutils::createFileName(base_name, post_name, fragment_offset, split_before_last_pattern->getSize());
    split_before_last_pattern->setAttribute(core::SpecialFlowAttribute::FILENAME, first_part_name);
  }
  if (split_after_last_pattern) {
    const size_t second_part_offset = fragment_offset + split_position;
    std::string second_part_name = textfragmentutils::createFileName(base_name, post_name, second_part_offset, split_after_last_pattern->getSize());
    split_after_last_pattern->setAttribute(core::SpecialFlowAttribute::FILENAME, second_part_name);
    split_after_last_pattern->setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(second_part_offset));
  }
}
namespace {
class AppendFlowFileToFlowFile : public OutputStreamCallback {
public:
explicit AppendFlowFileToFlowFile(const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append, PayloadSerializer& serializer)
: flow_file_to_append_(flow_file_to_append), serializer_(serializer) {}
int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
return serializer_.serialize(flow_file_to_append_, stream);
}
private:
const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append_;
PayloadSerializer& serializer_;
};
void updateAppendedAttributes(core::FlowFile& buffered_ff) {
std::string base_name, post_name, offset_str;
if (!buffered_ff.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name))
return;
if (!buffered_ff.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, post_name))
return;
if (!buffered_ff.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, offset_str))
return;
size_t fragment_offset = std::stoi(offset_str);
std::string buffer_new_name = textfragmentutils::createFileName(base_name, post_name, fragment_offset, buffered_ff.getSize());
buffered_ff.setAttribute(core::SpecialFlowAttribute::FILENAME, buffer_new_name);
}
struct ReadFlowFileContent : public InputStreamCallback {
std::string content;
int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
content.resize(stream->size());
const auto ret = stream->read(reinterpret_cast<uint8_t *>(content.data()), stream->size());
if (io::isError(ret))
return -1;
return gsl::narrow<int64_t>(ret);
}
};
size_t getSplitPosition(const std::smatch& last_match, DefragmentText::PatternLocation pattern_location) {
size_t split_position = last_match.position(0);
if (pattern_location == DefragmentText::PatternLocation::END_OF_MESSAGE) {
split_position += last_match.length(0);
}
return split_position;
}
} // namespace
/**
 * Splits a fragment at the last occurrence of the configured pattern.
 *
 * @param session session used to read and clone the fragment
 * @param original_flow_file fragment to split (left unchanged)
 * @param split_before_last_pattern out: clone of the content before the split point. Untouched if that part is empty.
 *        If there is no match, it receives a clone of the whole fragment.
 * @param split_after_last_pattern out: clone of the content from the split point on. Untouched if that part is empty.
 *        Set to null if there is no match.
 * @return true if the pattern was found
 */
bool DefragmentText::splitFlowFileAtLastPattern(core::ProcessSession *session,
                                                const gsl::not_null<std::shared_ptr<core::FlowFile>> &original_flow_file,
                                                std::shared_ptr<core::FlowFile> &split_before_last_pattern,
                                                std::shared_ptr<core::FlowFile> &split_after_last_pattern) const {
  ReadFlowFileContent content_reader;
  session->read(original_flow_file, &content_reader);
  // The match refers into content_reader.content, which stays alive for the rest of this function.
  const auto last_match = utils::StringUtils::getLastRegexMatch(content_reader.content, pattern_);

  if (last_match.ready()) {
    const auto split_position = getSplitPosition(last_match, pattern_location_);
    const auto original_size = original_flow_file->getSize();
    if (split_position != 0)
      split_before_last_pattern = session->clone(original_flow_file, 0, split_position);
    if (split_position != original_size)
      split_after_last_pattern = session->clone(original_flow_file, split_position, original_size - split_position);
    updateAttributesForSplitFiles(*original_flow_file, split_before_last_pattern, split_after_last_pattern, split_position);
    return true;
  }

  split_before_last_pattern = session->clone(original_flow_file);
  split_after_last_pattern = nullptr;
  return false;
}
/// Hands a flow file back to the processor. Non-null flow files are kept in flow_file_store_ and are
/// picked up at the start of the next onTrigger(). Null pointers are ignored.
void DefragmentText::restore(const std::shared_ptr<core::FlowFile>& flowFile) {
  if (flowFile) {
    flow_file_store_.put(flowFile);
  }
}
/// Returns the regular outgoing connections for `relationship`. For the internal Self relationship,
/// this processor itself is added as well, so flow files routed to Self come back here.
std::set<std::shared_ptr<core::Connectable>> DefragmentText::getOutGoingConnections(const std::string &relationship) const {
  auto connections = core::Connectable::getOutGoingConnections(relationship);
  if (relationship != Self.getName())
    return connections;
  auto self_as_processor = std::const_pointer_cast<core::Processor>(shared_from_this());
  connections.insert(std::static_pointer_cast<core::Connectable>(self_as_processor));
  return connections;
}
/**
 * Appends the content of `flow_file_to_append` to the buffered flow file.
 * If the buffer is empty, the flow file itself becomes the buffer. Otherwise its payload is
 * serialized onto the end of the buffered flow file, the buffer's filename is refreshed, the buffer
 * is routed to Self, and the appended flow file is removed from the session.
 */
void DefragmentText::Buffer::append(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append) {
  if (empty()) {
    store(session, flow_file_to_append);
    return;
  }

  auto read_through_session = [session](const std::shared_ptr<core::FlowFile>& ff, InputStreamCallback* callback) {
    return session->read(ff, callback);
  };
  PayloadSerializer serializer(read_through_session);
  AppendFlowFileToFlowFile appender(flow_file_to_append, serializer);

  // Register the buffered flow file with the current session before writing to it.
  session->add(buffered_flow_file_);
  session->append(buffered_flow_file_, &appender);
  updateAppendedAttributes(*buffered_flow_file_);
  session->transfer(buffered_flow_file_, Self);
  session->remove(flow_file_to_append);
}
/// True if a maximum size is configured and the non-empty buffer is strictly larger than it.
bool DefragmentText::Buffer::maxSizeReached() const {
  if (empty() || !max_size_)
    return false;
  return buffered_flow_file_->getSize() > *max_size_;
}
/// True if a maximum age is configured and the non-empty buffer was stored longer ago than that
/// (measured on the steady clock).
bool DefragmentText::Buffer::maxAgeReached() const {
  if (empty() || !max_age_)
    return false;
  const auto expiry = creation_time_ + *max_age_;
  return std::chrono::steady_clock::now() > expiry;
}
/// Enables the age limit checked by maxAgeReached().
void DefragmentText::Buffer::setMaxAge(std::chrono::milliseconds max_age) {
  max_age_.emplace(max_age);
}
/// Enables the size limit (in bytes) checked by maxSizeReached().
void DefragmentText::Buffer::setMaxSize(size_t max_size) {
  max_size_.emplace(max_size);
}
/// Transfers the current buffer (if any) to `relationship`, then makes `new_buffered_flow_file`
/// (which may be null) the new buffer.
void DefragmentText::Buffer::flushAndReplace(core::ProcessSession* session, const core::Relationship& relationship,
                                             const std::shared_ptr<core::FlowFile>& new_buffered_flow_file) {
  const bool has_buffered_content = !empty();
  if (has_buffered_content) {
    session->add(buffered_flow_file_);
    session->transfer(buffered_flow_file_, relationship);
  }
  store(session, new_buffered_flow_file);
}
/// Replaces the buffered flow file and restarts its age clock. A non-null replacement is added to the
/// session and routed to Self.
void DefragmentText::Buffer::store(core::ProcessSession* session, const std::shared_ptr<core::FlowFile>& new_buffered_flow_file) {
  buffered_flow_file_ = new_buffered_flow_file;
  creation_time_ = std::chrono::steady_clock::now();
  if (empty())
    return;
  session->add(buffered_flow_file_);
  session->transfer(buffered_flow_file_, Self);
}
/**
 * Decides whether `fragment` may be appended to the current buffer.
 *
 * An empty buffer accepts anything. Otherwise both must agree on the base name and post name
 * attributes, and on whether an offset attribute is present. When both offsets are non-empty, the
 * fragment must start exactly where the buffered content ends.
 */
bool DefragmentText::Buffer::isCompatible(const core::FlowFile& fragment) const {
  if (empty())
    return true;
  if (buffered_flow_file_->getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)
      != fragment.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE)) {
    return false;
  }
  if (buffered_flow_file_->getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)
      != fragment.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE)) {
    return false;
  }
  std::string current_offset_str;
  std::string append_offset_str;
  const bool buffer_has_offset = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, current_offset_str);
  const bool fragment_has_offset = fragment.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, append_offset_str);
  if (buffer_has_offset != fragment_has_offset) {
    return false;
  }
  if (!current_offset_str.empty() && !append_offset_str.empty()) {
    // std::stoull rather than std::stoi: offsets beyond INT_MAX (files > 2 GiB) would make std::stoi throw.
    const uint64_t current_offset = std::stoull(current_offset_str);
    const uint64_t append_offset = std::stoull(append_offset_str);
    if (current_offset + buffered_flow_file_->getSize() != append_offset)
      return false;
  }
  return true;
}
// Registers DefragmentText and its description through the REGISTER_RESOURCE macro (from core/Resource.h).
REGISTER_RESOURCE(DefragmentText, "DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them");
} // namespace org::apache::nifi::minifi::processors