/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;

@Deprecated
@DeprecationNotice(classNames = {"org.apache.nifi.processors.standard.CryptographicHashContent"}, reason = "This processor is deprecated and may be removed in future releases.")
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"hash", "content", "MD5", "SHA-1", "SHA-256"})
@CapabilityDescription("Calculates a hash value for the Content of a FlowFile and puts that hash value on the FlowFile as an attribute whose name "
+ "is determined by the <Hash Attribute Name> property. "
+ "This processor did not provide a consistent offering of hash algorithms, and is now deprecated. For modern cryptographic hashing capabilities, see \"CryptographicHashContent\". ")
@WritesAttribute(attribute = "<Hash Attribute Name>", description = "This Processor adds an attribute whose value is the result of hashing the "
+ "existing FlowFile content. The name of this attribute is specified by the <Hash Attribute Name> property.")
public class HashContent extends AbstractProcessor {
public static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Hash Attribute Name")
.description("The name of the FlowFile Attribute into which the Hash Value should be written. If the value already exists, it will be overwritten")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("hash.value")
.build();

public static final PropertyDescriptor HASH_ALGORITHM = new PropertyDescriptor.Builder()
.name("Hash Algorithm")
.description("Determines what hashing algorithm should be used to perform the hashing function")
.required(true)
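// The allowable values come from Security.getAlgorithms("MessageDigest"), i.e. every digest
// algorithm registered by the JVM's installed security providers, so the list varies by runtime.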
.allowableValues(Security.getAlgorithms("MessageDigest"))
.defaultValue("MD5")
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are process successfully will be sent to this relationship")
.build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that cannot be processed successfully will be sent to this relationship without any attribute being added")
.build();

private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
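
// Build the immutable property and relationship collections once, at processor initialization.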
@Override
protected void init(ProcessorInitializationContext context) {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(ATTRIBUTE_NAME);
props.add(HASH_ALGORITHM);
properties = Collections.unmodifiableList(props);
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}

@Override
public Set<Relationship> getRelationships() {
return relationships;
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
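// session.get() returns null when no FlowFile is queued; nothing to do in that case.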
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
final String algorithm = context.getProperty(HASH_ALGORITHM).getValue();
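// MessageDigest instances are stateful and not thread-safe, so obtain a fresh one per invocation.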
final MessageDigest digest;
try {
digest = MessageDigest.getInstance(algorithm);
} catch (NoSuchAlgorithmException e) {
logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
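// Holder used to carry the computed hash value out of the stream callback below.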
final AtomicReference<String> hashValueHolder = new AtomicReference<>(null);
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
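// Stream the content through a DigestOutputStream backed by a NullOutputStream:
// the bytes are discarded after updating the digest, so nothing is buffered in memory.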
try (final DigestOutputStream digestOut = new DigestOutputStream(new NullOutputStream(), digest)) {
StreamUtils.copy(in, digestOut);
final byte[] hash = digest.digest();
final StringBuilder strb = new StringBuilder(hash.length * 2);
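// OR-ing each byte with 0x100 forces a three-character hex string ("1xx");
// appending characters 1..2 yields exactly two zero-padded hex digits per byte.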
for (int i = 0; i < hash.length; i++) {
strb.append(Integer.toHexString((hash[i] & 0xFF) | 0x100), 1, 3);
}
hashValueHolder.set(strb.toString());
}
}
});
final String attributeName = context.getProperty(ATTRIBUTE_NAME).getValue();
flowFile = session.putAttribute(flowFile, attributeName, hashValueHolder.get());
logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()});
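// Record the attribute change as a provenance event before transferring.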
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
} catch (final ProcessException e) {
logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
}
}