blob: 4fbbce6a6a3fd3e9205d4c66f7e93e9a94e2f73c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.util.LineDemarcator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"attributes", "routing", "text", "regexp", "regex", "Regular Expression", "Expression Language", "csv", "filter", "logs", "delimited", "find", "string", "search", "filter", "detect"})
@CapabilityDescription("Routes textual data based on a set of user-defined rules. Each line in an incoming FlowFile is compared against the values specified by user-defined Properties. "
+ "The mechanism by which the text is compared to these user-defined properties is defined by the 'Matching Strategy'. The data is then routed according to these rules, routing "
+ "each line of the text individually.")
@DynamicProperty(name = "Relationship Name", value = "value to match against", description = "Routes data that matches the value specified in the Dynamic Property Value to the "
+ "Relationship specified in the Dynamic Property Key.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@DynamicRelationship(name = "Name from Dynamic Property", description = "FlowFiles that match the Dynamic Property's value")
@WritesAttributes({
@WritesAttribute(attribute = "RouteText.Route", description = "The name of the relationship to which the FlowFile was routed."),
@WritesAttribute(attribute = "RouteText.Group", description = "The value captured by all capturing groups in the 'Grouping Regular Expression' property. "
+ "If this property is not set or contains no capturing groups, this attribute will not be added.")
})
public class RouteText extends AbstractProcessor {
// Names of the attributes written onto every FlowFile produced by this processor.
public static final String ROUTE_ATTRIBUTE_KEY = "RouteText.Route";
public static final String GROUP_ATTRIBUTE_KEY = "RouteText.Group";
// Values (and display names) of the 'Routing Strategy' choices. These strings are the configured
// property values, so they must not be reworded.
private static final String routeAllMatchValue = "Route to 'matched' if line matches all conditions";
private static final String routeAnyMatchValue = "Route to 'matched' if lines matches any condition";
private static final String routePropertyNameValue = "Route to each matching Property Name";
// Values (and display names) of the 'Matching Strategy' choices. They are compile-time constants
// because lineMatches() switches on them.
private static final String startsWithValue = "Starts With";
private static final String endsWithValue = "Ends With";
private static final String containsValue = "Contains";
private static final String equalsValue = "Equals";
private static final String matchesRegularExpressionValue = "Matches Regular Expression";
private static final String containsRegularExpressionValue = "Contains Regular Expression";
private static final String satisfiesExpression = "Satisfies Expression";
// ---- Allowable values for the 'Routing Strategy' property ----
// Each AllowableValue uses the same string for its value and its display name; the third
// argument is the description.
public static final AllowableValue ROUTE_TO_MATCHING_PROPERTY_NAME = new AllowableValue(routePropertyNameValue, routePropertyNameValue,
"Lines will be routed to each relationship whose corresponding expression evaluates to 'true'");
public static final AllowableValue ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH = new AllowableValue(routeAllMatchValue, routeAllMatchValue,
"Requires that all user-defined expressions evaluate to 'true' for the line to be considered a match");
public static final AllowableValue ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES = new AllowableValue(routeAnyMatchValue, routeAnyMatchValue,
"Requires that at least one user-defined expression evaluate to 'true' for the line to be considered a match");
// ---- Allowable values for the 'Matching Strategy' property ----
public static final AllowableValue STARTS_WITH = new AllowableValue(startsWithValue, startsWithValue,
"Match lines based on whether the line starts with the property value");
public static final AllowableValue ENDS_WITH = new AllowableValue(endsWithValue, endsWithValue,
"Match lines based on whether the line ends with the property value");
public static final AllowableValue CONTAINS = new AllowableValue(containsValue, containsValue,
"Match lines based on whether the line contains the property value");
public static final AllowableValue EQUALS = new AllowableValue(equalsValue, equalsValue,
"Match lines based on whether the line equals the property value");
public static final AllowableValue MATCHES_REGULAR_EXPRESSION = new AllowableValue(matchesRegularExpressionValue, matchesRegularExpressionValue,
"Match lines based on whether the line exactly matches the Regular Expression that is provided as the Property value");
public static final AllowableValue CONTAINS_REGULAR_EXPRESSION = new AllowableValue(containsRegularExpressionValue, containsRegularExpressionValue,
"Match lines based on whether the line contains some text that matches the Regular Expression that is provided as the Property value");
// With this strategy the dynamic property value is evaluated as an expression per line, with the
// extra variables 'line' and 'lineNo' supplied by onTrigger().
public static final AllowableValue SATISFIES_EXPRESSION = new AllowableValue(satisfiesExpression, satisfiesExpression,
"Match lines based on whether or not the the text satisfies the given Expression Language expression. I.e., the line will match if the property value, evaluated as "
+ "an Expression, returns true. The expression is able to reference FlowFile Attributes, as well as the variables 'line' (which is the text of the line to evaluate) and "
+ "'lineNo' (which is the line number being evaluated. This will be 1 for the first line, 2 for the second and so on).");
/** Selects which relationship(s) a line is sent to; defaults to routing to each matching property name. */
public static final PropertyDescriptor ROUTE_STRATEGY = new PropertyDescriptor.Builder()
.name("Routing Strategy")
.description("Specifies how to determine which Relationship(s) to use when evaluating the lines of incoming text against the 'Matching Strategy' and user-defined properties.")
.required(true)
.allowableValues(ROUTE_TO_MATCHING_PROPERTY_NAME, ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH, ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES)
.defaultValue(ROUTE_TO_MATCHING_PROPERTY_NAME.getValue())
.dynamic(false)
.build();
/** Selects how a line is compared with each dynamic property value. Required, and declared without a default value. */
public static final PropertyDescriptor MATCH_STRATEGY = new PropertyDescriptor.Builder()
.name("Matching Strategy")
.description("Specifies how to evaluate each line of incoming text against the user-defined properties.")
.required(true)
.allowableValues(SATISFIES_EXPRESSION, STARTS_WITH, ENDS_WITH, CONTAINS, EQUALS, MATCHES_REGULAR_EXPRESSION, CONTAINS_REGULAR_EXPRESSION)
.dynamic(false)
.build();
/** Whether leading/trailing whitespace is trimmed from a line before it is matched; defaults to "true". */
public static final PropertyDescriptor TRIM_WHITESPACE = new PropertyDescriptor.Builder()
.name("Ignore Leading/Trailing Whitespace")
.description("Indicates whether or not the whitespace at the beginning and end of the lines should be ignored when evaluating the line.")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("true")
.dynamic(false)
.build();
/** Whether comparisons ignore letter case; defaults to "false". Changing it clears the compiled-pattern cache (see onPropertyModified). */
static final PropertyDescriptor IGNORE_CASE = new PropertyDescriptor.Builder()
.name("Ignore Case")
.description("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result. "
+ "This property is ignored if the 'Matching Strategy' is set to 'Satisfies Expression'.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
/** Optional regular expression whose capturing groups define the Group a line belongs to; compiled in onScheduled(). */
static final PropertyDescriptor GROUPING_REGEX = new PropertyDescriptor.Builder()
.name("Grouping Regular Expression")
.description("Specifies a Regular Expression to evaluate against each line to determine which Group the line should be placed in. "
+ "The Regular Expression must have at least one Capturing Group that defines the line's Group. If multiple Capturing Groups exist in the Regular Expression, the values from all "
+ "Capturing Groups will be concatenated together. Two lines will not be placed into the same FlowFile unless they both have the same value for the Group "
+ "(or neither line matches the Regular Expression). For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\". "
+ "Two lines that have the same Group but different Relationships will never be placed into the same FlowFile.")
.addValidator(StandardValidators.createRegexValidator(1, Integer.MAX_VALUE, false))
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
/** Character set used to decode incoming lines and to encode the lines written to output FlowFiles; defaults to UTF-8. */
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the incoming text is encoded")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
/** Receives the unmodified incoming FlowFile once onTrigger() has finished routing its lines. */
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original input file will be routed to this destination when the lines have been successfully routed to 1 or more relationships")
.build();
/** Receives lines that did not satisfy the configured rules. */
public static final Relationship REL_NO_MATCH = new Relationship.Builder()
.name("unmatched")
.description("Data that does not satisfy the required user-defined rules will be routed to this Relationship")
.build();
/** Receives matching lines when the routing strategy is one of the two "Route to 'matched'" choices. */
public static final Relationship REL_MATCH = new Relationship.Builder()
.name("matched")
.description("Data that satisfies the required user-defined rules will be routed to this Relationship")
.build();
/**
 * Group assigned to a line when no grouping pattern is configured or the line does not match it.
 * Declared {@code final} because it is a shared constant that must never be reassigned.
 */
private static final Group EMPTY_GROUP = new Group(Collections.emptyList());
/** Relationships currently exposed by this processor; replaced in {@code init} and {@code onPropertyModified}. */
private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
/** Static (non-dynamic) property descriptors; populated once in {@code init}. */
private List<PropertyDescriptor> properties;
/** Most recently configured 'Routing Strategy' value, tracked so the relationship set can be recomputed. */
private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
/** Names of the dynamic properties currently set; replaced with a fresh unmodifiable set on every change. */
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
/**
 * Cache of dynamic properties set during {@link #onScheduled(ProcessContext)} for quick access in
 * {@link #onTrigger(ProcessContext, ProcessSession)}
 */
private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
/** Compiled 'Grouping Regular Expression', assigned in {@link #onScheduled(ProcessContext)}; {@code null} means no grouping. */
private volatile Pattern groupingRegex = null;
/** Maximum number of compiled patterns kept in {@link #patternsCache}. */
@VisibleForTesting
final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 1024;
/**
 * Size-bounded cache for the compiled patterns, keyed by the regular expression text. The size of the
 * cache is limited to {@link #PATTERNS_CACHE_MAXIMUM_ENTRIES} entries.
 */
@VisibleForTesting
final ConcurrentMap<String, Pattern> patternsCache = CacheBuilder.newBuilder()
        .maximumSize(PATTERNS_CACHE_MAXIMUM_ENTRIES)
        .<String, Pattern>build()
        .asMap();
/**
 * Looks up the compiled form of a regular expression, compiling and caching it on first use.
 * The cache is keyed by the expression text only; the case-sensitivity flag is applied when the
 * entry is first created.
 *
 * @param regex the regular expression text
 * @param ignoreCase whether a newly compiled pattern should be case-insensitive
 * @return the cached or freshly compiled pattern
 */
private Pattern cachedCompiledPattern(final String regex, final boolean ignoreCase) {
    // A flags value of 0 is equivalent to the single-argument Pattern.compile(String).
    final int flags = ignoreCase ? Pattern.CASE_INSENSITIVE : 0;
    return patternsCache.computeIfAbsent(regex, expression -> Pattern.compile(expression, flags));
}
/**
 * Sets up the initial relationships ('original' and 'unmatched') and the fixed list of
 * supported property descriptors.
 *
 * @param context initialization context (not used here)
 */
@Override
protected void init(final ProcessorInitializationContext context) {
    final Set<Relationship> initialRelationships = new HashSet<>();
    Collections.addAll(initialRelationships, REL_ORIGINAL, REL_NO_MATCH);
    relationships = new AtomicReference<>(initialRelationships);

    // Insertion order is kept by the list, so it is the order in which descriptors are reported.
    final List<PropertyDescriptor> descriptors = new ArrayList<>();
    Collections.addAll(descriptors,
            ROUTE_STRATEGY,
            MATCH_STRATEGY,
            CHARACTER_SET,
            TRIM_WHITESPACE,
            IGNORE_CASE,
            GROUPING_REGEX);
    this.properties = Collections.unmodifiableList(descriptors);
}
/**
 * @return the set of relationships currently held by this processor
 */
@Override
public Set<Relationship> getRelationships() {
    final Set<Relationship> currentRelationships = relationships.get();
    return currentRelationships;
}
/**
 * @return the property descriptor list assigned in {@code init}
 */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return this.properties;
}
/**
 * Builds the descriptor for a user-defined (dynamic) property: optional, non-empty, and
 * supporting Expression Language evaluated against FlowFile attributes.
 *
 * @param propertyDescriptorName the name the user gave the dynamic property
 * @return a dynamic property descriptor with that name
 */
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    final PropertyDescriptor.Builder dynamicDescriptor = new PropertyDescriptor.Builder();
    dynamicDescriptor.required(false);
    dynamicDescriptor.name(propertyDescriptorName);
    dynamicDescriptor.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES);
    dynamicDescriptor.addValidator(StandardValidators.NON_EMPTY_VALIDATOR);
    dynamicDescriptor.dynamic(true);
    return dynamicDescriptor.build();
}
/**
 * Keeps the processor's derived state in sync with its configuration: clears the compiled-pattern
 * cache when 'Ignore Case' changes, tracks the routing strategy and the set of dynamic property
 * names, and recomputes the relationships exposed by the processor.
 *
 * @param descriptor the property that changed
 * @param oldValue the previous value, or {@code null} if there was none
 * @param newValue the new value, or {@code null} if the property was removed
 */
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
    // newValue is null when a property is removed, so the comparison must be null-safe.
    final boolean valueChanged = newValue == null ? oldValue != null : !newValue.equals(oldValue);
    if (descriptor.equals(IGNORE_CASE) && valueChanged) {
        // Cached patterns were compiled with the previous case-sensitivity flag.
        patternsCache.clear();
    }
    if (descriptor.equals(ROUTE_STRATEGY)) {
        // A removed value means the property falls back to its default, not to "no strategy".
        configuredRouteStrategy = newValue == null ? ROUTE_STRATEGY.getDefaultValue() : newValue;
    } else {
        final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
        if (newValue == null) {
            newDynamicPropertyNames.remove(descriptor.getName());
        } else if (oldValue == null && descriptor.isDynamic()) { // new property
            newDynamicPropertyNames.add(descriptor.getName());
        }
        this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
    }
    // formulate the new set of Relationships
    final Set<String> allDynamicProps = this.dynamicPropertyNames;
    final Set<Relationship> newRelationships = new HashSet<>();
    final String routeStrategy = configuredRouteStrategy;
    if (ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) {
        // One relationship per dynamic property name.
        for (final String propName : allDynamicProps) {
            newRelationships.add(new Relationship.Builder().name(propName).build());
        }
    } else {
        newRelationships.add(REL_MATCH);
    }
    newRelationships.add(REL_ORIGINAL);
    newRelationships.add(REL_NO_MATCH);
    this.relationships.set(newRelationships);
}
/**
 * When this processor is scheduled, compile the grouping pattern and copy the dynamic properties
 * into a map for quick access during each onTrigger call.
 *
 * @param context ProcessContext used to retrieve the grouping regex and the dynamic properties
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
    final String regex = context.getProperty(GROUPING_REGEX).getValue();
    // Always reassign: if the grouping regex was removed since the last run, the previously
    // compiled pattern must not stay in effect.
    groupingRegex = regex == null ? null : Pattern.compile(regex);

    final Map<Relationship, PropertyValue> newPropertyMap = new HashMap<>();
    for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
        if (!descriptor.isDynamic()) {
            continue;
        }
        getLogger().debug("Adding new dynamic property: {}", new Object[] {descriptor});
        // The relationship is named after the dynamic property.
        newPropertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
    }
    this.propertyMap = newPropertyMap;
}
/**
 * Validates the dynamic properties against the selected 'Matching Strategy': regular-expression
 * strategies require each value to pass the regex validator, the expression strategy requires each
 * value to compile to an expression with a Boolean result type, and at least one dynamic property
 * must be present.
 *
 * @param validationContext context giving access to the configured properties
 * @return the validation results from the superclass plus any produced here
 */
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    final Collection<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));

    final String strategy = validationContext.getProperty(MATCH_STRATEGY).getValue();
    final boolean regexStrategy = strategy.equals(matchesRegularExpressionValue) || strategy.equals(containsRegularExpressionValue);
    final boolean expressionStrategy = strategy.equalsIgnoreCase(satisfiesExpression);
    final Validator regexValidator = regexStrategy ? StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true) : null;

    boolean sawDynamicProperty = false;
    final Map<PropertyDescriptor, String> configured = validationContext.getProperties();
    for (final PropertyDescriptor descriptor : configured.keySet()) {
        if (!descriptor.isDynamic()) {
            continue;
        }
        sawDynamicProperty = true;
        final String propValue = validationContext.getProperty(descriptor).getValue();

        if (regexStrategy) {
            final ValidationResult regexResult = regexValidator.validate(descriptor.getName(), propValue, validationContext);
            if (regexResult != null) {
                problems.add(regexResult);
            }
            continue;
        }
        if (!expressionStrategy) {
            // Plain string comparisons need no further validation.
            continue;
        }

        try {
            final ResultType resultType = validationContext.newExpressionLanguageCompiler().compile(propValue).getResultType();
            if (resultType != ResultType.BOOLEAN) {
                problems.add(new ValidationResult.Builder()
                        .valid(false)
                        .input(propValue)
                        .subject(descriptor.getName())
                        .explanation("expression returns type of " + resultType.name() + " but is required to return a Boolean value")
                        .build());
            }
        } catch (final IllegalArgumentException iae) {
            problems.add(new ValidationResult.Builder()
                    .valid(false)
                    .input(propValue)
                    .subject(descriptor.getName())
                    .explanation("input is not a valid Expression Language expression")
                    .build());
        }
    }

    if (!sawDynamicProperty) {
        problems.add(new ValidationResult.Builder()
                .subject("Dynamic Properties")
                .explanation("In order to route text there must be dynamic properties to match against")
                .valid(false)
                .build());
    }
    return problems;
}
/**
 * Reads the incoming FlowFile line by line, evaluates every line against the dynamic properties
 * using the configured 'Matching Strategy', and appends the line to an output FlowFile chosen by
 * the 'Routing Strategy' and the line's Group. Every output FlowFile receives the
 * {@code RouteText.Route} and {@code RouteText.Group} attributes; the incoming FlowFile is
 * transferred to 'original'.
 *
 * @param context provides the configured property values
 * @param session session used to read, create, append to and transfer FlowFiles
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void onTrigger(final ProcessContext context, final ProcessSession session) {
    final FlowFile originalFlowFile = session.get();
    if (originalFlowFile == null) {
        return;
    }
    final ComponentLog logger = getLogger();
    final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
    final boolean trim = context.getProperty(TRIM_WHITESPACE).asBoolean();
    final String routeStrategy = context.getProperty(ROUTE_STRATEGY).getValue();
    final String matchStrategy = context.getProperty(MATCH_STRATEGY).getValue();
    final boolean ignoreCase = context.getProperty(IGNORE_CASE).asBoolean();
    final boolean compileRegex = matchStrategy.equals(matchesRegularExpressionValue) || matchStrategy.equals(containsRegularExpressionValue);
    final boolean usePropValue = matchStrategy.equals(satisfiesExpression);

    // The routing strategy is fixed for the whole FlowFile, so resolve it once rather than per line.
    final boolean routeToMatchingProperty = ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy);
    final boolean routeWhenAnyMatches = ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES.getValue().equals(routeStrategy);
    final boolean routeWhenAllMatch = ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH.getValue().equals(routeStrategy);

    // Build up a Map of Relationship to object, where the object is the
    // thing that each line is compared against
    final Map<Relationship, Object> propValueMap;
    final Map<Relationship, PropertyValue> propMap = this.propertyMap;
    if (usePropValue) {
        // If we are using an Expression Language we want a Map where the value is the
        // PropertyValue, so we can just use the 'propMap' - no need to copy it.
        propValueMap = (Map) propMap;
    } else {
        propValueMap = new HashMap<>(propMap.size());
        for (final Map.Entry<Relationship, PropertyValue> entry : propMap.entrySet()) {
            final String value = entry.getValue().evaluateAttributeExpressions(originalFlowFile).getValue();
            propValueMap.put(entry.getKey(), compileRegex ? cachedCompiledPattern(value, ignoreCase) : value);
        }
    }

    // Relationship -> (Group -> FlowFile accumulating the lines for that relationship and group)
    final Map<Relationship, Map<Group, FlowFile>> flowFileMap = new HashMap<>();
    final Pattern groupPattern = groupingRegex;
    session.read(originalFlowFile, new InputStreamCallback() {
        @Override
        public void process(final InputStream in) throws IOException {
            try (final LineDemarcator demarcator = new LineDemarcator(in, charset, Integer.MAX_VALUE, 8192)) {
                final Map<String, String> variables = new HashMap<>(2);
                int lineCount = 0;
                String line;
                while ((line = demarcator.nextLine()) != null) {
                    // 'line' keeps its line-ending characters so that they are preserved when the line is
                    // written out. We never want to match against those characters, though.
                    final String matchLine = trim ? line.trim() : stripLineEndings(line);

                    variables.put("line", line);
                    variables.put("lineNo", String.valueOf(++lineCount));

                    int propertiesThatMatchedLine = 0;
                    for (final Map.Entry<Relationship, Object> entry : propValueMap.entrySet()) {
                        final boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), matchStrategy, ignoreCase, originalFlowFile, variables);
                        if (lineMatchesProperty) {
                            propertiesThatMatchedLine++;
                        }
                        if (lineMatchesProperty && routeToMatchingProperty) {
                            // route each individual line to each Relationship that matches. This one matches.
                            final Relationship relationship = entry.getKey();
                            final Group group = getGroup(matchLine, groupPattern);
                            appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset, group);
                            continue;
                        }
                        // break as soon as possible to avoid calculating things we don't need to calculate.
                        if (lineMatchesProperty && routeWhenAnyMatches) {
                            break;
                        }
                        if (!lineMatchesProperty && routeWhenAllMatch) {
                            break;
                        }
                    }

                    final Relationship relationship;
                    if (routeToMatchingProperty && propertiesThatMatchedLine > 0) {
                        // Set relationship to null so that we do not append the line to each FlowFile again. #appendLine is called
                        // above within the loop, as the line may need to go to multiple different FlowFiles.
                        relationship = null;
                    } else if (routeWhenAnyMatches && propertiesThatMatchedLine > 0) {
                        relationship = REL_MATCH;
                    } else if (routeWhenAllMatch && propertiesThatMatchedLine == propValueMap.size()) {
                        relationship = REL_MATCH;
                    } else {
                        relationship = REL_NO_MATCH;
                    }
                    if (relationship != null) {
                        final Group group = getGroup(matchLine, groupPattern);
                        appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset, group);
                    }
                }
            }
        }
    });

    // Tag and transfer every FlowFile that was built up while reading the lines.
    for (final Map.Entry<Relationship, Map<Group, FlowFile>> entry : flowFileMap.entrySet()) {
        final Relationship relationship = entry.getKey();
        final Map<Group, FlowFile> groupToFlowFileMap = entry.getValue();
        for (final Map.Entry<Group, FlowFile> flowFileEntry : groupToFlowFileMap.entrySet()) {
            final Group group = flowFileEntry.getKey();
            final FlowFile flowFile = flowFileEntry.getValue();
            final Map<String, String> attributes = new HashMap<>(2);
            attributes.put(ROUTE_ATTRIBUTE_KEY, relationship.getName());
            attributes.put(GROUP_ATTRIBUTE_KEY, StringUtils.join(group.getCapturedValues(), ", "));
            logger.info("Created {} from {}; routing to relationship {}", new Object[] {flowFile, originalFlowFile, relationship.getName()});
            FlowFile updatedFlowFile = session.putAllAttributes(flowFile, attributes);
            session.getProvenanceReporter().route(updatedFlowFile, relationship);
            session.transfer(updatedFlowFile, relationship);
        }
    }

    // now transfer the original flow file
    FlowFile flowFile = originalFlowFile;
    logger.info("Routing {} to {}", new Object[] {flowFile, REL_ORIGINAL});
    session.getProvenanceReporter().route(originalFlowFile, REL_ORIGINAL);
    flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_ORIGINAL.getName());
    session.transfer(flowFile, REL_ORIGINAL);
}

/**
 * Returns the given line cut off at its first carriage return or new-line character.
 * A terminator at index 0 (a blank line) yields the empty string.
 *
 * @param line a line that may still carry its line-ending characters
 * @return the text preceding the first CR or NL, or {@code line} itself if it contains neither
 */
private static String stripLineEndings(final String line) {
    final int indexOfCR = line.indexOf('\r');
    final int indexOfNL = line.indexOf('\n');
    if (indexOfCR >= 0 && indexOfNL >= 0) {
        return line.substring(0, Math.min(indexOfCR, indexOfNL));
    }
    if (indexOfCR >= 0) {
        return line.substring(0, indexOfCR);
    }
    if (indexOfNL >= 0) {
        return line.substring(0, indexOfNL);
    }
    return line;
}
/**
 * Determines the Group of a line from the values captured by the grouping pattern.
 *
 * @param line the line to evaluate
 * @param groupPattern the grouping pattern, or {@code null} if grouping is not configured
 * @return a Group holding the value of every capturing group when the whole line matches the
 *         pattern; otherwise the shared empty group
 */
private Group getGroup(final String line, final Pattern groupPattern) {
    if (groupPattern == null) {
        return EMPTY_GROUP;
    }

    final Matcher groupMatcher = groupPattern.matcher(line);
    if (!groupMatcher.matches()) {
        return EMPTY_GROUP;
    }

    final int groupCount = groupMatcher.groupCount();
    final List<String> captured = new ArrayList<>(groupCount);
    // Capturing groups are numbered from 1; group 0 is the entire match.
    for (int groupIndex = 1; groupIndex <= groupCount; groupIndex++) {
        captured.add(groupMatcher.group(groupIndex));
    }
    return new Group(captured);
}
/**
 * Appends a line to the FlowFile that collects lines for the given relationship and group,
 * creating that FlowFile as a child of the original if it does not exist yet.
 *
 * @param session session used to create and append to FlowFiles
 * @param flowFileMap relationship to (group to FlowFile) bookkeeping, updated in place
 * @param relationship relationship the line is routed to
 * @param original the incoming FlowFile, used as the parent of newly created FlowFiles
 * @param line the line to write, exactly as it was read
 * @param charset character set used to encode the line
 * @param group the line's group
 */
private void appendLine(final ProcessSession session, final Map<Relationship, Map<Group, FlowFile>> flowFileMap, final Relationship relationship,
        final FlowFile original, final String line, final Charset charset, final Group group) {
    final Map<Group, FlowFile> flowFilesByGroup = flowFileMap.computeIfAbsent(relationship, rel -> new HashMap<>());

    final FlowFile existing = flowFilesByGroup.get(group);
    final FlowFile target = existing == null ? session.create(original) : existing;

    final OutputStreamCallback writeLine = (final OutputStream out) -> out.write(line.getBytes(charset));
    // append() returns the updated FlowFile reference, which must replace the stored one.
    final FlowFile appended = session.append(target, writeLine);
    flowFilesByGroup.put(group, appended);
}
/**
 * Evaluates a single line against one comparison value using the given matching strategy.
 *
 * @param line the text to evaluate
 * @param comparison a {@code String} for the plain-text strategies, a {@code Pattern} for the
 *        regular-expression strategies, or a {@code PropertyValue} for 'Satisfies Expression'
 * @param matchingStrategy the value of the 'Matching Strategy' property
 * @param ignoreCase whether the plain-text strategies compare case-insensitively; not consulted by
 *        the regular-expression and expression strategies
 * @param flowFile FlowFile whose attributes are available to the expression strategy
 * @param variables additional variables available to the expression strategy
 * @return {@code true} if the line matches; {@code false} if it does not or the strategy is unknown
 */
protected static boolean lineMatches(final String line, final Object comparison, final String matchingStrategy, final boolean ignoreCase,
        final FlowFile flowFile, final Map<String, String> variables) {
    // Case folding uses Locale.ROOT so that results do not depend on the JVM's default locale
    // (e.g. the Turkish dotless i), consistent with the locale-independent equalsIgnoreCase below.
    switch (matchingStrategy) {
        case startsWithValue:
            if (ignoreCase) {
                return line.toLowerCase(Locale.ROOT).startsWith(((String) comparison).toLowerCase(Locale.ROOT));
            } else {
                return line.startsWith((String) comparison);
            }
        case endsWithValue:
            if (ignoreCase) {
                return line.toLowerCase(Locale.ROOT).endsWith(((String) comparison).toLowerCase(Locale.ROOT));
            } else {
                return line.endsWith((String) comparison);
            }
        case containsValue:
            if (ignoreCase) {
                return line.toLowerCase(Locale.ROOT).contains(((String) comparison).toLowerCase(Locale.ROOT));
            } else {
                return line.contains((String) comparison);
            }
        case equalsValue:
            if (ignoreCase) {
                return line.equalsIgnoreCase((String) comparison);
            } else {
                return line.equals(comparison);
            }
        case matchesRegularExpressionValue:
            // The entire line must match the pattern.
            return ((Pattern) comparison).matcher(line).matches();
        case containsRegularExpressionValue:
            // Any subsequence of the line may match the pattern.
            return ((Pattern) comparison).matcher(line).find();
        case satisfiesExpression: {
            final PropertyValue booleanProperty = (PropertyValue) comparison;
            return booleanProperty.evaluateAttributeExpressions(flowFile, variables).asBoolean();
        }
        default:
            return false;
    }
}
/**
 * Value object identifying the Group of a line: the list of values captured by the grouping
 * pattern. Equality and hash code are based solely on that list, so instances can be used as
 * map keys.
 */
private static class Group {
    private final List<String> capturedValues;

    /**
     * @param capturedValues the captured values that define this group; stored as given (not copied)
     */
    public Group(final List<String> capturedValues) {
        this.capturedValues = capturedValues;
    }

    /**
     * @return the list passed to the constructor
     */
    public List<String> getCapturedValues() {
        return capturedValues;
    }

    @Override
    public String toString() {
        return "Group" + capturedValues;
    }

    @Override
    public int hashCode() {
        final int valuesHash = capturedValues == null ? 0 : capturedValues.hashCode();
        return 31 + valuesHash;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        final List<String> otherValues = ((Group) obj).capturedValues;
        return capturedValues == null ? otherValues == null : capturedValues.equals(otherValues);
    }
}
}