blob: 45a425a2f779289c7bdc48d26e325db1e34a63fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.AttributeValueDecorator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", "Regex", "Mapping"})
@CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression against it and replacing the section of the content that "
+ "matches the Regular Expression with some alternate value provided in a mapping file.")
public class ReplaceTextWithMapping extends AbstractProcessor {
/**
 * Regular expression evaluated against the FlowFile content. Supports Expression Language
 * evaluated against FlowFile attributes; defaults to {@code \S+} (each run of non-whitespace).
 */
public static final PropertyDescriptor REGEX = new PropertyDescriptor.Builder()
.name("Regular Expression")
.description("The Regular Expression to search for in the FlowFile content")
.required(true)
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("\\S+")
.build();
/**
 * Index of the capturing group whose matched text is used as the lookup key in the mapping.
 * Defaults to {@code 0}. customValidate additionally compares this value with the number of
 * capturing groups in the configured regular expression.
 */
public static final PropertyDescriptor MATCHING_GROUP_FOR_LOOKUP_KEY = new PropertyDescriptor.Builder()
.name("Matching Group")
.description("The number of the matching group of the provided regex to replace with the corresponding value from the mapping file (if it exists).")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("0")
.build();
/**
 * Path of the mapping file, declared as a single external file resource.
 * The file is read by updateMapping and parsed by loadMappingFile.
 */
public static final PropertyDescriptor MAPPING_FILE = new PropertyDescriptor.Builder()
.name("Mapping File")
.description("The name of the file (including the full path) containing the Mappings.")
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
.required(true)
.build();
/**
 * How often updateMapping checks the mapping file for modification. The value is a time period
 * (default {@code 60s}) that updateMapping converts to whole seconds.
 */
public static final PropertyDescriptor MAPPING_FILE_REFRESH_INTERVAL = new PropertyDescriptor.Builder()
.name("Mapping File Refresh Interval")
.description("The polling interval in seconds to check for updates to the mapping file. The default is 60s.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.required(true)
.defaultValue("60s")
.build();
/**
 * Character set used by ReplaceTextCallback to decode the FlowFile content into a String and to
 * encode the result back into bytes. Defaults to UTF-8.
 * NOTE(review): loadMappingFile does not receive this value, so it does not appear to govern
 * how the mapping file itself is decoded - confirm whether that is intended.
 */
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the file is encoded")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
/**
 * Upper bound on the FlowFile size that will be processed (default 1 MB). onTrigger routes any
 * FlowFile larger than this to REL_FAILURE without modifying it.
 */
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Buffer Size")
.description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. If a FlowFile is larger "
+ "than this value, the FlowFile will be routed to 'failure'")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
/**
 * Destination for FlowFiles whose content was rewritten by ReplaceTextCallback, including those
 * in which the regular expression matched nothing.
 */
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression")
.build();
/**
 * Destination for FlowFiles that onTrigger rejects because they exceed MAX_BUFFER_SIZE.
 */
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that could not be updated are routed to this relationship")
.build();
// Matches a '$' followed by one or more digits when the '$' is preceded by any single character
// other than a backslash; capturing group 1 holds the digits.
private final Pattern backReferencePattern = Pattern.compile("[^\\\\]\\$(\\d+)");
// Unmodifiable list of supported properties; assigned in init and returned by getSupportedPropertyDescriptors.
private List<PropertyDescriptor> properties;
// Unmodifiable set of relationships; assigned in init and returned by getRelationships.
private Set<Relationship> relationships;
// Acquired with tryLock() in updateMapping so that only one thread at a time checks/reloads the mapping file.
private final ReentrantLock processorLock = new ReentrantLock();
// File.lastModified() value recorded by updateMapping for the mapping file; 0 until a file has been seen.
private final AtomicLong lastModified = new AtomicLong(0L);
// Time, in seconds since the epoch, at which updateMapping last checked the mapping file.
// NOTE(review): not private - presumably so tests can reset it; confirm.
final AtomicLong mappingTestTime = new AtomicLong(0);
// Currently active mapping snapshot; starts out empty (ConfigurationState built from null) and is
// replaced by updateMapping, read by ReplaceTextCallback.process.
private final AtomicReference<ConfigurationState> configurationStateRef = new AtomicReference<>(new ConfigurationState(null));
/**
 * Cross-checks the "Matching Group" property against the "Regular Expression" property.
 *
 * <p>The configured group index must lie in the range {@code 0..groupCount} of the regular
 * expression, where 0 denotes the whole match. Anything outside that range (including negative
 * values, which the plain integer validator lets through) would make
 * {@link Matcher#group(int)} throw while a FlowFile is being processed, so it is reported as a
 * validation error instead.</p>
 *
 * @param context the validation context giving access to the configured property values
 * @return the validation results of the superclass plus, if applicable, one invalid result
 *         describing the non-existent matching group
 */
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
    final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));

    final String regexValue = context.getProperty(REGEX).evaluateAttributeExpressions().getValue();
    // Matching against the empty string is only a cheap way to obtain the pattern's group count.
    final int numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
    final int groupToMatch = context.getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY).evaluateAttributeExpressions().asInteger();

    // Valid group indexes are 0 (entire match) through numCapturingGroups inclusive.
    if (groupToMatch < 0 || groupToMatch > numCapturingGroups) {
        errors.add(
                new ValidationResult.Builder()
                        .subject("Insufficient Matching Groups")
                        .valid(false)
                        .explanation("The specified matching group does not exist for the regular expression provided")
                        .build());
    }

    return errors;
}
/**
 * Builds the unmodifiable property list and relationship set exposed by this processor.
 *
 * @param context the initialization context (not used here)
 */
@Override
protected void init(final ProcessorInitializationContext context) {
    // Insertion order of the list is the order in which the properties are declared below.
    final List<PropertyDescriptor> descriptors = new ArrayList<>();
    Collections.addAll(descriptors,
            REGEX,
            MATCHING_GROUP_FOR_LOOKUP_KEY,
            MAPPING_FILE,
            MAPPING_FILE_REFRESH_INTERVAL,
            CHARACTER_SET,
            MAX_BUFFER_SIZE);
    this.properties = Collections.unmodifiableList(descriptors);

    final Set<Relationship> rels = new HashSet<>();
    Collections.addAll(rels, REL_SUCCESS, REL_FAILURE);
    this.relationships = Collections.unmodifiableSet(rels);
}
/**
 * @return the unmodifiable list of property descriptors assembled in init
 */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return this.properties;
}
/**
 * @return the unmodifiable set of relationships assembled in init
 */
@Override
public Set<Relationship> getRelationships() {
    return this.relationships;
}
/**
 * Refreshes the mapping if due, then pulls up to five FlowFiles and rewrites each one.
 *
 * <p>A FlowFile larger than the configured maximum buffer size is routed to REL_FAILURE
 * untouched. Every other FlowFile has its content rewritten through a ReplaceTextCallback, gets a
 * content-modified provenance event carrying the elapsed time, and is routed to REL_SUCCESS.</p>
 *
 * @param context provides the configured property values
 * @param session the session used to obtain, rewrite and transfer FlowFiles
 * @throws ProcessException if processing fails
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    updateMapping(context);

    final List<FlowFile> batch = session.get(5);
    if (batch.isEmpty()) {
        return;
    }

    final ComponentLog log = getLogger();
    final int bufferLimit = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();

    for (final FlowFile incoming : batch) {
        if (incoming.getSize() > bufferLimit) {
            // Too large to buffer in memory: hand it off unchanged.
            session.transfer(incoming, REL_FAILURE);
        } else {
            final StopWatch timer = new StopWatch(true);
            final FlowFile updated = session.write(incoming, new ReplaceTextCallback(context, incoming, bufferLimit));
            log.info("Transferred {} to 'success'", new Object[]{updated});
            session.getProvenanceReporter().modifyContent(updated, timer.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(updated, REL_SUCCESS);
        }
    }
}
/**
 * Turns a raw mapping value into a string that is safe to pass to
 * {@link Matcher#appendReplacement(StringBuffer, String)}.
 *
 * <p>The value is scanned once, left to right:</p>
 * <ul>
 *   <li>A backslash together with the character that follows it is an existing escape and is
 *       copied unchanged. A lone backslash at the very end is doubled so that it is emitted as a
 *       literal backslash instead of making {@code appendReplacement} fail.</li>
 *   <li>{@code $} followed by decimal digits is a back-reference. It is kept if the number is
 *       between 0 and {@code numCapturingGroups}; otherwise the {@code $} is escaped so the text
 *       is emitted literally.</li>
 *   <li>Any other {@code $} (followed by a non-digit, or at the end of the value) is escaped.</li>
 * </ul>
 *
 * <p>Unlike a regex-based search, the scan also handles a back-reference at the very start of
 * the value, back-references that directly follow one another (e.g. {@code $1$2}), consecutive
 * dollar signs, and digit runs too long to fit in an {@code int}.</p>
 *
 * @param rawReplacementValue the replacement text as read from the mapping; must not be null
 * @param numCapturingGroups the number of capturing groups in the regular expression being applied
 * @return the replacement text with every unusable {@code $} escaped
 */
protected String fillReplacementValueBackReferences(String rawReplacementValue, int numCapturingGroups) {
    final int length = rawReplacementValue.length();
    final StringBuilder escaped = new StringBuilder(length + 8);

    int pos = 0;
    while (pos < length) {
        final char current = rawReplacementValue.charAt(pos);

        if (current == '\\') {
            if (pos + 1 < length) {
                // Existing escape pair: keep both characters exactly as written.
                escaped.append(current).append(rawReplacementValue.charAt(pos + 1));
                pos += 2;
            } else {
                // Dangling backslash: emit it as a literal backslash.
                escaped.append("\\\\");
                pos++;
            }
            continue;
        }

        if (current != '$') {
            escaped.append(current);
            pos++;
            continue;
        }

        // Consume the digits that follow the '$'. The index saturates as soon as it exceeds
        // numCapturingGroups, so arbitrarily long digit runs cannot overflow.
        int digitsEnd = pos + 1;
        long groupIndex = 0;
        while (digitsEnd < length) {
            final char digit = rawReplacementValue.charAt(digitsEnd);
            if (digit < '0' || digit > '9') {
                break;
            }
            if (groupIndex <= numCapturingGroups) {
                groupIndex = groupIndex * 10 + (digit - '0');
            }
            digitsEnd++;
        }

        final boolean hasDigits = digitsEnd > pos + 1;
        if (!hasDigits || groupIndex > numCapturingGroups) {
            escaped.append('\\');
        }
        escaped.append(rawReplacementValue, pos, digitsEnd);
        pos = digitsEnd;
    }

    return escaped.toString();
}
/**
 * Reloads the mapping file when it is due for a check and has changed on disk.
 *
 * <p>Only one thread performs the check at a time; a thread that cannot obtain the lock returns
 * immediately and keeps using the current mapping. The file is examined at most once per
 * configured refresh interval. The stored last-modified timestamp is advanced only after the
 * file has been read successfully, so a failed read is retried at the next interval instead of
 * being skipped until the file changes again.</p>
 *
 * <p>Failures are logged and never propagated to the caller.</p>
 *
 * @param context provides the mapping file path and the refresh interval
 */
private void updateMapping(final ProcessContext context) {
    if (!processorLock.tryLock()) {
        return;
    }

    final ComponentLog logger = getLogger();
    try {
        // Skip unless the mapping file has not been checked within mappingRefreshPeriodSecs.
        final long currentTimeSecs = System.currentTimeMillis() / 1000;
        final long mappingRefreshPeriodSecs = context.getProperty(MAPPING_FILE_REFRESH_INTERVAL).asTimePeriod(TimeUnit.SECONDS);
        if (currentTimeSecs <= mappingTestTime.get() + mappingRefreshPeriodSecs) {
            return;
        }
        mappingTestTime.set(currentTimeSecs);

        // See if the mapping file needs to be reloaded.
        final String fileName = context.getProperty(MAPPING_FILE).getValue();
        final File file = new File(fileName);
        if (!(file.exists() && file.isFile() && file.canRead())) {
            logger.error("Mapping file does not exist or is not readable: {}", new Object[]{fileName});
            return;
        }

        // Read the timestamp once so the value compared is the value stored.
        final long fileLastModified = file.lastModified();
        if (fileLastModified <= lastModified.get()) {
            return;
        }

        try (FileInputStream is = new FileInputStream(file)) {
            logger.info("Reloading mapping file: {}", new Object[]{fileName});
            final Map<String, String> mapping = loadMappingFile(is);
            configurationStateRef.set(new ConfigurationState(mapping));
            // Record the timestamp only now that the new mapping is in place.
            lastModified.set(fileLastModified);
        } catch (IOException e) {
            logger.error("Error reading mapping file: {}", new Object[]{e.getMessage()}, e);
        }
    } catch (Exception e) {
        // Best effort: a broken mapping file must not stop FlowFile processing.
        logger.error("Error loading mapping file: {}", new Object[]{e.getMessage()}, e);
    } finally {
        processorLock.unlock();
    }
}
/**
 * Parses a mapping stream into a key/value map.
 *
 * <p>Each line is split into at most two parts on tab or space characters. The first part is the
 * key and the remainder is the value; both are trimmed. A line with a key only maps that key to
 * the empty string, and a line that yields no parts is ignored. When a key occurs more than once
 * the last occurrence wins.</p>
 *
 * <p>The stream is decoded with the platform default charset and is not closed by this method.</p>
 *
 * @param is the stream to read the mapping from
 * @return a new mutable map holding the parsed entries
 * @throws IOException if reading from the stream fails
 */
protected Map<String, String> loadMappingFile(InputStream is) throws IOException {
    final Map<String, String> entries = new HashMap<>();
    final BufferedReader lineReader = new BufferedReader(new InputStreamReader(is));

    for (String current = lineReader.readLine(); current != null; current = lineReader.readLine()) {
        final String[] parts = StringUtils.split(current, "\t ", 2);
        if (parts.length != 1 && parts.length != 2) {
            continue;
        }
        // A key without a value is supported and maps to the empty string.
        final String key = parts[0].trim();
        final String value = (parts.length == 2) ? parts[1].trim() : "";
        entries.put(key, value);
    }

    return entries;
}
/**
 * Snapshot of the key/value mapping used for replacements. The supplied map is copied on
 * construction, so later changes to it do not affect this state.
 */
public static class ConfigurationState {
    final Map<String, String> mapping;

    /**
     * @param mapping the entries to copy; {@code null} produces an empty state
     */
    public ConfigurationState(final Map<String, String> mapping) {
        if (mapping == null) {
            this.mapping = new HashMap<>();
        } else {
            this.mapping = new HashMap<>(mapping);
        }
    }

    /**
     * @return a read-only view of the mapping
     */
    public Map<String, String> getMapping() {
        return Collections.unmodifiableMap(this.mapping);
    }

    /**
     * @return {@code true} if the mapping holds at least one entry
     */
    public boolean isConfigured() {
        return this.mapping.size() > 0;
    }
}
/**
 * Stream callback that rewrites one FlowFile: every match of the configured regular expression
 * whose lookup group is a key in the current mapping has that group replaced by the mapped
 * value; all other text is written through unchanged.
 */
private final class ReplaceTextCallback implements StreamCallback {
    private final Charset charset;
    private final byte[] buffer;
    private final Pattern pattern;
    private final int numCapturingGroups;
    private final int groupToMatch;

    // Quotes attribute values substituted into the regular expression so they match literally.
    private final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
        @Override
        public String decorate(final String attributeValue) {
            return Pattern.quote(attributeValue);
        }
    };

    /**
     * @param context provides the regular expression, character set and matching group
     * @param flowFile the FlowFile whose attributes are used to evaluate the regular expression
     *                 and whose size determines the read buffer
     * @param maxBufferSize upper bound, in bytes, for the read buffer
     */
    private ReplaceTextCallback(ProcessContext context, FlowFile flowFile, int maxBufferSize) {
        final String regex = context.getProperty(REGEX).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
        // Compile once, and take the group count from the very pattern that will be applied.
        this.pattern = Pattern.compile(regex);
        this.numCapturingGroups = pattern.matcher("").groupCount();
        this.charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
        // The buffer only needs to hold this FlowFile, never more than the configured maximum.
        this.buffer = new byte[(int) Math.min(flowFile.getSize(), maxBufferSize)];
        this.groupToMatch = context.getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY).evaluateAttributeExpressions().asInteger();
    }

    /**
     * Reads the content into memory, applies the mapping to every match and writes the result.
     *
     * @param in the FlowFile content to read
     * @param out the stream receiving the rewritten content
     * @throws IOException if reading or writing fails
     */
    @Override
    public void process(final InputStream in, final OutputStream out) throws IOException {
        final Map<String, String> mapping = configurationStateRef.get().getMapping();

        final int bytesRead = StreamUtils.fillBuffer(in, buffer, false);
        final String contentString = new String(buffer, 0, bytesRead, charset);

        final Matcher matcher = pattern.matcher(contentString);
        // StringBuffer is required by Matcher.appendReplacement on Java 8.
        final StringBuffer sb = new StringBuffer();

        while (matcher.find()) {
            final String wholeMatch = matcher.group(); // everything that matched the regex
            final String lookupKey = matcher.group(groupToMatch); // null if the group did not participate
            final String rv = (lookupKey == null) ? null : mapping.get(lookupKey);

            if (rv == null) {
                // No mapping: put the matched text back literally. quoteReplacement escapes both
                // '$' and '\', so neither is interpreted as replacement syntax.
                matcher.appendReplacement(sb, Matcher.quoteReplacement(wholeMatch));
            } else {
                // Position of the lookup group relative to the start of the whole match.
                final int keyStart = matcher.start(groupToMatch) - matcher.start();
                final int keyEnd = keyStart + lookupKey.length();

                final StringBuilder replacementBuilder = new StringBuilder();
                replacementBuilder.append(Matcher.quoteReplacement(wholeMatch.substring(0, keyStart)));
                replacementBuilder.append(fillReplacementValueBackReferences(rv, numCapturingGroups));
                replacementBuilder.append(Matcher.quoteReplacement(wholeMatch.substring(keyEnd)));
                matcher.appendReplacement(sb, replacementBuilder.toString());
            }
        }

        // With no match at all this appends the entire content, i.e. it is written back unchanged.
        matcher.appendTail(sb);
        out.write(sb.toString().getBytes(charset));
    }
}
}