blob: 9c6c3c6761ae9d8dfec67deba07caccc309e12c1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DefragmentText.h"
#include <vector>
#include <utility>
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "serialization/PayloadSerializer.h"
#include "TextFragmentUtils.h"
#include "utils/gsl.h"
#include "utils/StringUtils.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
// Internal relationship used to park the pending buffer between triggers: getOutGoingConnections()
// adds this processor itself as the destination for "__self__", and Buffer::store()/append()
// transfer the buffered flow file here.
const core::Relationship DefragmentText::Self("__self__", "Marks the FlowFile to be owned by this processor");
void DefragmentText::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
/**
 * Reads the buffer limits, pattern location and pattern from the processor configuration.
 *
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) when the Pattern property is missing or empty
 */
void DefragmentText::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  // Optional age limit. When it is configured, the processor must also run without new input,
  // so that buffers which have grown too old still get flushed.
  auto buffer_age = context.getProperty(MaxBufferAge) | utils::andThen(&core::TimePeriodValue::fromString);
  if (buffer_age) {
    max_age_ = buffer_age->getMilliseconds();
    setTriggerWhenEmpty(true);
    logger_->log_trace("The Buffer maximum age is configured to be {}", buffer_age->getMilliseconds());
  }

  // Optional size limit; values that are not greater than zero are ignored.
  if (auto buffer_size = context.getProperty<core::DataSizeValue>(MaxBufferSize); buffer_size && buffer_size->getValue() > 0) {
    max_size_ = buffer_size->getValue();
    logger_->log_trace("The Buffer maximum size is configured to be {} B", buffer_size->getValue());
  }

  pattern_location_ = utils::parseEnumProperty<defragment_text::PatternLocation>(context, PatternLoc);

  std::string pattern_str;
  const bool pattern_configured = context.getProperty(Pattern, pattern_str) && !pattern_str.empty();
  if (!pattern_configured) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Pattern property missing or invalid");
  }
  pattern_ = utils::Regex(pattern_str);
  logger_->log_trace("The Pattern is configured to be {}", pattern_str);
}
/**
 * One scheduling round:
 *  1. process every flow file handed back through restore(),
 *  2. process at most one incoming flow file from the session,
 *  3. flush buffers that exceeded the size limit (to Failure) or the age limit
 *     (to Success for START_OF_MESSAGE, otherwise Failure).
 */
void DefragmentText::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
  auto restored_fragments = flow_file_store_.getNewFlowFiles();
  for (auto& restored : restored_fragments) {
    if (!restored) {
      continue;
    }
    processNextFragment(session, gsl::not_null(restored));
  }

  if (std::shared_ptr<core::FlowFile> incoming = session.get()) {
    processNextFragment(session, gsl::not_null(std::move(incoming)));
  }

  // With START_OF_MESSAGE, a timed-out buffer still holds a complete message (it began at a match).
  const bool success_on_timeout = pattern_location_ == defragment_text::PatternLocation::START_OF_MESSAGE;
  for (auto& [unused_id, source] : fragment_sources_) {
    auto& pending = source.buffer;
    if (pending.maxSizeReached(max_size_)) {
      pending.flushAndReplace(session, Failure, nullptr);
      continue;
    }
    if (pending.maxAgeReached(max_age_)) {
      pending.flushAndReplace(session, success_on_timeout ? Success : Failure, nullptr);
    }
  }
}
namespace {
/**
 * Reads the fragment offset attribute (textfragmentutils::OFFSET_ATTRIBUTE) of a flow file.
 *
 * std::stoull is used instead of std::stoi so that offsets above INT_MAX (fragments lying more than
 * ~2 GiB into their source) are parsed instead of making the conversion throw std::out_of_range.
 *
 * @param flow_file the fragment to inspect
 * @return the parsed offset, or std::nullopt when the attribute is absent
 * @throws std::invalid_argument / std::out_of_range (from std::stoull) when the attribute is not a valid number
 */
std::optional<size_t> getFragmentOffset(const core::FlowFile& flow_file) {
  if (auto offset_attribute = flow_file.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE)) {
    return static_cast<size_t>(std::stoull(*offset_attribute));
  }
  return std::nullopt;
}
} // namespace
/**
 * Feeds one fragment into the buffer of its source (keyed by the absolute path attribute).
 *
 * If the buffer is non-empty and the fragment's offset does not continue it, both the buffer and
 * the fragment go to Failure. Otherwise the fragment is split at the last pattern match: the leading
 * part is appended to the buffer, and if a match was found the buffer is flushed to Success and
 * replaced by the trailing part. The fragment itself is then removed from the session.
 */
void DefragmentText::processNextFragment(core::ProcessSession& session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& next_fragment) {
  auto& buffer = fragment_sources_[FragmentSource::Id(*next_fragment)].buffer;

  const bool is_contiguous = buffer.empty() || buffer.getNextFragmentOffset() == getFragmentOffset(*next_fragment);
  if (!is_contiguous) {
    buffer.flushAndReplace(session, Failure, nullptr);
    session.transfer(next_fragment, Failure);
    return;
  }

  std::shared_ptr<core::FlowFile> head;
  std::shared_ptr<core::FlowFile> tail;
  const bool pattern_found = splitFlowFileAtLastPattern(session, next_fragment, head, tail);
  if (head) {
    buffer.append(session, gsl::not_null(std::move(head)));
  }
  if (pattern_found) {
    buffer.flushAndReplace(session, Success, tail);
  }
  session.remove(next_fragment);
}
/**
 * Renames the two parts produced by splitting `original_flow_file`, using its base name,
 * post name and offset attributes. Does nothing if any of those attributes is missing.
 *
 * @param original_flow_file the fragment that was split
 * @param split_before_last_pattern part before split_position (may be null); only its filename is updated
 * @param split_after_last_pattern part starting at split_position (may be null); gets a new filename
 *        and its offset attribute becomes original offset + split_position
 * @param split_position byte position of the split within the original content
 * @throws std::invalid_argument / std::out_of_range (from std::stoull) if the offset attribute is malformed
 */
void DefragmentText::updateAttributesForSplitFiles(const core::FlowFile& original_flow_file,
                                                   const std::shared_ptr<core::FlowFile>& split_before_last_pattern,
                                                   const std::shared_ptr<core::FlowFile>& split_after_last_pattern,
                                                   const size_t split_position) {
  std::string base_name;
  std::string post_name;
  std::string offset_str;
  if (!original_flow_file.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name))
    return;
  if (!original_flow_file.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, post_name))
    return;
  if (!original_flow_file.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, offset_str))
    return;
  // std::stoull rather than std::stoi: offsets above INT_MAX must not throw std::out_of_range.
  const auto fragment_offset = static_cast<size_t>(std::stoull(offset_str));
  if (split_before_last_pattern) {
    std::string first_part_name = textfragmentutils::createFileName(base_name, post_name, fragment_offset, split_before_last_pattern->getSize());
    split_before_last_pattern->setAttribute(core::SpecialFlowAttribute::FILENAME, first_part_name);
  }
  if (split_after_last_pattern) {
    const size_t second_part_offset = fragment_offset + split_position;
    std::string second_part_name = textfragmentutils::createFileName(base_name, post_name, second_part_offset, split_after_last_pattern->getSize());
    split_after_last_pattern->setAttribute(core::SpecialFlowAttribute::FILENAME, second_part_name);
    split_after_last_pattern->setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(second_part_offset));
  }
}
namespace {
/**
 * Recomputes the filename attribute of the buffered flow file after content was appended to it,
 * from its base name, post name and offset attributes and its current size.
 * Does nothing if any of those attributes is missing.
 *
 * @throws std::invalid_argument / std::out_of_range (from std::stoull) if the offset attribute is malformed
 */
void updateAppendedAttributes(core::FlowFile& buffered_ff) {
  std::string base_name;
  std::string post_name;
  std::string offset_str;
  if (!buffered_ff.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name))
    return;
  if (!buffered_ff.getAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, post_name))
    return;
  if (!buffered_ff.getAttribute(textfragmentutils::OFFSET_ATTRIBUTE, offset_str))
    return;
  // std::stoull rather than std::stoi: offsets above INT_MAX must not throw std::out_of_range.
  const auto fragment_offset = static_cast<size_t>(std::stoull(offset_str));
  std::string buffer_new_name = textfragmentutils::createFileName(base_name, post_name, fragment_offset, buffered_ff.getSize());
  buffered_ff.setAttribute(core::SpecialFlowAttribute::FILENAME, buffer_new_name);
}

/**
 * Byte position at which a fragment is split for the given (last) match: the start of the match,
 * or the end of the match when pattern_location is END_OF_MESSAGE.
 */
size_t getSplitPosition(const utils::SMatch& last_match, defragment_text::PatternLocation pattern_location) {
  size_t split_position = last_match.position(0);
  if (pattern_location == defragment_text::PatternLocation::END_OF_MESSAGE) {
    split_position += last_match.length(0);
  }
  return split_position;
}
} // namespace
/**
 * Splits `original_flow_file` at the last match of pattern_ (see getSplitPosition for the exact cut).
 *
 * @param session the current session (used to read and clone the flow file)
 * @param original_flow_file fragment to split; it is not modified
 * @param split_before_last_pattern out: clone of the content before the split position, or of the whole
 *        content if there is no match; left untouched when the split position is 0
 * @param split_after_last_pattern out: clone of the content from the split position to the end; set to
 *        nullptr if there is no match; left untouched when the split position equals the content size
 * @return true if the pattern was found
 */
bool DefragmentText::splitFlowFileAtLastPattern(core::ProcessSession& session,
                                                const gsl::not_null<std::shared_ptr<core::FlowFile>> &original_flow_file,
                                                std::shared_ptr<core::FlowFile> &split_before_last_pattern,
                                                std::shared_ptr<core::FlowFile> &split_after_last_pattern) const {
  const auto read_result = session.readBuffer(original_flow_file);
  // Keep the searched text alive for as long as the match is used. utils::SMatch presumably refers into
  // the searched string (as std::smatch does), so searching a temporary would leave position()/length()
  // operating on iterators into a destroyed string.
  const auto content = to_string(read_result);
  auto last_regex_match = utils::getLastRegexMatch(content, pattern_);
  if (!last_regex_match.ready()) {
    split_before_last_pattern = session.clone(original_flow_file);
    split_after_last_pattern = nullptr;
    return false;
  }
  auto split_position = getSplitPosition(last_regex_match, pattern_location_);
  if (split_position != 0) {
    split_before_last_pattern = session.clone(original_flow_file, 0, split_position);
  }
  if (split_position != original_flow_file->getSize()) {
    split_after_last_pattern = session.clone(original_flow_file, split_position, original_flow_file->getSize() - split_position);
  }
  updateAttributesForSplitFiles(*original_flow_file, split_before_last_pattern, split_after_last_pattern, split_position);
  return true;
}
/**
 * Hands a flow file back to the processor; non-null flow files are queued in flow_file_store_
 * and processed on the next onTrigger. Null pointers are ignored.
 */
void DefragmentText::restore(const std::shared_ptr<core::FlowFile>& flowFile) {
  if (flowFile) {
    flow_file_store_.put(flowFile);
  }
}
/**
 * Outgoing connections for `relationship`, as reported by the base class. For the internal
 * Self relationship, this processor is added as a destination as well.
 */
std::set<core::Connectable*> DefragmentText::getOutGoingConnections(const std::string &relationship) {
  auto connections = core::Connectable::getOutGoingConnections(relationship);
  const bool is_self_relationship = Self.getName() == relationship;
  if (is_self_relationship) {
    connections.insert(this);
  }
  return connections;
}
/**
 * Appends `flow_file_to_append` to the buffer.
 *
 * An empty buffer simply stores the flow file. Otherwise its payload is serialized (via PayloadSerializer)
 * onto the end of the buffered flow file. The buffered file's filename is refreshed and it is transferred
 * back to Self, and the appended flow file is removed from the session.
 */
void DefragmentText::Buffer::append(core::ProcessSession& session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append) {
  if (empty()) {
    store(session, flow_file_to_append);
    return;
  }

  const auto read_content = [&session](const std::shared_ptr<core::FlowFile>& source, const io::InputStreamCallback& callback) {
    return session.read(source, callback);
  };
  PayloadSerializer serializer(read_content);

  session.add(buffered_flow_file_);
  const auto write_payload = [&serializer, &flow_file_to_append](const auto& output_stream) -> int64_t {
    return serializer.serialize(flow_file_to_append, output_stream);
  };
  session.append(buffered_flow_file_, write_payload);

  updateAppendedAttributes(*buffered_flow_file_);
  session.transfer(buffered_flow_file_, Self);
  session.remove(flow_file_to_append);
}
/**
 * True when a size limit is given, the buffer is non-empty and the buffered flow file is strictly
 * larger than the limit.
 */
bool DefragmentText::Buffer::maxSizeReached(const std::optional<size_t> max_size) const {
  if (empty() || !max_size.has_value()) {
    return false;
  }
  return buffered_flow_file_->getSize() > *max_size;
}
/**
 * True when an age limit is given, the buffer is non-empty and more than `max_age` has passed
 * (on the steady clock) since the current content was stored.
 */
bool DefragmentText::Buffer::maxAgeReached(const std::optional<std::chrono::milliseconds> max_age) const {
  if (empty() || !max_age.has_value()) {
    return false;
  }
  const auto expiry = creation_time_ + *max_age;
  return std::chrono::steady_clock::now() > expiry;
}
/**
 * Routes the currently buffered flow file (if any) to `relationship`, then makes
 * `new_buffered_flow_file` (possibly null, which leaves the buffer empty) the new content.
 */
void DefragmentText::Buffer::flushAndReplace(core::ProcessSession& session, const core::Relationship& relationship,
                                             const std::shared_ptr<core::FlowFile>& new_buffered_flow_file) {
  const bool holds_content = !empty();
  if (holds_content) {
    // The buffered file is re-added to the current session before it can be routed.
    session.add(buffered_flow_file_);
    session.transfer(buffered_flow_file_, relationship);
  }
  store(session, new_buffered_flow_file);
}
/**
 * Replaces the buffer content with `new_buffered_flow_file` and restarts the age clock.
 * A non-null flow file is added to the session and transferred to Self.
 */
void DefragmentText::Buffer::store(core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& new_buffered_flow_file) {
  buffered_flow_file_ = new_buffered_flow_file;
  creation_time_ = std::chrono::steady_clock::now();
  if (empty()) {
    return;
  }
  session.add(buffered_flow_file_);
  session.transfer(buffered_flow_file_, Self);
}
/**
 * Offset at which the next contiguous fragment is expected: the buffered flow file's offset attribute
 * plus its current size.
 *
 * @return std::nullopt if the buffer is empty or the buffered flow file carries no offset attribute
 * @throws std::invalid_argument / std::out_of_range (from std::stoull) if the offset attribute is malformed
 */
std::optional<size_t> DefragmentText::Buffer::getNextFragmentOffset() const {
  if (empty())
    return std::nullopt;
  if (auto offset_attribute = buffered_flow_file_->getAttribute(textfragmentutils::OFFSET_ATTRIBUTE)) {
    // std::stoull rather than std::stoi: offsets above INT_MAX must not throw std::out_of_range.
    return static_cast<size_t>(std::stoull(*offset_attribute) + buffered_flow_file_->getSize());
  }
  return std::nullopt;
}
/**
 * Identifies the source of a fragment by its absolute path attribute. It is left unset when
 * the attribute is missing.
 */
DefragmentText::FragmentSource::Id::Id(const core::FlowFile& flow_file) {
  const auto path = flow_file.getAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH);
  if (path.has_value()) {
    absolute_path_ = path.value();
  }
}
/// Hashes an Id by its (optional) absolute path.
size_t DefragmentText::FragmentSource::Id::hash::operator() (const Id& fragment_id) const {
  const std::hash<std::optional<std::string>> path_hasher{};
  return path_hasher(fragment_id.absolute_path_);
}
// Registers DefragmentText as a Processor resource (REGISTER_RESOURCE comes from core/Resource.h).
REGISTER_RESOURCE(DefragmentText, Processor);
} // namespace org::apache::nifi::minifi::processors