/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@EventDriven
@SupportsBatching
@Tags({"hash", "dupe", "duplicate", "dedupe"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Caches a value, computed from FlowFile attributes, for each incoming FlowFile and determines if the cached value has already been seen. "
+ "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.flowfile.description' that specifies the original FlowFile's "
+ "\"description\", which is specified in the <FlowFile Description> property. If the FlowFile is not determined to be a duplicate, the Processor "
+ "routes the FlowFile to 'non-duplicate'.")
@WritesAttribute(attribute = "original.flowfile.description", description = "All FlowFiles routed to the duplicate relationship will have "
+ "an attribute added named original.flowfile.description. The value of this attribute is determined by the attributes of the original "
+ "copy of the data and by the FlowFile Description property.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
public class DetectDuplicate extends AbstractProcessor {
public static final String ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME = "original.flowfile.description";
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("The Controller Service that is used to cache unique identifiers, used to determine duplicates")
.required(true)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Cache Entry Identifier")
.description(
"A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated "
+ "against a FlowFile in order to determine the value used to identify duplicates; it is this value that is cached")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.defaultValue("${hash.value}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor FLOWFILE_DESCRIPTION = new PropertyDescriptor.Builder()
.name("FlowFile Description")
.description("When a FlowFile is added to the cache, this value is stored along with it so that if a duplicate is found, this "
+ "description of the original FlowFile will be added to the duplicate's \""
+ ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME + "\" attribute")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("")
.build();
public static final PropertyDescriptor AGE_OFF_DURATION = new PropertyDescriptor.Builder()
.name("Age Off Duration")
.description("Time interval to age off cached FlowFiles")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Cache The Entry Identifier")
.description("When true this cause the processor to check for duplicates and cache the Entry Identifier. When false, "
+ "the processor would only check for duplicates and not cache the Entry Identifier, requiring another "
+ "processor to add identifiers to the distributed cache.")
.required(false)
.allowableValues("true","false")
.defaultValue("true")
.build();
public static final Relationship REL_DUPLICATE = new Relationship.Builder()
.name("duplicate")
.description("If a FlowFile has been detected to be a duplicate, it will be routed to this relationship")
.build();
public static final Relationship REL_NON_DUPLICATE = new Relationship.Builder()
.name("non-duplicate")
.description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If unable to communicate with the cache, the FlowFile will be penalized and routed to this relationship")
.build();
private final Set<Relationship> relationships;
private final Serializer<String> keySerializer = new StringSerializer();
private final Serializer<CacheValue> valueSerializer = new CacheValueSerializer();
private final Deserializer<CacheValue> valueDeserializer = new CacheValueDeserializer();
public DetectDuplicate() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_DUPLICATE);
rels.add(REL_NON_DUPLICATE);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(CACHE_ENTRY_IDENTIFIER);
descriptors.add(FLOWFILE_DESCRIPTION);
descriptors.add(AGE_OFF_DURATION);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(CACHE_IDENTIFIER);
return descriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
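// Derive the cache key by evaluating the configured Cache Entry Identifier expression (default: ${hash.value}) against the FlowFile's attributes.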
final String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(cacheKey)) {
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
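// Age Off Duration is optional; when it is not set, cached entries never expire.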
final Long durationMS = context.getProperty(AGE_OFF_DURATION).asTimePeriod(TimeUnit.MILLISECONDS);
final long now = System.currentTimeMillis();
try {
final String flowFileDescription = context.getProperty(FLOWFILE_DESCRIPTION).evaluateAttributeExpressions(flowFile).getValue();
final CacheValue cacheValue = new CacheValue(flowFileDescription, now);
final CacheValue originalCacheValue;
final boolean shouldCacheIdentifier = context.getProperty(CACHE_IDENTIFIER).asBoolean();
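// When caching is enabled, getAndPutIfAbsent atomically stores this FlowFile's entry and returns any previously cached value;
// otherwise the cache is only read, leaving it to another processor to populate identifiers.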
if (shouldCacheIdentifier) {
originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer);
} else {
originalCacheValue = cache.get(cacheKey, keySerializer, valueDeserializer);
}
boolean duplicate = originalCacheValue != null;
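// If the cached entry has outlived the configured Age Off Duration, evict it and re-check: with caching enabled the current
// FlowFile's entry takes its place, so an expired match is normally no longer treated as a duplicate.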
if (duplicate && durationMS != null && (now >= originalCacheValue.getEntryTimeMS() + durationMS)) {
boolean status = cache.remove(cacheKey, keySerializer);
logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status});
// both should typically result in duplicate being false...but, better safe than sorry
if (shouldCacheIdentifier) {
duplicate = !cache.putIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer);
} else {
duplicate = cache.containsKey(cacheKey, keySerializer);
}
}
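// A duplicate has the original FlowFile's description attached as an attribute; everything else routes to 'non-duplicate'.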
if (duplicate) {
final String originalFlowFileDescription = originalCacheValue.getDescription();
session.getProvenanceReporter().route(flowFile, REL_DUPLICATE, "Duplicate of: " + originalFlowFileDescription);
flowFile = session.putAttribute(flowFile, ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME, originalFlowFileDescription);
session.transfer(flowFile, REL_DUPLICATE);
logger.info("Found {} to be a duplicate of FlowFile with description {}", new Object[]{flowFile, originalFlowFileDescription});
session.adjustCounter("Duplicates Detected", 1L, false);
} else {
session.getProvenanceReporter().route(flowFile, REL_NON_DUPLICATE);
session.transfer(flowFile, REL_NON_DUPLICATE);
logger.info("Could not find a duplicate entry in cache for {}; routing to non-duplicate", new Object[]{flowFile});
session.adjustCounter("Non-Duplicate Files Processed", 1L, false);
}
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
}
}
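
/**
 * The value stored in the distributed cache: the original FlowFile's description together with the time
 * the entry was created, which drives the optional age-off check.
 */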
private static class CacheValue {
private final String description;
private final long entryTimeMS;
public CacheValue(String description, long entryTimeMS) {
this.description = description;
this.entryTimeMS = entryTimeMS;
}
public String getDescription() {
return description;
}
public long getEntryTimeMS() {
return entryTimeMS;
}
}
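
/**
 * Writes a CacheValue as an 8-byte big-endian entry timestamp followed by the description encoded as UTF-8.
 */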
private static class CacheValueSerializer implements Serializer<CacheValue> {
@Override
public void serialize(final CacheValue entry, final OutputStream out) throws SerializationException, IOException {
long time = entry.getEntryTimeMS();
byte[] writeBuffer = new byte[8];
writeBuffer[0] = (byte) (time >>> 56);
writeBuffer[1] = (byte) (time >>> 48);
writeBuffer[2] = (byte) (time >>> 40);
writeBuffer[3] = (byte) (time >>> 32);
writeBuffer[4] = (byte) (time >>> 24);
writeBuffer[5] = (byte) (time >>> 16);
writeBuffer[6] = (byte) (time >>> 8);
writeBuffer[7] = (byte) (time);
out.write(writeBuffer, 0, 8);
out.write(entry.getDescription().getBytes(StandardCharsets.UTF_8));
}
}
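
/**
 * Reads the layout produced by CacheValueSerializer; a zero-length input indicates a cache miss and deserializes to null.
 */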
private static class CacheValueDeserializer implements Deserializer<CacheValue> {
@Override
public CacheValue deserialize(final byte[] input) throws DeserializationException, IOException {
if (input.length == 0) {
return null;
}
long time = ((long) input[0] << 56)
+ ((long) (input[1] & 255) << 48)
+ ((long) (input[2] & 255) << 40)
+ ((long) (input[3] & 255) << 32)
+ ((long) (input[4] & 255) << 24)
+ ((input[5] & 255) << 16)
+ ((input[6] & 255) << 8)
+ ((input[7] & 255));
String description = new String(input, 8, input.length - 8, StandardCharsets.UTF_8);
return new CacheValue(description, time);
}
}
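
/**
 * Serializes cache keys as their raw UTF-8 bytes.
 */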
private static class StringSerializer implements Serializer<String> {
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
}
}