/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.JsonUtils;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.jolt.TransformFactory;
import org.apache.nifi.processors.standard.util.jolt.TransformUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr","cardinality","sort"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttribute(attribute = "mime.type",description = "Always set to application/json")
@CapabilityDescription("Applies a list of Jolt specifications to the flowfile JSON payload. A new FlowFile is created "
+ "with transformed content and is routed to the 'success' relationship. If the JSON transform "
+ "fails, the original FlowFile is routed to the 'failure' relationship.")
public class JoltTransformJSON extends AbstractProcessor {
public static final AllowableValue SHIFTR = new AllowableValue("jolt-transform-shift", "Shift", "Shift input JSON/data to create the output JSON.");
public static final AllowableValue CHAINR = new AllowableValue("jolt-transform-chain", "Chain", "Execute list of Jolt transformations.");
public static final AllowableValue DEFAULTR = new AllowableValue("jolt-transform-default", "Default", "Apply default values to the output JSON.");
public static final AllowableValue REMOVR = new AllowableValue("jolt-transform-remove", "Remove", "Remove values from input data to create the output JSON.");
public static final AllowableValue CARDINALITY = new AllowableValue("jolt-transform-card", "Cardinality", "Change the cardinality of input elements to create the output JSON.");
public static final AllowableValue SORTR = new AllowableValue("jolt-transform-sort", "Sort", "Sort input json key values alphabetically. Any specification set is ignored.");
public static final AllowableValue CUSTOMR = new AllowableValue("jolt-transform-custom", "Custom", "Custom Transformation. Requires Custom Transformation Class Name");
public static final AllowableValue MODIFIER_DEFAULTR = new AllowableValue("jolt-transform-modify-default", "Modify - Default", "Writes when key is missing or value is null");
public static final AllowableValue MODIFIER_OVERWRITER = new AllowableValue("jolt-transform-modify-overwrite", "Modify - Overwrite", "Always overwrite value");
public static final AllowableValue MODIFIER_DEFINER = new AllowableValue("jolt-transform-modify-define", "Modify - Define", "Writes when key is missing");
public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder()
.name("jolt-transform")
.displayName("Jolt Transformation DSL")
.description("Specifies the Jolt Transformation that should be used with the provided specification.")
.required(true)
.allowableValues(CARDINALITY, CHAINR, DEFAULTR, MODIFIER_DEFAULTR, MODIFIER_DEFINER, MODIFIER_OVERWRITER, REMOVR, SHIFTR, SORTR, CUSTOMR)
.defaultValue(CHAINR.getValue())
.build();
public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder()
.name("jolt-spec")
.displayName("Jolt Specification")
.description("Jolt Specification for transform of JSON data. This value is ignored if the Jolt Sort Transformation is selected.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor CUSTOM_CLASS = new PropertyDescriptor.Builder()
.name("jolt-custom-class")
.displayName("Custom Transformation Class Name")
.description("Fully Qualified Class Name for Custom Transformation")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
.name("jolt-custom-modules")
.displayName("Custom Module Directory")
.description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).")
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder()
.name("Transform Cache Size")
.description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need "
+ "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.required(true)
.build();
public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
.name("pretty_print")
.displayName("Pretty Print")
.description("Apply pretty print formatting to the output of the Jolt transform")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The FlowFile with transformed content will be routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship")
.build();
private final static List<PropertyDescriptor> properties;
private final static Set<Relationship> relationships;
private volatile ClassLoader customClassLoader;
private final static String DEFAULT_CHARSET = "UTF-8";
/**
 * Cache of transform objects, keyed by the Jolt specification string. The key may be empty, in which case it
 * represents the default transform (e.g. a custom transform with no Jolt Specification provided).
 */
private LoadingCache<Optional<String>, JoltTransform> transformCache;
static {
final List<PropertyDescriptor> _properties = new ArrayList<>();
_properties.add(JOLT_TRANSFORM);
_properties.add(CUSTOM_CLASS);
_properties.add(MODULES);
_properties.add(JOLT_SPEC);
_properties.add(TRANSFORM_CACHE_SIZE);
_properties.add(PRETTY_PRINT);
properties = Collections.unmodifiableList(_properties);
final Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
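/**
 * Validates that a Jolt specification is present (unless the Sort transform is selected) and, when no Expression
 * Language is used, that the specification parses and is compatible with the selected transform. Expression
 * Language references are only syntax-checked here because FlowFile attributes are not available yet.
 */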
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue();
final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue();
final String modulePath = validationContext.getProperty(MODULES).isSet() ? validationContext.getProperty(MODULES).getValue() : null;
if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) {
if (!SORTR.getValue().equals(transform)) {
final String message = "A specification is required for this transformation";
results.add(new ValidationResult.Builder().valid(false)
.explanation(message)
.build());
}
} else {
final ClassLoader customClassLoader;
try {
if (modulePath != null) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(modulePath, this.getClass().getClassLoader(), getJarFilenameFilter());
} else {
customClassLoader = this.getClass().getClassLoader();
}
final String specValue = validationContext.getProperty(JOLT_SPEC).getValue();
if (validationContext.isExpressionLanguagePresent(specValue)) {
final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue,true);
if (!StringUtils.isEmpty(invalidExpressionMsg)) {
results.add(new ValidationResult.Builder().valid(false)
.subject(JOLT_SPEC.getDisplayName())
.explanation("Invalid Expression Language: " + invalidExpressionMsg)
.build());
}
} else {
// For validation we only want to ensure the spec is syntactically correct; Expression Language variables are not
// resolved (they may not exist yet), so any ${...} references are escaped before the spec is parsed.
Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{","\\\\\\\\\\$\\{"), DEFAULT_CHARSET);
if (CUSTOMR.getValue().equals(transform)) {
if (StringUtils.isEmpty(customTransform)) {
final String customMessage = "A custom transformation class should be provided. ";
results.add(new ValidationResult.Builder().valid(false)
.explanation(customMessage)
.build());
} else {
TransformFactory.getCustomTransform(customClassLoader, customTransform, specJson);
}
} else {
TransformFactory.getTransform(customClassLoader, transform, specJson);
}
}
} catch (final Exception e) {
getLogger().info("Processor is not valid - " + e.toString());
String message = "Specification not valid for the selected transformation." ;
results.add(new ValidationResult.Builder().valid(false)
.explanation(message)
.build());
}
}
return results;
}
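/**
 * Reads the incoming FlowFile as JSON, applies the cached Jolt transform (swapping in the custom class loader
 * when custom modules are configured), writes the transformed JSON back to the FlowFile, and sets the mime.type
 * attribute. Any parse or transform failure routes the original FlowFile to 'failure'.
 */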
@Override
public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile original = session.get();
if (original == null) {
return;
}
final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);
final Object inputJson;
try (final InputStream in = session.read(original)) {
inputJson = JsonUtils.jsonToObject(in);
} catch (final Exception e) {
logger.error("Failed to transform {}; routing to failure", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
return;
}
final String jsonString;
final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
final JoltTransform transform = getTransform(context, original);
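// Custom transforms may be loaded from the configured module directory, so make that class loader the
// thread's context class loader while the transform runs.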
if (customClassLoader != null) {
Thread.currentThread().setContextClassLoader(customClassLoader);
}
final Object transformedJson = TransformUtils.transform(transform, inputJson);
jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? JsonUtils.toPrettyJsonString(transformedJson) : JsonUtils.toJsonString(transformedJson);
} catch (final Exception ex) {
logger.error("Unable to transform {} due to {}", new Object[] {original, ex.toString(), ex});
session.transfer(original, REL_FAILURE);
return;
} finally {
if (customClassLoader != null && originalContextClassLoader != null) {
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
}
}
FlowFile transformed = session.write(original, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(jsonString.getBytes(DEFAULT_CHARSET));
}
});
final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transformed {}", new Object[]{original});
}
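/**
 * Resolves the Jolt specification for the given FlowFile (evaluating Expression Language against its attributes)
 * and returns the corresponding transform from the cache, compiling it on a cache miss.
 */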
private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception {
final Optional<String> specString;
if (context.getProperty(JOLT_SPEC).isSet()) {
specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
} else {
specString = Optional.empty();
}
return transformCache.get(specString);
}
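/**
 * Builds the transform cache and, when a Custom Module Directory is configured, the class loader used to load
 * custom transformation classes. Invoked each time the processor is scheduled.
 */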
@OnScheduled
public void setup(final ProcessContext context) {
int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
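// The cache key is an Optional so the no-specification case (e.g. Sort, or a custom transform without a spec) can be cached as well.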
transformCache = Caffeine.newBuilder()
.maximumSize(maxTransformsToCache)
.build(specString -> createTransform(context, specString.orElse(null)));
try {
if (context.getProperty(MODULES).isSet()) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(context.getProperty(MODULES).getValue(), this.getClass().getClassLoader(), getJarFilenameFilter());
} else {
customClassLoader = this.getClass().getClassLoader();
}
} catch (final Exception ex) {
getLogger().error("Unable to setup processor", ex);
}
}
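/**
 * Parses the specification JSON (unless it is absent or the Sort transform is selected) and instantiates either
 * the configured custom transform or one of the standard Jolt transforms, using the thread's context class loader.
 */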
private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception {
final Object specJson;
if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
} else {
specJson = null;
}
if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson);
} else {
return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
}
}
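/**
 * Restricts the files loaded from the Custom Module Directory to JAR files.
 */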
protected FilenameFilter getJarFilenameFilter() {
return (dir, name) -> (name != null && name.endsWith(".jar"));
}
}