blob: 7d5eae4d84b26bba13415499224c7f73090d0896 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
import org.apache.nifi.attribute.expression.language.exception.IllegalAttributeException;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.LineDemarcator;
import org.apache.nifi.util.StopWatch;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Rewrites the text content of a FlowFile according to the configured Replacement Strategy (prepend, append,
 * regex replace, literal replace, always replace or variable substitution), applied either to the entire
 * content at once or line by line.
 */
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", "Regex"})
@CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression (regex) against it and replacing the section of "
        + "the content that matches the Regular Expression with some alternate value.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class ReplaceText extends AbstractProcessor {

    /** Matches a '$' followed by a non-digit, i.e. a '$' that cannot start a numeric back-reference. */
    private static final Pattern REPLACEMENT_NORMALIZATION_PATTERN = Pattern.compile("(\\$\\D)");

    // Constants
    public static final String LINE_BY_LINE = "Line-by-Line";
    public static final String ALL = "All";
    public static final String FIRST_LINE = "First-Line";
    public static final String EXCEPT_FIRST_LINE = "Except-First-Line";
    public static final String LAST_LINE = "Last-Line";
    public static final String EXCEPT_LAST_LINE = "Except-Last-Line";
    public static final String ENTIRE_TEXT = "Entire text";
    public static final String prependValue = "Prepend";
    public static final String appendValue = "Append";
    public static final String regexReplaceValue = "Regex Replace";
    public static final String literalReplaceValue = "Literal Replace";
    public static final String alwaysReplace = "Always Replace";
    public static final String SUBSTITUTE_VARIABLES_VALUE = "Substitute Variables";

    /**
     * Matches a '$' followed by one or more digits that is not preceded by a backslash, capturing the digits.
     * A lookbehind (instead of consuming the preceding character) lets this also match a reference at the very
     * start of the string and the second of two adjacent references such as "$8$9".
     */
    private static final Pattern unescapedBackReferencePattern = Pattern.compile("(?<!\\\\)\\$(\\d+)");
    private static final String DEFAULT_REGEX = "(?s)(^.*$)";
    private static final String DEFAULT_REPLACEMENT_VALUE = "$1";

    // Prepend and Append will just insert the replacement value at the beginning or end
    // Properties PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE
    static final AllowableValue PREPEND = new AllowableValue(prependValue, prependValue,
            "Insert the Replacement Value at the beginning of the FlowFile or the beginning of each line (depending on the Evaluation Mode). For \"Line-by-Line\" Evaluation Mode, "
                    + "the value will be prepended to each line. Similarly, for \"First-Line\", \"Last-Line\", \"Except-First-Line\" and \"Except-Last-Line\" Evaluation Modes, "
                    + "the value will be prepended to header alone, footer alone, all lines except header and all lines except footer respectively. For \"Entire Text\" evaluation mode, "
                    + "the value will be prepended to the entire text.");
    static final AllowableValue APPEND = new AllowableValue(appendValue, appendValue,
            "Insert the Replacement Value at the end of the FlowFile or the end of each line (depending on the Evaluation Mode). For \"Line-by-Line\" Evaluation Mode, "
                    + "the value will be appended to each line. Similarly, for \"First-Line\", \"Last-Line\", \"Except-First-Line\" and \"Except-Last-Line\" Evaluation Modes, "
                    + "the value will be appended to header alone, footer alone, all lines except header and all lines except footer respectively. For \"Entire Text\" evaluation mode, "
                    + "the value will be appended to the entire text.");
    static final AllowableValue LITERAL_REPLACE = new AllowableValue(literalReplaceValue, literalReplaceValue,
            "Search for all instances of the Search Value and replace the matches with the Replacement Value.");
    static final AllowableValue REGEX_REPLACE = new AllowableValue(regexReplaceValue, regexReplaceValue,
            "Interpret the Search Value as a Regular Expression and replace all matches with the Replacement Value. The Replacement Value may reference Capturing Groups used "
                    + "in the Search Value by using a dollar-sign followed by the Capturing Group number, such as $1 or $2. If the Search Value is set to .* then everything is replaced without "
                    + "even evaluating the Regular Expression.");
    static final AllowableValue ALWAYS_REPLACE = new AllowableValue(alwaysReplace, alwaysReplace,
            "Always replaces the entire line or the entire contents of the FlowFile (depending on the value of the <Evaluation Mode> property) and does not bother searching "
                    + "for any value. When this strategy is chosen, the <Search Value> property is ignored.");
    static final AllowableValue SUBSTITUTE_VARIABLES = new AllowableValue(SUBSTITUTE_VARIABLES_VALUE, SUBSTITUTE_VARIABLES_VALUE,
            "Substitute variable references (specified in ${var} form) using FlowFile attributes for looking up the replacement value by variable name. "
                    + "When this strategy is chosen, both the <Search Value> and <Replacement Value> properties are ignored.");

    public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder()
            .name("Regular Expression")
            .displayName("Search Value")
            .description("The Search Value to search for in the FlowFile content. Only used for 'Literal Replace' and 'Regex Replace' matching strategies")
            .required(true)
            .addValidator(Validator.VALID)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue(DEFAULT_REGEX)
            .build();
    public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder()
            .name("Replacement Value")
            .description("The value to insert using the 'Replacement Strategy'. Using \"Regex Replace\" back-references to Regular Expression capturing groups "
                    + "are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value. "
                    + "Back References may also be referenced using the Expression Language, as '$1', '$2', etc. The single-tick marks MUST be included, as these variables are "
                    + "not \"Standard\" attribute names (attribute names must be quoted unless they contain only numbers, letters, and _).")
            .required(true)
            .defaultValue(DEFAULT_REPLACEMENT_VALUE)
            .addValidator(Validator.VALID)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();
    public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
            .name("Character Set")
            .description("The Character Set in which the file is encoded")
            .required(true)
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .defaultValue("UTF-8")
            .build();
    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
            .name("Maximum Buffer Size")
            .description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to "
                    + "apply the replacement. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, "
                    + "the FlowFile will be routed to 'failure'. "
                    + "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value "
                    + "of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. "
                    + "This value is ignored if the <Replacement Strategy> property is set to one of: Append, Prepend, Always Replace")
            .required(true)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .defaultValue("1 MB")
            .build();
    public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
            .name("Replacement Strategy")
            .description("The strategy for how and what to replace within the FlowFile's text content.")
            .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
            .defaultValue(REGEX_REPLACE.getValue())
            .required(true)
            .build();
    public static final PropertyDescriptor EVALUATION_MODE = new PropertyDescriptor.Builder()
            .name("Evaluation Mode")
            .description("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or buffer the entire file "
                    + "into memory (Entire Text) and run against that.")
            .allowableValues(LINE_BY_LINE, ENTIRE_TEXT)
            .defaultValue(LINE_BY_LINE)
            .required(true)
            .build();
    public static final PropertyDescriptor LINE_BY_LINE_EVALUATION_MODE = new PropertyDescriptor.Builder()
            .name("Line-by-Line Evaluation Mode")
            .description("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for all lines in the FlowFile, First Line (Header) alone, "
                    + "Last Line (Footer) alone, Except the First Line (Header) or Except the Last Line (Footer).")
            .allowableValues(ALL, FIRST_LINE, LAST_LINE, EXCEPT_FIRST_LINE, EXCEPT_LAST_LINE)
            .defaultValue(ALL)
            .required(false)
            .build();

    // Relationships
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that have been successfully processed are routed to this relationship. This includes both FlowFiles that had text"
                    + " replaced and those that did not.")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that could not be updated are routed to this relationship")
            .build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    /** Strategy chosen in {@link #setup(ProcessContext)} and used by every subsequent {@link #onTrigger} call. */
    private ReplacementStrategyExecutor replacementStrategyExecutor;

    /** Builds the immutable lists of supported properties and relationships. */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(SEARCH_VALUE);
        properties.add(REPLACEMENT_VALUE);
        properties.add(CHARACTER_SET);
        properties.add(MAX_BUFFER_SIZE);
        properties.add(REPLACEMENT_STRATEGY);
        properties.add(EVALUATION_MODE);
        properties.add(LINE_BY_LINE_EVALUATION_MODE);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Validates the Search Value only for the strategies that use it: it must be non-empty for Literal Replace
     * and a valid regular expression (Expression Language allowed) for Regex Replace.
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> errors = new ArrayList<>(super.customValidate(validationContext));

        switch (validationContext.getProperty(REPLACEMENT_STRATEGY).getValue()) {
            case literalReplaceValue:
                errors.add(StandardValidators.NON_EMPTY_VALIDATOR
                        .validate(SEARCH_VALUE.getName(), validationContext.getProperty(SEARCH_VALUE).getValue(), validationContext));
                break;

            case regexReplaceValue:
                errors.add(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)
                        .validate(SEARCH_VALUE.getName(), validationContext.getProperty(SEARCH_VALUE).getValue(), validationContext));
                break;

            case appendValue:
            case prependValue:
            case alwaysReplace:
            case SUBSTITUTE_VARIABLES_VALUE:
            default:
                // nothing to check, search value is not used
                break;
        }

        return errors;
    }

    /**
     * Chooses the {@link ReplacementStrategyExecutor} for the configured Replacement Strategy. For Regex Replace,
     * a search value of ".*" (and the default regex with an empty replacement in Entire text mode) is
     * short-circuited to {@link AlwaysReplace} for backward compatibility.
     */
    @OnScheduled
    public void setup(ProcessContext context) {
        final String replacementStrategy = context.getProperty(REPLACEMENT_STRATEGY).getValue();
        final String evaluateMode = context.getProperty(EVALUATION_MODE).getValue();

        switch (replacementStrategy) {
            case prependValue:
                replacementStrategyExecutor = new PrependReplace();
                break;
            case appendValue:
                replacementStrategyExecutor = new AppendReplace();
                break;
            case regexReplaceValue: {
                final String rawSearchValue = context.getProperty(SEARCH_VALUE).getValue();
                // for backward compatibility - if replacement regex is ".*" then we will simply always replace the content.
                if (rawSearchValue.equals(".*")) {
                    replacementStrategyExecutor = new AlwaysReplace();
                } else if (rawSearchValue.equals(DEFAULT_REGEX)
                        && evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)
                        && context.getProperty(REPLACEMENT_VALUE).getValue().isEmpty()) {
                    replacementStrategyExecutor = new AlwaysReplace();
                } else {
                    final String regex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions().getValue();
                    replacementStrategyExecutor = new RegexReplace(regex);
                }
                break;
            }
            case literalReplaceValue:
                replacementStrategyExecutor = new LiteralReplace();
                break;
            case alwaysReplace:
                replacementStrategyExecutor = new AlwaysReplace();
                break;
            case SUBSTITUTE_VARIABLES_VALUE:
                replacementStrategyExecutor = new SubstituteVariablesReplace();
                break;
            default:
                throw new AssertionError();
        }
    }

    /**
     * Applies the configured replacement to one FlowFile. FlowFiles too large to buffer in Entire text mode,
     * and FlowFiles whose processing overflows the stack or fails Expression Language evaluation, are routed
     * to 'failure'; everything else is routed to 'success' with a CONTENT_MODIFIED provenance event.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();
        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
        final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        final String evaluateMode = context.getProperty(EVALUATION_MODE).getValue();

        // Only strategies that read the whole content into memory are bounded by the buffer size.
        if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)
                && flowFile.getSize() > maxBufferSize
                && replacementStrategyExecutor.isAllDataBufferedForEntireText()) {
            logger.warn("Transferred {} to 'failure' because it was larger than the buffer size", new Object[] {flowFile});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        final StopWatch stopWatch = new StopWatch(true);
        try {
            flowFile = replacementStrategyExecutor.replace(flowFile, session, context, evaluateMode, charset, maxBufferSize);
        } catch (StackOverflowError e) {
            // Some regular expressions can produce many matches on large input data size using recursive code
            // do not log the StackOverflowError stack trace
            logger.info("Transferred {} to 'failure' due to {}", new Object[] { flowFile, e.toString() });
            session.transfer(flowFile, REL_FAILURE);
            return;
        } catch (IllegalAttributeException | AttributeExpressionLanguageException e) {
            logger.warn("Transferred {} to 'failure' due to {}", new Object[] { flowFile, e.toString() }, e);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        logger.info("Transferred {} to 'success'", new Object[] {flowFile});
        session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        session.transfer(flowFile, REL_SUCCESS);
    }

    /**
     * Escapes back-references that name a capturing group the pattern does not have, so that
     * {@link Matcher#appendReplacement} emits them literally instead of throwing. For example, with 3 capturing
     * groups the replacement "I owe $8 to him" becomes "I owe \$8 to him", producing a literal "$8".
     *
     * @param unescaped          replacement string as produced by Expression Language evaluation
     * @param numCapturingGroups number of capturing groups in the pattern the replacement is applied with
     * @return the replacement with invalid back-references escaped (the same instance if nothing changed)
     */
    private static String escapeLiteralBackReferences(final String unescaped, final int numCapturingGroups) {
        final Matcher backRefMatcher = unescapedBackReferencePattern.matcher(unescaped); // consider unescaped back references
        StringBuilder escaped = null; // created lazily: most replacements need no escaping
        int copiedUpTo = 0;
        while (backRefMatcher.find()) {
            // appendReplacement always takes the first digit after '$' as the group number and then keeps adding
            // digits while they still form an existing group (so "$123" falls back to $12, then $1). A reference
            // can therefore only be invalid when its first digit alone exceeds the group count. Looking at that
            // digit directly also avoids Integer.parseInt overflowing on long digit runs. "$0..." is always valid.
            final int firstDigit = backRefMatcher.group(1).charAt(0) - '0';
            if (firstDigit <= numCapturingGroups) {
                continue;
            }
            if (escaped == null) {
                escaped = new StringBuilder(unescaped.length() + 4);
            }
            // Offsets always refer to the original string, so multiple insertions cannot drift.
            final int dollarIndex = backRefMatcher.start();
            escaped.append(unescaped, copiedUpTo, dollarIndex).append('\\');
            copiedUpTo = dollarIndex;
        }
        if (escaped == null) {
            return unescaped;
        }
        escaped.append(unescaped, copiedUpTo, unescaped.length());
        return escaped.toString();
    }

    /**
     * Replaces the whole content (Entire text) or each selected line with the evaluated Replacement Value,
     * without searching. In line mode the original line ending of each line is kept after the replacement.
     */
    private static class AlwaysReplace implements ReplacementStrategyExecutor {
        @Override
        public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
            final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
            final StringBuilder lineEndingBuilder = new StringBuilder(2);

            if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
                flowFile = session.write(flowFile, out -> out.write(replacementValue.getBytes(charset)));
            } else {
                flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
                        ((bw, oneLine) -> {
                            // We need to determine what line ending was used and use that after our replacement value.
                            lineEndingBuilder.setLength(0);
                            for (int i = oneLine.length() - 1; i >= 0; i--) {
                                final char c = oneLine.charAt(i);
                                if (c == '\r' || c == '\n') {
                                    lineEndingBuilder.append(c);
                                } else {
                                    break;
                                }
                            }
                            bw.write(replacementValue);
                            // Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder.
                            // So if builder has multiple characters, they are now reversed from the original string's ordering.
                            bw.write(lineEndingBuilder.reverse().toString());
                        })));
            }
            return flowFile;
        }

        @Override
        public boolean isAllDataBufferedForEntireText() {
            return false;
        }
    }

    /** Inserts the evaluated Replacement Value before the content (Entire text) or before each selected line. */
    private static class PrependReplace implements ReplacementStrategyExecutor {
        @Override
        public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
            final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
            if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
                // Streams the original content after the prefix, so nothing is buffered.
                flowFile = session.write(flowFile, new StreamCallback() {
                    @Override
                    public void process(final InputStream in, final OutputStream out) throws IOException {
                        out.write(replacementValue.getBytes(charset));
                        IOUtils.copy(in, out);
                    }
                });
            } else {
                flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
                        (bw, oneLine) -> bw.write(replacementValue.concat(oneLine))));
            }
            return flowFile;
        }

        @Override
        public boolean isAllDataBufferedForEntireText() {
            return false;
        }
    }

    /**
     * Inserts the evaluated Replacement Value after the content (Entire text) or at the end of each selected
     * line, before that line's first '\r' or '\n'.
     */
    private static class AppendReplace implements ReplacementStrategyExecutor {
        @Override
        public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
            final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
            if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
                // Streams the original content before the suffix, so nothing is buffered.
                flowFile = session.write(flowFile, new StreamCallback() {
                    @Override
                    public void process(final InputStream in, final OutputStream out) throws IOException {
                        IOUtils.copy(in, out);
                        out.write(replacementValue.getBytes(charset));
                    }
                });
            } else {
                flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
                        (bw, oneLine) -> {
                            // we need to find the first carriage return or new-line so that we can append the new value
                            // before the line separator. However, we don't want to do this using a regular expression due
                            // to performance concerns. So we will find the first occurrence of either \r or \n and use
                            // that to insert the replacement value.
                            boolean foundNewLine = false;
                            for (int i = 0; i < oneLine.length(); i++) {
                                final char c = oneLine.charAt(i);
                                if (foundNewLine) {
                                    bw.write(c);
                                    continue;
                                }

                                if (c == '\r' || c == '\n') {
                                    bw.write(replacementValue);
                                    foundNewLine = true;
                                }
                                bw.write(c);
                            }
                            if (!foundNewLine) {
                                bw.write(replacementValue);
                            }
                        }));
            }
            return flowFile;
        }

        @Override
        public boolean isAllDataBufferedForEntireText() {
            return false;
        }
    }

    /**
     * Replaces every match of the Search Value regex with the evaluated Replacement Value. Capturing groups are
     * exposed to Expression Language as '$0', '$1', ...; back-references to groups that do not exist are kept
     * as literal text.
     */
    private static class RegexReplace implements ReplacementStrategyExecutor {
        // '$' followed by a digit inside an evaluated attribute value (reluctant: one digit is enough to escape it).
        private static final Pattern ATTRIBUTE_BACK_REFERENCE_PATTERN = Pattern.compile("(\\$\\d+?)");

        // back references are not supported in the evaluated expression:
        // when we encounter a '$[0-9+]' replace it with '\$[0-9+]'
        private final AttributeValueDecorator escapeBackRefDecorator =
                attributeValue -> ATTRIBUTE_BACK_REFERENCE_PATTERN.matcher(attributeValue).replaceAll("\\\\$1");

        /**
         * @param regex search regex as evaluated at scheduling time; compiled here only so that an invalid
         *              expression fails when the processor is scheduled
         */
        public RegexReplace(final String regex) {
            Pattern.compile(regex);
        }

        @Override
        public FlowFile replace(final FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
            // Attribute values are quoted so that they are matched literally rather than as regex syntax.
            final AttributeValueDecorator quotedAttributeDecorator = Pattern::quote;
            final String searchRegex = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
            final Pattern searchPattern = Pattern.compile(searchRegex);
            final PropertyValue replacementValueProperty = context.getProperty(REPLACEMENT_VALUE);
            final Map<String, String> additionalAttrs = new HashMap<>(searchPattern.matcher("").groupCount() + 1);

            if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
                // onTrigger has already routed FlowFiles larger than maxBufferSize to failure for this strategy.
                final int flowFileSize = (int) flowFile.getSize();
                final int bufferSize = Math.min(maxBufferSize, flowFileSize);
                final byte[] buffer = new byte[bufferSize];

                session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, false));

                final String contentString = new String(buffer, 0, flowFileSize, charset);
                final String updatedValue = replaceMatches(searchPattern.matcher(contentString), flowFile, replacementValueProperty, additionalAttrs);
                if (updatedValue == null) {
                    // No match: leave the content untouched.
                    return flowFile;
                }
                return session.write(flowFile, out -> out.write(updatedValue.getBytes(charset)));
            }

            final Matcher matcher = searchPattern.matcher("");
            return session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
                    (bw, oneLine) -> {
                        matcher.reset(oneLine);
                        final String updatedLine = replaceMatches(matcher, flowFile, replacementValueProperty, additionalAttrs);
                        // No match. Just write out the line as it was.
                        bw.write(updatedLine == null ? oneLine : updatedLine);
                    }));
        }

        /**
         * Replaces every match found by {@code matcher} (already positioned on its input).
         *
         * @return the rewritten input, or {@code null} if there was no match
         */
        private String replaceMatches(final Matcher matcher, final FlowFile flowFile, final PropertyValue replacementValueProperty,
                                      final Map<String, String> additionalAttrs) {
            // The group count of the pattern actually being matched; the scheduling-time regex was evaluated
            // without quoting attribute values, so its count may differ.
            final int groupCount = matcher.groupCount();
            StringBuffer sb = null;
            while (matcher.find()) {
                if (sb == null) {
                    sb = new StringBuffer();
                }
                for (int i = 0; i <= groupCount; i++) {
                    additionalAttrs.put("$" + i, matcher.group(i));
                }
                String replacement = replacementValueProperty.evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
                replacement = escapeLiteralBackReferences(replacement, groupCount);
                matcher.appendReplacement(sb, normalizeReplacementString(replacement));
            }
            if (sb == null) {
                return null;
            }
            matcher.appendTail(sb);
            return sb.toString();
        }

        @Override
        public boolean isAllDataBufferedForEntireText() {
            return true;
        }
    }

    /** Replaces every occurrence of the Search Value, taken literally, with the evaluated Replacement Value. */
    private static class LiteralReplace implements ReplacementStrategyExecutor {
        @Override
        public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
            final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
            final String searchValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile).getValue();

            if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
                // onTrigger has already routed FlowFiles larger than maxBufferSize to failure for this strategy.
                final int flowFileSize = (int) flowFile.getSize();
                final int bufferSize = Math.min(maxBufferSize, flowFileSize);
                final byte[] buffer = new byte[bufferSize];

                flowFile = session.write(flowFile, new StreamCallback() {
                    @Override
                    public void process(final InputStream in, final OutputStream out) throws IOException {
                        StreamUtils.fillBuffer(in, buffer, false);
                        final String contentString = new String(buffer, 0, flowFileSize, charset);
                        // Interpreting the search and replacement values as char sequences
                        final String updatedValue = contentString.replace(searchValue, replacementValue);
                        out.write(updatedValue.getBytes(charset));
                    }
                });
            } else {
                final Pattern searchPattern = Pattern.compile(searchValue, Pattern.LITERAL);

                flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
                        (bw, oneLine) -> {
                            // Copy the text between matches and the replacement value directly to the writer.
                            int matches = 0;
                            int lastEnd = 0;
                            final Matcher matcher = searchPattern.matcher(oneLine);
                            while (matcher.find()) {
                                bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
                                bw.write(replacementValue);
                                matches++;
                                lastEnd = matcher.end();
                            }
                            if (matches > 0) {
                                bw.write(oneLine, lastEnd, oneLine.length() - lastEnd);
                            } else {
                                bw.write(oneLine);
                            }
                        }));
            }
            return flowFile;
        }

        @Override
        public boolean isAllDataBufferedForEntireText() {
            return true;
        }
    }

    /** Substitutes ${name} references in the content with the values of the FlowFile's attributes. */
    private static class SubstituteVariablesReplace implements ReplacementStrategyExecutor {
        @Override
        public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
            final Map<String, String> flowFileAttributes = flowFile.getAttributes();

            if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
                // onTrigger has already routed FlowFiles larger than maxBufferSize to failure for this strategy.
                final int flowFileSize = (int) flowFile.getSize();
                final int bufferSize = Math.min(maxBufferSize, flowFileSize);
                final byte[] buffer = new byte[bufferSize];

                flowFile = session.write(flowFile, new StreamCallback() {
                    @Override
                    public void process(final InputStream in, final OutputStream out) throws IOException {
                        StreamUtils.fillBuffer(in, buffer, false);
                        final String originalContent = new String(buffer, 0, flowFileSize, charset);
                        final String substitutedContent = StringSubstitutor.replace(originalContent, flowFileAttributes);
                        out.write(substitutedContent.getBytes(charset));
                    }
                });
            } else {
                flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
                        (bw, oneLine) -> {
                            final String substitutedLine = StringSubstitutor.replace(oneLine, flowFileAttributes);
                            bw.write(substitutedLine);
                        }));
            }
            return flowFile;
        }

        @Override
        public boolean isAllDataBufferedForEntireText() {
            return true;
        }
    }

    /**
     * If the replacement contains a '$' followed by anything other than a digit, quote the whole replacement
     * with {@link Matcher#quoteReplacement} so that every '$' and '\' in it is emitted literally. Otherwise the
     * replacement is returned unchanged and its numeric back-references remain active.
     */
    private static String normalizeReplacementString(String replacement) {
        String replacementFinal = replacement;
        if (REPLACEMENT_NORMALIZATION_PATTERN.matcher(replacement).find()) {
            replacementFinal = Matcher.quoteReplacement(replacement);
        }
        return replacementFinal;
    }

    /** One Replacement Strategy implementation. */
    private interface ReplacementStrategyExecutor {
        /**
         * Rewrites the FlowFile content and returns the updated FlowFile (or the same one if nothing changed).
         */
        FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String evaluateMode, Charset charset, int maxBufferSize);

        /**
         * @return true if, in Entire text mode, this strategy reads the whole content into memory and is therefore
         *         limited by the Maximum Buffer Size
         */
        boolean isAllDataBufferedForEntireText();
    }

    /** Writes the (possibly replaced) form of a single line to the output writer. */
    @FunctionalInterface
    private interface ReplaceLine {
        void apply(BufferedWriter bw, String oneLine) throws IOException;
    }

    /**
     * Streams the content line by line and applies a {@link ReplaceLine} to the lines selected by the
     * Line-by-Line Evaluation Mode (All, First-Line, Last-Line, Except-First-Line, Except-Last-Line). Lines that
     * are not selected are copied unchanged. Lines come from {@link LineDemarcator}; the line functions in this
     * class expect them to still carry their '\r'/'\n' terminators.
     */
    private static class StreamReplaceCallback implements StreamCallback {
        private final Charset charset;
        private final int maxBufferSize;
        private final String lineByLineEvaluationMode;
        private final ReplaceLine replaceLine;

        private StreamReplaceCallback(Charset charset,
                                      int maxBufferSize,
                                      String lineByLineEvaluationMode,
                                      ReplaceLine replaceLine) {
            this.charset = charset;
            this.maxBufferSize = maxBufferSize;
            this.lineByLineEvaluationMode = lineByLineEvaluationMode;
            this.replaceLine = replaceLine;
        }

        @Override
        public void process(final InputStream in, final OutputStream out) throws IOException {
            try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
                 final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {

                // Reading one line ahead tells us whether precedingLine is the last line.
                String precedingLine = demarcator.nextLine();
                String succeedingLine;

                boolean firstLine = true;

                while (null != (succeedingLine = demarcator.nextLine())) {
                    if (firstLine && lineByLineEvaluationMode.equalsIgnoreCase(FIRST_LINE)) {
                        replaceLine.apply(bw, precedingLine);
                        firstLine = false;
                    } else if (firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE)) {
                        firstLine = false;
                        bw.write(precedingLine);
                    } else if (lineByLineEvaluationMode.equalsIgnoreCase(LINE_BY_LINE)
                            || lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_LAST_LINE)
                            || lineByLineEvaluationMode.equalsIgnoreCase(ALL)
                            || (!firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) {
                        replaceLine.apply(bw, precedingLine);
                    } else {
                        bw.write(precedingLine);
                    }

                    precedingLine = succeedingLine;
                }

                // precedingLine is now the last line. 0 byte empty FlowFiles are left untouched.
                if (null != precedingLine) {
                    if (lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_LAST_LINE)
                            || (!firstLine && lineByLineEvaluationMode.equalsIgnoreCase(FIRST_LINE))
                            || (firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) {
                        bw.write(precedingLine);
                    } else {
                        replaceLine.apply(bw, precedingLine);
                    }
                }
            }
        }
    }
}