// blob: fd20b0c949574b91b15b10d2630701e8e20fa338 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.lookup.StringLookupService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
import javax.xml.XMLConstants;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Templates;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Processor that applies an XSLT stylesheet to the content of each incoming FlowFile.
 *
 * <p>The stylesheet is taken either from a file ({@link #XSLT_FILE_NAME}) or from a string lookup
 * controller service ({@link #XSLT_CONTROLLER} together with {@link #XSLT_CONTROLLER_KEY}); validation
 * requires exactly one of the two sources. Compiled stylesheets are optionally kept in a size- and
 * access-time-bounded cache, and every dynamic property is handed to the transformer as an XSLT parameter.
 */
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"xml", "xslt", "transform"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Applies the provided XSLT file to the flowfile XML payload. A new FlowFile is created "
        + "with transformed content and is routed to the 'success' relationship. If the XSL transform "
        + "fails, the original FlowFile is routed to the 'failure' relationship")
@DynamicProperty(name = "An XSLT transform parameter name", value = "An XSLT transform parameter value",
        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
        description = "These XSLT parameters are passed to the transformer")
public class TransformXml extends AbstractProcessor {

    /** Path of the XSLT file to apply; mutually exclusive with {@link #XSLT_CONTROLLER}. */
    public static final PropertyDescriptor XSLT_FILE_NAME = new PropertyDescriptor.Builder()
            .name("XSLT file name")
            // The trailing space keeps the two concatenated sentences from running together.
            .description("Provides the name (including full path) of the XSLT file to apply to the flowfile XML content. "
                    + "One of the 'XSLT file name' and 'XSLT Lookup' properties must be defined.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
            .build();

    /** Lookup controller service holding XSLT definitions; mutually exclusive with {@link #XSLT_FILE_NAME}. */
    public static final PropertyDescriptor XSLT_CONTROLLER = new PropertyDescriptor.Builder()
            .name("xslt-controller")
            .displayName("XSLT Lookup")
            .description("Controller lookup used to store XSLT definitions. One of the 'XSLT file name' and "
                    + "'XSLT Lookup' properties must be defined. WARNING: note that the lookup controller service "
                    + "should not be used to store large XSLT files.")
            .required(false)
            .identifiesControllerService(StringLookupService.class)
            .build();

    /** Key under which the XSLT definition is looked up in {@link #XSLT_CONTROLLER}. */
    public static final PropertyDescriptor XSLT_CONTROLLER_KEY = new PropertyDescriptor.Builder()
            .name("xslt-controller-key")
            .displayName("XSLT Lookup key")
            .description("Key used to retrieve the XSLT definition from the XSLT lookup controller. This property must be "
                    + "set when using the XSLT controller property.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    /** Whether the transformer output is indented. */
    public static final PropertyDescriptor INDENT_OUTPUT = new PropertyDescriptor.Builder()
            .name("indent-output")
            .displayName("Indent")
            .description("Whether or not to indent the output.")
            .required(true)
            .defaultValue("true")
            .allowableValues("true", "false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    /** Whether the transformer factory is configured with secure-processing features. */
    public static final PropertyDescriptor SECURE_PROCESSING = new PropertyDescriptor.Builder()
            .name("secure-processing")
            .displayName("Secure processing")
            .description("Whether or not to mitigate various XML-related attacks like XXE (XML External Entity) attacks.")
            .required(true)
            .defaultValue("true")
            .allowableValues("true", "false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    /** Maximum number of compiled stylesheets kept in the cache; zero disables caching. */
    public static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
            .name("cache-size")
            .displayName("Cache size")
            .description("Maximum number of stylesheets to cache. Zero disables the cache.")
            .required(true)
            .defaultValue("10")
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();

    /** How long a cached stylesheet is retained after it was last accessed. */
    public static final PropertyDescriptor CACHE_TTL_AFTER_LAST_ACCESS = new PropertyDescriptor.Builder()
            .name("cache-ttl-after-last-access")
            .displayName("Cache TTL after last access")
            .description("The cache TTL (time-to-live) or how long to keep stylesheets in the cache after last access.")
            .required(true)
            .defaultValue("60 secs")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    /** Destination of FlowFiles whose content was transformed. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("The FlowFile with transformed content will be routed to this relationship")
            .build();

    /** Destination of the original FlowFile when the transformation fails. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid XML), it will be routed to this relationship")
            .build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    /** Compiled stylesheets keyed by file path or lookup key; {@code null} when caching is disabled. */
    private LoadingCache<String, Templates> cache;

    /**
     * Lookup service configured for this processor instance. It is deliberately an instance field:
     * a static reference would be shared by, and overwritten between, every TransformXml processor
     * in the JVM.
     */
    private final AtomicReference<LookupService<String>> lookupService = new AtomicReference<>();

    /**
     * Builds the immutable lists of supported properties and relationships.
     *
     * @param context initialization context (not used)
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(XSLT_FILE_NAME);
        properties.add(XSLT_CONTROLLER);
        properties.add(XSLT_CONTROLLER_KEY);
        properties.add(INDENT_OUTPUT);
        properties.add(SECURE_PROCESSING);
        properties.add(CACHE_SIZE);
        properties.add(CACHE_TTL_AFTER_LAST_ACCESS);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    /** @return the unmodifiable set containing the success and failure relationships */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /** @return the unmodifiable list of statically declared property descriptors */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Validates the combination of stylesheet-source properties.
     *
     * <p>Reports an invalid result when both or neither of the file name and the lookup controller
     * are set, when the controller is set without a key, and when the configured lookup service
     * does not declare exactly one required key.
     *
     * @param validationContext context giving access to the configured property values
     * @return the validation results of the superclass plus any results added here
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));

        final PropertyValue filename = validationContext.getProperty(XSLT_FILE_NAME);
        final PropertyValue controller = validationContext.getProperty(XSLT_CONTROLLER);
        final PropertyValue key = validationContext.getProperty(XSLT_CONTROLLER_KEY);

        // Both set, or both unset: exactly one stylesheet source is required.
        if (filename.isSet() == controller.isSet()) {
            results.add(new ValidationResult.Builder()
                    .valid(false)
                    .subject(this.getClass().getSimpleName())
                    .explanation("Exactly one of the \"XSLT file name\" and \"XSLT controller\" properties must be defined.")
                    .build());
        }

        if (controller.isSet() && !key.isSet()) {
            results.add(new ValidationResult.Builder()
                    .valid(false)
                    .subject(XSLT_CONTROLLER_KEY.getDisplayName())
                    .explanation("If using \"XSLT controller\", the XSLT controller key property must be defined.")
                    .build());
        }

        if (controller.isSet()) {
            final LookupService<String> service = controller.asControllerService(StringLookupService.class);
            // A null service means the reference could not be resolved; skip the key check rather
            // than fail validation with a NullPointerException.
            // NOTE(review): presumably the framework reports the unresolved reference itself - confirm.
            if (service != null) {
                final Set<String> requiredKeys = service.getRequiredKeys();
                if (requiredKeys == null || requiredKeys.size() != 1) {
                    results.add(new ValidationResult.Builder()
                            .valid(false)
                            .subject(XSLT_CONTROLLER.getDisplayName())
                            .explanation("This processor requires a key-value lookup service supporting exactly one required key, was: " +
                                    (requiredKeys == null ? "null" : String.valueOf(requiredKeys.size())))
                            .build());
                }
            }
        }

        return results;
    }

    /**
     * Describes a dynamic property; each one is later passed to the transformer as a parameter.
     *
     * @param propertyDescriptorName name of the dynamic property
     * @return an optional, dynamic descriptor supporting FlowFile-attribute expression language
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
                .required(false)
                .dynamic(true)
                .build();
    }

    /**
     * Compiles the stylesheet identified by {@code path}.
     *
     * <p>When the file name property is set, {@code path} is used as the stylesheet location;
     * otherwise it is the key looked up in the configured lookup service and the returned string
     * is compiled as the stylesheet.
     *
     * @param context process context used to read the secure-processing and file name properties
     * @param path    stylesheet file path, or lookup key when the lookup service is used
     * @return the compiled stylesheet
     * @throws TransformerConfigurationException if the stylesheet cannot be compiled, no lookup
     *                                           service is available, or the lookup yields no
     *                                           non-blank definition
     * @throws LookupFailureException            if the lookup service fails
     */
    private Templates newTemplates(final ProcessContext context, final String path) throws TransformerConfigurationException, LookupFailureException {
        final boolean secureProcessing = context.getProperty(SECURE_PROCESSING).asBoolean();
        final boolean isFilename = context.getProperty(XSLT_FILE_NAME).isSet();
        final TransformerFactory factory = TransformerFactory.newInstance();

        if (secureProcessing) {
            factory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
            // don't be overly DTD-unfriendly forcing http://apache.org/xml/features/disallow-doctype-decl
            factory.setFeature("http://saxon.sf.net/feature/parserFeature?uri=http://xml.org/sax/features/external-parameter-entities", false);
            factory.setFeature("http://saxon.sf.net/feature/parserFeature?uri=http://xml.org/sax/features/external-general-entities", false);
        }

        if (isFilename) {
            return factory.newTemplates(new StreamSource(path));
        }

        // Read the reference once so both calls below use the same service instance.
        final LookupService<String> service = lookupService.get();
        if (service == null) {
            throw new TransformerConfigurationException("No XSLT file name is set and no XSLT lookup controller service is available.");
        }

        final String coordinateKey = service.getRequiredKeys().iterator().next();
        final Optional<String> definition = service.lookup(Collections.singletonMap(coordinateKey, path));
        if (definition.isPresent() && StringUtils.isNotBlank(definition.get())) {
            return factory.newTemplates(new StreamSource(new ByteArrayInputStream(definition.get().getBytes(StandardCharsets.UTF_8))));
        }
        throw new TransformerConfigurationException("No XSLT definition is associated to " + path + " in the lookup controller service.");
    }

    /**
     * Creates the stylesheet cache from the configured size and TTL, or disables it when the size is zero.
     *
     * @param context process context providing the cache properties; also captured by the cache loader
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        final ComponentLog logger = getLogger();
        final Integer cacheSize = context.getProperty(CACHE_SIZE).asInteger();
        final Long cacheTTL = context.getProperty(CACHE_TTL_AFTER_LAST_ACCESS).asTimePeriod(TimeUnit.SECONDS);

        if (cacheSize > 0) {
            CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder().maximumSize(cacheSize);
            // A TTL of zero leaves entries bounded by size only.
            if (cacheTTL > 0) {
                cacheBuilder = cacheBuilder.expireAfterAccess(cacheTTL, TimeUnit.SECONDS);
            }

            cache = cacheBuilder.build(
                    new CacheLoader<String, Templates>() {
                        @Override
                        public Templates load(String path) throws TransformerConfigurationException, LookupFailureException {
                            return newTemplates(context, path);
                        }
                    });
        } else {
            cache = null;
            logger.info("Stylesheet cache disabled because cache size is set to 0");
        }
    }

    /**
     * Transforms the content of one FlowFile with the configured stylesheet.
     *
     * <p>On success the rewritten FlowFile is transferred to {@link #REL_SUCCESS} and a content
     * modification provenance event is reported; if writing the content raises a
     * {@link ProcessException} the original FlowFile is transferred to {@link #REL_FAILURE}.
     *
     * @param context process context providing the property values
     * @param session session used to obtain, rewrite and transfer the FlowFile
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }

        final ComponentLog logger = getLogger();
        final StopWatch stopWatch = new StopWatch(true);
        // The same value serves as file path or as lookup key, depending on which source is configured.
        final String path = context.getProperty(XSLT_FILE_NAME).isSet()
                ? context.getProperty(XSLT_FILE_NAME).evaluateAttributeExpressions(original).getValue()
                : context.getProperty(XSLT_CONTROLLER_KEY).evaluateAttributeExpressions(original).getValue();
        final Boolean indentOutput = context.getProperty(INDENT_OUTPUT).asBoolean();
        // Typed service class avoids the unchecked conversion a raw LookupService.class would need.
        lookupService.set(context.getProperty(XSLT_CONTROLLER).asControllerService(StringLookupService.class));

        try {
            final FlowFile transformed = session.write(original, new StreamCallback() {
                @Override
                public void process(final InputStream rawIn, final OutputStream out) throws IOException {
                    try (final InputStream in = new BufferedInputStream(rawIn)) {
                        final Templates templates;
                        if (cache != null) {
                            templates = cache.get(path);
                        } else {
                            templates = newTemplates(context, path);
                        }

                        final Transformer transformer = templates.newTransformer();
                        transformer.setOutputProperty(OutputKeys.INDENT, (indentOutput ? "yes" : "no"));

                        // pass all dynamic properties to the transformer
                        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
                            if (entry.getKey().isDynamic()) {
                                final String value = context.newPropertyValue(entry.getValue()).evaluateAttributeExpressions(original).getValue();
                                transformer.setParameter(entry.getKey().getName(), value);
                            }
                        }

                        // use a StreamSource with Saxon
                        final StreamSource source = new StreamSource(in);
                        final StreamResult result = new StreamResult(out);
                        transformer.transform(source, result);
                    } catch (final Exception e) {
                        // Any failure is surfaced as IOException, the only checked type this callback may throw.
                        throw new IOException(e);
                    }
                }
            });
            session.transfer(transformed, REL_SUCCESS);
            session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            logger.info("Transformed {}", new Object[]{original});
        } catch (ProcessException e) {
            logger.error("Unable to transform {} due to {}", new Object[]{original, e});
            session.transfer(original, REL_FAILURE);
        }
    }

    /**
     * Validator that checks a stylesheet file by building a transformer from it, remembering the
     * result for the most recently validated input. Not referenced by any property descriptor in
     * this class.
     */
    @SuppressWarnings("unused")
    private static final class XsltValidator implements Validator {

        /** Most recent input together with its validation result. */
        private volatile Tuple<String, ValidationResult> cachedResult;

        /**
         * Validates that {@code input} names a stylesheet from which a transformer can be created.
         *
         * @param subject           subject reported in the validation result
         * @param input             path of the stylesheet file
         * @param validationContext validation context (not used)
         * @return a valid result if the transformer was created, otherwise an invalid result whose
         *         explanation is the string form of the exception
         */
        @Override
        public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
            final Tuple<String, ValidationResult> lastResult = this.cachedResult;
            if (lastResult != null && lastResult.getKey().equals(input)) {
                return lastResult.getValue();
            }

            String error = null;
            final File stylesheet = new File(input);
            final TransformerFactory tFactory = new net.sf.saxon.TransformerFactoryImpl();
            final StreamSource styleSource = new StreamSource(stylesheet);

            try {
                tFactory.newTransformer(styleSource);
            } catch (final Exception e) {
                error = e.toString();
            }

            final ValidationResult result = new ValidationResult.Builder()
                    .input(input)
                    .subject(subject)
                    .valid(error == null)
                    .explanation(error)
                    .build();
            this.cachedResult = new Tuple<>(input, result);
            // Return the local result: re-reading the volatile field could yield a result that a
            // concurrent validation stored for a different input.
            return result;
        }
    }
}