blob: eb1b52a01028255f94ebb29fb903544b83b834fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.HashSet;
import java.util.Map;
import java.util.Collections;
import java.util.stream.Collectors;
/**
 * Processor that serializes FlowFile attributes to a JSON object.
 *
 * <p>The attributes to include are selected by an optional comma separated list, an optional regular
 * expression matched against attribute names, or (when neither is configured) all attributes of the
 * FlowFile. The JSON is written either to the {@value #JSON_ATTRIBUTE_NAME} attribute or to the FlowFile
 * content, depending on the Destination property.</p>
 */
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"json", "attributes", "flowfile"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Generates a JSON representation of the input FlowFile Attributes. The resulting JSON " +
        "can be written to either a new Attribute 'JSONAttributes' or written to the FlowFile as content.")
@WritesAttribute(attribute = "JSONAttributes", description = "JSON representation of Attributes")
public class AttributesToJSON extends AbstractProcessor {

    /** Name of the attribute that receives the JSON when the destination is a FlowFile attribute. */
    public static final String JSON_ATTRIBUTE_NAME = "JSONAttributes";
    /** Separator between entries of the Attributes List property. */
    private static final String AT_LIST_SEPARATOR = ",";

    /** Allowable value of the Destination property: write the JSON to an attribute. */
    public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
    /** Allowable value of the Destination property: write the JSON to the FlowFile content. */
    public static final String DESTINATION_CONTENT = "flowfile-content";
    /** MIME type set on the FlowFile when the JSON is written to its content. */
    public static final String APPLICATION_JSON = "application/json";

    public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
            .name("Attributes List")
            .description("Comma separated list of attributes to be included in the resulting JSON. If this value " +
                    "is left empty then all existing Attributes will be included. This list of attributes is " +
                    "case sensitive. If an attribute specified in the list is not found it will be emitted " +
                    "to the resulting JSON with an empty string or NULL value.")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
            .name("attributes-to-json-regex")
            .displayName("Attributes Regular Expression")
            .description("Regular expression that will be evaluated against the flow file attributes to select "
                    + "the matching attributes. This property can be used in combination with the attributes "
                    + "list property.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
            .name("Destination")
            .description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " +
                    "or written in the flowfile content. Writing to flowfile content will overwrite any " +
                    "existing flowfile content.")
            .required(true)
            .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
            .defaultValue(DESTINATION_ATTRIBUTE)
            .build();

    public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder()
            .name("Include Core Attributes")
            .description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes which are " +
                    "contained in every FlowFile should be included in the final JSON value generated.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();

    public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
            .name("Null Value")
            .description("If true a non existing selected attribute will be NULL in the resulting JSON. If false an empty " +
                    "string will be placed in the JSON")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
            .description("Successfully converted attributes to JSON").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
            .description("Failed to convert attributes to JSON").build();

    private static final ObjectMapper objectMapper = new ObjectMapper();

    // Populated once in init() and exposed read-only afterwards.
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    // Configuration snapshot taken in onScheduled() and read by onTrigger().
    private volatile Set<String> attributesToRemove;
    private volatile Set<String> attributes;
    private volatile boolean nullValueForEmptyString;
    private volatile boolean destinationContent;
    private volatile Pattern pattern;

    /**
     * Registers the supported property descriptors and relationships as unmodifiable collections.
     *
     * @param context initialization context (not used)
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(ATTRIBUTES_LIST);
        properties.add(ATTRIBUTES_REGEX);
        properties.add(DESTINATION);
        properties.add(INCLUDE_CORE_ATTRIBUTES);
        properties.add(NULL_VALUE_FOR_EMPTY_STRING);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Builds the Map of attributes that should be included in the JSON that is emitted from this process.
     *
     * <p>When {@code attributes} or {@code attPattern} is given, only the listed attributes and the
     * attributes whose name fully matches the pattern are included. Otherwise every attribute of the
     * FlowFile is included. Attributes named in {@code attributesToRemove} are left out of the
     * pattern-selected and the "all attributes" results; the explicit list is used as given.</p>
     *
     * @param ff the FlowFile whose attributes are read
     * @param attributes explicit attribute names to include, or {@code null} if no list is configured
     * @param attributesToRemove attribute names to exclude; {@code null} is treated as empty
     * @param nullValForEmptyString if {@code true} a listed attribute that is missing on the FlowFile is
     *                              mapped to {@code null}, otherwise to an empty string
     * @param attPattern pattern matched against attribute names, or {@code null} if none is configured
     * @return
     *  Map of values that are feed to a Jackson ObjectMapper
     */
    protected Map<String, String> buildAttributesMapForFlowFile(FlowFile ff, Set<String> attributes, Set<String> attributesToRemove,
            boolean nullValForEmptyString, Pattern attPattern) {
        final Set<String> excluded = attributesToRemove != null ? attributesToRemove : Collections.<String>emptySet();
        final Map<String, String> result;

        // If a list and/or a pattern is specified get only those attributes. Otherwise write them all
        if (attributes != null || attPattern != null) {
            result = new HashMap<>();
            if (attributes != null) {
                for (String attribute : attributes) {
                    final String val = ff.getAttribute(attribute);
                    if (val != null || nullValForEmptyString) {
                        result.put(attribute, val);
                    } else {
                        result.put(attribute, "");
                    }
                }
            }
            if (attPattern != null) {
                for (Map.Entry<String, String> e : ff.getAttributes().entrySet()) {
                    // Apply the same exclusion as the other selection modes so that pattern matches
                    // cannot re-introduce attributes the configuration asked to leave out.
                    if (!excluded.contains(e.getKey()) && attPattern.matcher(e.getKey()).matches()) {
                        result.put(e.getKey(), e.getValue());
                    }
                }
            }
        } else {
            final Map<String, String> ffAttributes = ff.getAttributes();
            result = new HashMap<>(ffAttributes.size());
            for (Map.Entry<String, String> e : ffAttributes.entrySet()) {
                if (!excluded.contains(e.getKey())) {
                    result.put(e.getKey(), e.getValue());
                }
            }
        }
        return result;
    }

    /**
     * Parses the comma separated attribute list into a set of trimmed names.
     *
     * @param atrList raw value of the Attributes List property, may be {@code null} or blank
     * @param atrsToExclude names that must not appear in the result
     * @return the trimmed, non-empty, non-excluded names, or {@code null} when {@code atrList} is blank
     */
    private Set<String> buildAtrs(String atrList, Set<String> atrsToExclude) {
        //If list of attributes specified get only those attributes. Otherwise write them all
        if (StringUtils.isNotBlank(atrList)) {
            String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR);
            if (ats != null) {
                Set<String> result = new HashSet<>(ats.length);
                for (String str : ats) {
                    String trim = str.trim();
                    // A whitespace-only entry (e.g. "a, ,b") is not an attribute name; skip it
                    // instead of emitting an empty JSON key.
                    if (!trim.isEmpty() && !atrsToExclude.contains(trim)) {
                        result.add(trim);
                    }
                }
                return result;
            }
        }
        return null;
    }

    /**
     * Captures the processor configuration into fields read by {@link #onTrigger(ProcessContext, ProcessSession)}.
     * Every field is reassigned on each call so that no value from a previous schedule is carried over.
     *
     * @param context the context providing the configured property values
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        final Set<String> toRemove;
        if (context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean()) {
            toRemove = Collections.emptySet();
        } else {
            toRemove = Arrays.stream(CoreAttributes.values())
                    .map(CoreAttributes::key)
                    .collect(Collectors.toSet());
        }
        attributesToRemove = toRemove;
        attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue(), toRemove);
        nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
        destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());

        // Reset the pattern when the property is not set; otherwise a pattern compiled during an
        // earlier schedule would keep selecting attributes after the property has been cleared.
        final String regex = context.getProperty(ATTRIBUTES_REGEX).isSet()
                ? context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue()
                : null;
        pattern = regex == null ? null : Pattern.compile(regex);
    }

    /**
     * Converts the selected attributes of one incoming FlowFile to JSON and writes the JSON to the
     * configured destination. The FlowFile is transferred to {@link #REL_SUCCESS}, or to
     * {@link #REL_FAILURE} if the attributes cannot be serialized.
     *
     * @param context the process context (not used; configuration is captured in {@code onScheduled})
     * @param session the session used to obtain, modify and transfer the FlowFile
     * @throws ProcessException declared by the overridden method
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }

        final Map<String, String> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString, pattern);

        try {
            if (destinationContent) {
                // Serialize before writing: a failure raised inside the write callback would not
                // reach the catch below, so the FlowFile could never be routed to failure.
                final byte[] json = objectMapper.writeValueAsBytes(atrList);
                FlowFile conFlowfile = session.write(original, (in, out) -> {
                    try (OutputStream outputStream = new BufferedOutputStream(out)) {
                        outputStream.write(json);
                    }
                });
                conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
                session.transfer(conFlowfile, REL_SUCCESS);
            } else {
                FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME, objectMapper.writeValueAsString(atrList));
                session.transfer(atFlowfile, REL_SUCCESS);
            }
        } catch (JsonProcessingException e) {
            // Pass the exception itself so the cause and stack trace are not lost.
            getLogger().error("Failed to convert attributes of " + original + " to JSON", e);
            session.transfer(original, REL_FAILURE);
        }
    }
}