| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "RouteText.h" |
| |
| #include <algorithm> |
| #include <map> |
| #include <utility> |
| #include <vector> |
| |
| #include "core/ProcessContext.h" |
| #include "core/ProcessSession.h" |
| #include "core/Resource.h" |
| #include "core/logging/LoggerFactory.h" |
| #include "io/StreamPipe.h" |
| #include "range/v3/range/conversion.hpp" |
| #include "range/v3/view/join.hpp" |
| #include "range/v3/view/tail.hpp" |
| #include "range/v3/view/transform.hpp" |
| #include "range/v3/algorithm/all_of.hpp" |
| #include "range/v3/algorithm/any_of.hpp" |
| #include "utils/OptionalUtils.h" |
| #include "utils/ProcessorConfigUtils.h" |
| #include "utils/Searcher.h" |
| |
| namespace org::apache::nifi::minifi::processors { |
| |
| void RouteText::initialize() { |
| setSupportedProperties(Properties); |
| setSupportedRelationships(Relationships); |
| } |
| |
| void RouteText::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { |
| routing_ = utils::parseEnumProperty<route_text::Routing>(context, RoutingStrategy); |
| matching_ = utils::parseEnumProperty<route_text::Matching>(context, MatchingStrategy); |
| trim_ = utils::parseBoolProperty(context, TrimWhitespace); |
| case_policy_ = utils::parseBoolProperty(context, IgnoreCase) ? route_text::CasePolicy::IGNORE_CASE : route_text::CasePolicy::CASE_SENSITIVE; |
| group_regex_ = context.getProperty(GroupingRegex) | utils::toOptional() | utils::transform([] (const auto& str) {return utils::Regex(str);}); |
| segmentation_ = utils::parseEnumProperty<route_text::Segmentation>(context, SegmentationStrategy); |
| group_fallback_ = context.getProperty(GroupingFallbackValue).value_or(""); |
| |
| |
| for (const auto& property_name : context.getDynamicPropertyKeys()) { |
| core::RelationshipDefinition rel{property_name, "Dynamic Route"}; |
| dynamic_relationships_[property_name] = rel; |
| logger_->log_info("RouteText registered dynamic route '{}'", property_name); |
| } |
| } |
| |
| class RouteText::ReadCallback { |
| using Fn = std::function<void(Segment)>; |
| |
| public: |
| ReadCallback(route_text::Segmentation segmentation, size_t file_size, Fn&& fn) |
| : segmentation_(segmentation), file_size_(file_size), fn_(std::move(fn)) {} |
| |
| int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const { |
| std::vector<std::byte> buffer; |
| buffer.resize(file_size_); |
| size_t ret = stream->read(buffer); |
| if (io::isError(ret)) { |
| return -1; |
| } |
| if (ret != file_size_) { |
| throw Exception(PROCESS_SESSION_EXCEPTION, "Couldn't read whole flowfile content"); |
| } |
| std::string_view content{reinterpret_cast<const char*>(buffer.data()), buffer.size()}; |
| switch (segmentation_) { |
| case route_text::Segmentation::FULL_TEXT: { |
| fn_({content, 0}); |
| return gsl::narrow<int64_t>(content.length()); |
| } |
| case route_text::Segmentation::PER_LINE: { |
| // 1-based index as in nifi |
| size_t segment_idx = 1; |
| std::string_view::size_type curr = 0; |
| while (curr < content.length()) { |
| // find beginning of next line |
| std::string_view::size_type next_line = content.find('\n', curr); |
| |
| if (next_line == std::string_view::npos) { |
| fn_({content.substr(curr), segment_idx}); |
| } else { |
| // include newline character to be in-line with nifi semantics |
| ++next_line; |
| fn_({content.substr(curr, next_line - curr), segment_idx}); |
| } |
| curr = next_line; |
| ++segment_idx; |
| } |
| return gsl::narrow<int64_t>(content.length()); |
| } |
| } |
| throw Exception(PROCESSOR_EXCEPTION, "Unknown segmentation strategy"); |
| } |
| |
| private: |
| route_text::Segmentation segmentation_; |
| size_t file_size_; |
| Fn fn_; |
| }; |
| |
| class RouteText::MatchingContext { |
| struct CaseAwareHash { |
| explicit CaseAwareHash(route_text::CasePolicy policy): policy_(policy) {} |
| size_t operator()(char ch) const { |
| if (policy_ == route_text::CasePolicy::CASE_SENSITIVE) { |
| return static_cast<size_t>(ch); |
| } |
| return std::hash<int>{}(std::tolower(static_cast<unsigned char>(ch))); |
| } |
| |
| private: |
| route_text::CasePolicy policy_; |
| }; |
| |
| struct CaseAwareEq { |
| explicit CaseAwareEq(route_text::CasePolicy policy): policy_(policy) {} |
| bool operator()(char a, char b) const { |
| if (policy_ == route_text::CasePolicy::CASE_SENSITIVE) { |
| return a == b; |
| } |
| return std::tolower(static_cast<unsigned char>(a)) == std::tolower(static_cast<unsigned char>(b)); |
| } |
| |
| private: |
| route_text::CasePolicy policy_; |
| }; |
| |
| public: |
| MatchingContext(core::ProcessContext& process_context, std::shared_ptr<core::FlowFile> flow_file, route_text::CasePolicy case_policy) |
| : process_context_(process_context), |
| flow_file_(std::move(flow_file)), |
| case_policy_(case_policy) {} |
| |
| const utils::Regex& getRegexProperty(const std::string& property_name) { |
| auto it = regex_values_.find(property_name); |
| if (it != regex_values_.end()) { |
| return it->second; |
| } |
| const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::orThrow("Missing dynamic property"); |
| std::vector<utils::Regex::Mode> flags; |
| if (case_policy_ == route_text::CasePolicy::IGNORE_CASE) { |
| flags.push_back(utils::Regex::Mode::ICASE); |
| } |
| return (regex_values_[property_name] = utils::Regex(value, flags)); |
| } |
| |
| const std::string& getStringProperty(const std::string& property_name) { |
| auto it = string_values_.find(property_name); |
| if (it != string_values_.end()) { |
| return it->second; |
| } |
| const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::orThrow("Missing dynamic property"); |
| return (string_values_[property_name] = value); |
| } |
| |
| const auto& getSearcher(const std::string& property_name) { |
| auto it = searcher_values_.find(property_name); |
| if (it != searcher_values_.end()) { |
| return it->second.searcher_; |
| } |
| const auto value = process_context_.getDynamicProperty(property_name, flow_file_.get()) | utils::orThrow("Missing dynamic property"); |
| |
| return searcher_values_.emplace( |
| std::piecewise_construct, std::forward_as_tuple(property_name), |
| std::forward_as_tuple(value, case_policy_)).first->second.searcher_; |
| } |
| |
| core::ProcessContext& process_context_; |
| std::shared_ptr<core::FlowFile> flow_file_; |
| route_text::CasePolicy case_policy_; |
| |
| std::map<std::string, std::string> string_values_; |
| std::map<std::string, utils::Regex> regex_values_; |
| |
| struct OwningSearcher { |
| OwningSearcher(std::string str, route_text::CasePolicy case_policy) |
| : str_(std::move(str)), searcher_(str_.cbegin(), str_.cend(), CaseAwareHash{case_policy}, CaseAwareEq{case_policy}) {} |
| OwningSearcher(const OwningSearcher&) = delete; |
| OwningSearcher(OwningSearcher&&) = delete; |
| OwningSearcher& operator=(const OwningSearcher&) = delete; |
| OwningSearcher& operator=(OwningSearcher&&) = delete; |
| ~OwningSearcher() = default; |
| |
| std::string str_; |
| utils::Searcher<std::string::const_iterator, CaseAwareHash, CaseAwareEq> searcher_; |
| }; |
| |
| std::map<std::string, OwningSearcher> searcher_values_; |
| }; |
| |
| namespace { |
| struct Route { |
| core::Relationship relationship_; |
| std::optional<std::string> group_name_; |
| |
| bool operator<(const Route& other) const { |
| return std::tie(relationship_, group_name_) < std::tie(other.relationship_, other.group_name_); |
| } |
| }; |
| } // namespace |
| |
| void RouteText::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { |
| auto flow_file = session.get(); |
| if (!flow_file) { |
| context.yield(); |
| return; |
| } |
| |
| std::map<Route, std::string> flow_file_contents; |
| |
| MatchingContext matching_context(context, flow_file, case_policy_); |
| |
| ReadCallback callback(segmentation_, flow_file->getSize(), [&] (Segment segment) { |
| std::string_view original_value = segment.value_; |
| std::string_view preprocessed_value = preprocess(segment.value_); |
| |
| if (matching_ != route_text::Matching::EXPRESSION) { |
| // an Expression has access to the raw segment like in nifi |
| // all others use the preprocessed_value |
| segment.value_ = preprocessed_value; |
| } |
| |
| // group extraction always uses the preprocessed |
| auto group = getGroup(preprocessed_value); |
| auto dynamic_property_keys = context.getDynamicPropertyKeys(); |
| switch (routing_) { |
| case route_text::Routing::ALL: { |
| if (ranges::all_of(dynamic_property_keys, [&] (const auto& property_name) { |
| return matchSegment(matching_context, segment, property_name); |
| })) { |
| flow_file_contents[{Matched, group}] += original_value; |
| } else { |
| flow_file_contents[{Unmatched, group}] += original_value; |
| } |
| return; |
| } |
| case route_text::Routing::ANY: { |
| if (ranges::any_of(dynamic_property_keys, [&] (const auto& property_name) { |
| return matchSegment(matching_context, segment, property_name); |
| })) { |
| flow_file_contents[{Matched, group}] += original_value; |
| } else { |
| flow_file_contents[{Unmatched, group}] += original_value; |
| } |
| return; |
| } |
| case route_text::Routing::DYNAMIC: { |
| bool routed = false; |
| for (const auto& property_name : dynamic_property_keys) { |
| if (matchSegment(matching_context, segment, property_name)) { |
| flow_file_contents[{dynamic_relationships_[property_name], group}] += original_value; |
| routed = true; |
| } |
| } |
| if (!routed) { |
| flow_file_contents[{Unmatched, group}] += original_value; |
| } |
| return; |
| } |
| } |
| throw Exception(PROCESSOR_EXCEPTION, "Unknown routing strategy"); |
| }); |
| session.read(flow_file, std::move(callback)); |
| |
| for (const auto& [route, content] : flow_file_contents) { |
| auto new_flow_file = session.create(flow_file.get()); |
| if (route.group_name_) { |
| new_flow_file->setAttribute(GROUP_ATTRIBUTE_NAME, route.group_name_.value()); |
| } |
| session.writeBuffer(new_flow_file, content); |
| session.transfer(new_flow_file, route.relationship_); |
| } |
| |
| session.transfer(flow_file, Original); |
| } |
| |
| std::string_view RouteText::preprocess(std::string_view str) const { |
| if (segmentation_ == route_text::Segmentation::PER_LINE) { |
| // do not consider the trailing \r\n characters in order to conform to nifi |
| auto len = str.find_last_not_of("\r\n"); |
| if (len != std::string_view::npos) { |
| str = str.substr(0, len + 1); |
| } else { |
| str = ""; |
| } |
| } |
| if (trim_) { |
| str = utils::string::trim(str); |
| } |
| return str; |
| } |
| |
| namespace { |
| std::optional<std::string> getDynamicPropertyWithOverrides(core::ProcessContext& context, |
| const std::string& property_name, |
| core::FlowFile& flow_file, |
| const std::map<std::string, std::string>& overrides) { |
| std::map<std::string, std::optional<std::string>> original_attributes; |
| for (const auto& [override_key, override_value] : overrides) { |
| original_attributes[override_key] = flow_file.getAttribute(override_key); |
| flow_file.setAttribute(override_key, override_value); |
| } |
| auto onExit = gsl::finally([&]{ |
| for (const auto& attr : original_attributes) { |
| if (attr.second) { |
| flow_file.setAttribute(attr.first, attr.second.value()); |
| } else { |
| flow_file.removeAttribute(attr.first); |
| } |
| } |
| }); |
| return context.getDynamicProperty(property_name, &flow_file) | utils::toOptional(); |
| } |
| } // namespace |
| |
| bool RouteText::matchSegment(MatchingContext& context, const Segment& segment, const std::string& property_name) const { |
| switch (matching_) { |
| case route_text::Matching::EXPRESSION: { |
| std::map<std::string, std::string> variables; |
| variables["segment"] = segment.value_; |
| variables["segmentNo"] = std::to_string(segment.idx_); |
| if (segmentation_ == route_text::Segmentation::PER_LINE) { |
| // for nifi compatibility |
| variables["line"] = segment.value_; |
| variables["lineNo"] = std::to_string(segment.idx_); |
| } |
| if (const auto result = getDynamicPropertyWithOverrides(context.process_context_, property_name, *context.flow_file_, variables)) { |
| return utils::string::toBool(*result).value_or(false); |
| } else { |
| throw Exception(PROCESSOR_EXCEPTION, "Missing dynamic property: '" + property_name + "'"); |
| } |
| } |
| case route_text::Matching::STARTS_WITH: { |
| return utils::string::startsWith(segment.value_, context.getStringProperty(property_name), case_policy_ == route_text::CasePolicy::CASE_SENSITIVE); |
| } |
| case route_text::Matching::ENDS_WITH: { |
| return utils::string::endsWith(segment.value_, context.getStringProperty(property_name), case_policy_ == route_text::CasePolicy::CASE_SENSITIVE); |
| } |
| case route_text::Matching::CONTAINS: { |
| return std::search(segment.value_.begin(), segment.value_.end(), context.getSearcher(property_name)) != segment.value_.end(); |
| } |
| case route_text::Matching::EQUALS: { |
| return utils::string::equals(segment.value_, context.getStringProperty(property_name), case_policy_ == route_text::CasePolicy::CASE_SENSITIVE); |
| } |
| case route_text::Matching::CONTAINS_REGEX: { |
| std::string segment_str = std::string(segment.value_); |
| return utils::regexSearch(segment_str, context.getRegexProperty(property_name)); |
| } |
| case route_text::Matching::MATCHES_REGEX: { |
| std::string segment_str = std::string(segment.value_); |
| return utils::regexMatch(segment_str, context.getRegexProperty(property_name)); |
| } |
| } |
| throw Exception(PROCESSOR_EXCEPTION, "Unknown matching strategy"); |
| } |
| |
| std::optional<std::string> RouteText::getGroup(const std::string_view& segment) const { |
| if (!group_regex_) { |
| return std::nullopt; |
| } |
| utils::SMatch match_result; |
| std::string segment_str = std::string(segment); |
| if (!utils::regexMatch(segment_str, match_result, group_regex_.value())) { |
| return group_fallback_; |
| } |
| // unused capturing groups default to empty string |
| auto to_string = [] (const auto& submatch) -> std::string {return submatch;}; |
| return ranges::views::tail(match_result) // only join the capture groups |
| | ranges::views::transform(to_string) |
| | ranges::views::join(std::string_view(", ")) |
| | ranges::to<std::string>(); |
| } |
| |
| |
| REGISTER_RESOURCE(RouteText, Processor); |
| |
| } // namespace org::apache::nifi::minifi::processors |