blob: 57287e50576bfe77c2671a42aecd1b462ee17241 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <array>
#include <cstddef>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <string_view>

#include "core/OutputAttributeDefinition.h"
#include "core/Processor.h"
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "core/PropertyType.h"
#include "core/RelationshipDefinition.h"
#include "utils/Enum.h"
#include "utils/Export.h"
#include "utils/RegexUtils.h"
namespace org::apache::nifi::minifi::processors::route_text {
/// Allowed values of the RouteText 'Routing Strategy' property.
enum class Routing {
  DYNAMIC,  ///< "Dynamic Routing": route to every matching dynamic relationship, or to 'unmatched' if none matches.
  ALL,      ///< "Route On All": route to 'matched' iff all dynamic relationships match.
  ANY       ///< "Route On Any": route to 'matched' iff any of the dynamic relationships match.
};
/// Allowed values of the RouteText 'Matching Strategy' property.
/// The quoted strings are the user-facing names assigned in the
/// magic_enum::customize::enum_name<Matching> specialization.
enum class Matching {
  STARTS_WITH,     ///< "Starts With"
  ENDS_WITH,       ///< "Ends With"
  CONTAINS,        ///< "Contains"
  EQUALS,          ///< "Equals"
  MATCHES_REGEX,   ///< "Matches Regex"
  CONTAINS_REGEX,  ///< "Contains Regex"
  EXPRESSION       ///< "Satisfies Expression"; the 'Ignore Case' property is documented as ignored for this strategy.
};
/// Allowed values of the RouteText 'Segmentation Strategy' property.
enum class Segmentation {
  FULL_TEXT,  ///< "Full Text": the whole content is considered a single segment.
  PER_LINE    ///< "Per Line": each line of the content is considered a separate segment.
};
/// Case handling used when comparing text. RouteText keeps one of these in its
/// case_policy_ member, which defaults to CASE_SENSITIVE.
enum class CasePolicy {
  CASE_SENSITIVE,  ///< Capitalization is taken into account.
  IGNORE_CASE      ///< Capitalization is not taken into account.
};
} // namespace org::apache::nifi::minifi::processors::route_text
namespace magic_enum::customize {
// Short aliases for the route_text enums so that the enum_name specializations
// in this namespace can refer to them without full qualification.
// Note: these names become visible inside magic_enum::customize.
using Routing = org::apache::nifi::minifi::processors::route_text::Routing;
using Matching = org::apache::nifi::minifi::processors::route_text::Matching;
using Segmentation = org::apache::nifi::minifi::processors::route_text::Segmentation;
/// magic_enum name customization for Routing: maps every enumerator to the
/// human-readable string used as the property value.
template <>
constexpr customize_t enum_name<Routing>(Routing value) noexcept {
  switch (value) {
    case Routing::DYNAMIC: return "Dynamic Routing";
    case Routing::ALL: return "Route On All";
    case Routing::ANY: return "Route On Any";
  }
  // Reached only for a value that is not a declared enumerator.
  return invalid_tag;
}
/// magic_enum name customization for Matching: maps every enumerator to the
/// human-readable string used as the property value.
template <>
constexpr customize_t enum_name<Matching>(Matching value) noexcept {
  switch (value) {
    case Matching::STARTS_WITH: return "Starts With";
    case Matching::ENDS_WITH: return "Ends With";
    case Matching::CONTAINS: return "Contains";
    case Matching::EQUALS: return "Equals";
    case Matching::MATCHES_REGEX: return "Matches Regex";
    case Matching::CONTAINS_REGEX: return "Contains Regex";
    case Matching::EXPRESSION: return "Satisfies Expression";
  }
  // Reached only for a value that is not a declared enumerator.
  return invalid_tag;
}
/// magic_enum name customization for Segmentation: maps every enumerator to the
/// human-readable string used as the property value.
template <>
constexpr customize_t enum_name<Segmentation>(Segmentation value) noexcept {
  switch (value) {
    case Segmentation::FULL_TEXT: return "Full Text";
    case Segmentation::PER_LINE: return "Per Line";
  }
  // Reached only for a value that is not a declared enumerator.
  return invalid_tag;
}
} // namespace magic_enum::customize
namespace org::apache::nifi::minifi::processors {
class RouteText : public core::Processor {
public:
EXTENSIONAPI static constexpr const char* Description = "Routes textual data based on a set of user-defined rules. Each segment in an incoming FlowFile is "
"compared against the values specified by user-defined Properties. The mechanism by which the text is compared "
"to these user-defined properties is defined by the 'Matching Strategy'. The data is then routed according to "
"these rules, routing each segment of the text individually.";
EXTENSIONAPI static constexpr auto RoutingStrategy = core::PropertyDefinitionBuilder<magic_enum::enum_count<route_text::Routing>()>::createProperty("Routing Strategy")
.withDescription("Specifies how to determine which Relationship(s) to use when evaluating the segments "
"of incoming text against the 'Matching Strategy' and user-defined properties. "
"'Dynamic Routing' routes to all the matching dynamic relationships (or 'unmatched' if none matches). "
"'Route On All' routes to 'matched' iff all dynamic relationships match. "
"'Route On Any' routes to 'matched' iff any of the dynamic relationships match. ")
.isRequired(true)
.withDefaultValue(magic_enum::enum_name(route_text::Routing::DYNAMIC))
.withAllowedValues(magic_enum::enum_names<route_text::Routing>())
.build();
EXTENSIONAPI static constexpr auto MatchingStrategy = core::PropertyDefinitionBuilder<magic_enum::enum_count<route_text::Matching>()>::createProperty("Matching Strategy")
.withDescription("Specifies how to evaluate each segment of incoming text against the user-defined properties. "
"Possible values are: 'Starts With', 'Ends With', 'Contains', 'Equals', 'Matches Regex', 'Contains Regex', 'Satisfies Expression'.")
.isRequired(true)
.withAllowedValues(magic_enum::enum_names<route_text::Matching>())
.build();
EXTENSIONAPI static constexpr auto TrimWhitespace = core::PropertyDefinitionBuilder<>::createProperty("Ignore Leading/Trailing Whitespace")
.withDescription("Indicates whether or not the whitespace at the beginning and end should be ignored when evaluating a segment.")
.isRequired(true)
.withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
.withDefaultValue("true")
.build();
EXTENSIONAPI static constexpr auto IgnoreCase = core::PropertyDefinitionBuilder<>::createProperty("Ignore Case")
.withDescription("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. "
"This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.")
.isRequired(true)
.withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
.withDefaultValue("false")
.build();
EXTENSIONAPI static constexpr auto GroupingRegex = core::PropertyDefinitionBuilder<>::createProperty("Grouping Regular Expression")
.withDescription("Specifies a Regular Expression to evaluate against each segment to determine which Group it should be placed in. "
"The Regular Expression must have at least one Capturing Group that defines the segment's Group. If multiple Capturing Groups "
"exist in the Regular Expression, the values from all Capturing Groups will be joined together with \", \". Two segments will not be "
"placed into the same FlowFile unless they both have the same value for the Group (or neither matches the Regular Expression). "
"For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\" (and use \"Per Line\" segmentation). "
"Two segments that have the same Group but different Relationships will never be placed into the same FlowFile.")
.build();
EXTENSIONAPI static constexpr auto GroupingFallbackValue = core::PropertyDefinitionBuilder<>::createProperty("Grouping Fallback Value")
.withDescription("If the 'Grouping Regular Expression' is specified and the matching fails, this value will be considered the group of the segment.")
.build();
EXTENSIONAPI static constexpr auto SegmentationStrategy = core::PropertyDefinitionBuilder<magic_enum::enum_count<route_text::Segmentation>()>::createProperty("Segmentation Strategy")
.withDescription("Specifies what portions of the FlowFile content constitutes a single segment to be processed. "
"'Full Text' considers the whole content as a single segment, 'Per Line' considers each line of the content as a separate segment")
.isRequired(true)
.withDefaultValue(magic_enum::enum_name(route_text::Segmentation::PER_LINE))
.withAllowedValues(magic_enum::enum_names<route_text::Segmentation>())
.build();
EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 7>{
RoutingStrategy,
MatchingStrategy,
TrimWhitespace,
IgnoreCase,
GroupingRegex,
GroupingFallbackValue,
SegmentationStrategy
};
EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "The original input file will be routed to this destination"};
EXTENSIONAPI static constexpr auto Unmatched = core::RelationshipDefinition{"unmatched", "Segments that do not satisfy the required user-defined rules will be routed to this Relationship"};
EXTENSIONAPI static constexpr auto Matched = core::RelationshipDefinition{"matched", "Segments that satisfy the required user-defined rules will be routed to this Relationship"};
EXTENSIONAPI static constexpr auto Relationships = std::array{
Original,
Unmatched,
Matched
};
EXTENSIONAPI static constexpr auto Group = core::OutputAttributeDefinition<0>{"RouteText.Group", {},
"The value captured by all capturing groups in the 'Grouping Regular Expression' property. If this property is not set, this attribute will not be added."};
EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 1>{Group};
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
EXTENSIONAPI static constexpr auto RelationshipToRouteTo = core::DynamicProperty{"Relationship Name",
"value to match against",
"Routes data that matches the value specified in the Dynamic Property Value to the Relationship specified in the Dynamic Property Key.",
true};
EXTENSIONAPI static constexpr auto DynamicProperties = std::array{RelationshipToRouteTo};
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = true;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
explicit RouteText(std::string name, const utils::Identifier& uuid = {});
void initialize() override;
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* sessionFactory) override;
void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
void onDynamicPropertyModified(const core::Property& orig_property, const core::Property& new_property) override;
private:
static constexpr const char* GROUP_ATTRIBUTE_NAME = "RouteText.Group";
class ReadCallback;
class MatchingContext;
struct Segment {
std::string_view value_;
size_t idx_; // 1-based index as in nifi
};
std::string_view preprocess(std::string_view str) const;
bool matchSegment(MatchingContext& context, const Segment& segment, const core::Property& prop) const;
std::optional<std::string> getGroup(const std::string_view& segment) const;
route_text::Routing routing_;
route_text::Matching matching_;
route_text::Segmentation segmentation_;
bool trim_{true};
route_text::CasePolicy case_policy_{route_text::CasePolicy::CASE_SENSITIVE};
std::optional<utils::Regex> group_regex_;
std::string group_fallback_;
std::map<std::string, core::Property> dynamic_properties_;
std::map<std::string, core::Relationship> dynamic_relationships_;
std::shared_ptr<core::logging::Logger> logger_;
};
} // namespace org::apache::nifi::minifi::processors