blob: 83ac2e82fdaa1485fc4c171a955294a67e4248b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal "
+ "is stored in the distributed cache from a corresponding Notify processor. "
+ "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship. "
+ "The release signal entry is then removed from the cache. "
+ "The attributes of the FlowFile that produced the release signal are copied to the waiting FlowFile if the Attribute Cache Regex "
+ "property of the corresponding Notify processor is set properly. "
+ "If there are multiple release signals in the cache identified by the Release Signal Identifier, "
+ "and the Notify processor is configured to copy the FlowFile attributes to the cache, "
+ "then the FlowFile passing the Wait processor receives the union of the attributes of the FlowFiles "
+ "that produced the release signals in the cache (identified by Release Signal Identifier). "
+ "Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+ "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
+ "This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. "
+ "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to "
+ "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value "
+ "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor."
+ "It is recommended to use a prioritizer (for instance First In First Out) when using the 'wait' relationship as a loop."
)
@WritesAttributes({
@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+ "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile. "
+ "This attribute is not written when the FlowFile is transferred to failure, expired or success"),
@WritesAttribute(attribute = "wait.counter.<counterName>", description = "The name of each counter for which at least one signal "
+ "has been present in the cache since the last time the cache was empty "
+ "gets copied to the current FlowFile as an attribute.")
})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.Notify"})
public class Wait extends AbstractProcessor {
public static final String WAIT_START_TIMESTAMP = "wait.start.timestamp";
// Identifies the distributed map cache client
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("distributed-cache-service")
.displayName("Distributed Cache Service")
.description("The Controller Service that is used to check for release signals from a corresponding Notify processor")
.required(true)
.identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder()
.name("release-signal-id")
.displayName("Release Signal Identifier")
.description("A value that specifies the key to a specific release signal cache. "
+ "To decide whether the FlowFile that is being processed by the Wait processor should be sent to the 'success' "
+ "or the 'wait' relationship, the processor checks the signals in the cache specified by this key.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
.name("target-signal-count")
.displayName("Target Signal Count")
.description("The number of signals that need to be in the cache (specified by the Release Signal Identifier) "
+ "in order for the FlowFile processed by the Wait processor to be sent to the ‘success’ relationship. "
+ "If the number of signals in the cache has reached this number, the FlowFile is routed to the "
+ "'success' relationship and the number of signals in the cache is decreased by this value. "
+ "If Signal Counter Name is specified, this processor checks a particular counter, "
+ "otherwise checks against the total number of signals in the cache.")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("1")
.build();
public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
.name("signal-counter-name")
.displayName("Signal Counter Name")
.description("Within the cache (specified by the Release Signal Identifier) the signals may belong to different counters. "
+ "If this property is specified, the processor checks the number of signals in the cache that belong to this particular counter. "
+ "If not specified, the processor checks the total number of signals in the cache.")
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor WAIT_BUFFER_COUNT = new PropertyDescriptor.Builder()
.name("wait-buffer-count")
.displayName("Wait Buffer Count")
.description("Specify the maximum number of incoming FlowFiles that can be buffered to check whether it can move forward. " +
"The more buffer can provide the better performance, as it reduces the number of interactions with cache service " +
"by grouping FlowFiles by signal identifier. " +
"Only a signal identifier can be processed at a processor execution.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.build();
public static final PropertyDescriptor RELEASABLE_FLOWFILE_COUNT = new PropertyDescriptor.Builder()
.name("releasable-flowfile-count")
.displayName("Releasable FlowFile Count")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the releasable FlowFile count. " +
"This specifies how many FlowFiles can be released when a target count reaches target signal count. " +
"Zero (0) has a special meaning, any number of FlowFiles can be released as long as signal count matches target.")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("1")
.build();
// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
.name("expiration-duration")
.displayName("Expiration Duration")
.description("Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship")
.required(true)
.defaultValue("10 min")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new AllowableValue("replace", "Replace if present",
"When cached attributes are copied onto released FlowFiles, they replace any matching attributes.");
public static final AllowableValue ATTRIBUTE_COPY_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original",
"Attributes on released FlowFiles are not overwritten by copied cached attributes.");
public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder()
.name("attribute-copy-mode")
.displayName("Attribute Copy Mode")
.description("Specifies how to handle attributes copied from FlowFiles entering the Notify processor")
.defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
.required(true)
.allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final AllowableValue WAIT_MODE_TRANSFER_TO_WAIT = new AllowableValue("wait", "Transfer to wait relationship",
"Transfer a FlowFile to the 'wait' relationship when whose release signal has not been notified yet." +
" This mode allows other incoming FlowFiles to be enqueued by moving FlowFiles into the wait relationship." +
" It is recommended to set a prioritizer (for instance First In First Out) on the 'wait' relationship.");
public static final AllowableValue WAIT_MODE_KEEP_IN_UPSTREAM = new AllowableValue("keep", "Keep in the upstream connection",
"Transfer a FlowFile to the upstream connection where it comes from when whose release signal has not been notified yet." +
" This mode helps keeping upstream connection being full so that the upstream source processor" +
" will not be scheduled while back-pressure is active and limit incoming FlowFiles. ");
public static final PropertyDescriptor WAIT_MODE = new PropertyDescriptor.Builder()
.name("wait-mode")
.displayName("Wait Mode")
.description("Specifies how to handle a FlowFile waiting for a notify signal")
.defaultValue(WAIT_MODE_TRANSFER_TO_WAIT.getValue())
.required(true)
.allowableValues(WAIT_MODE_TRANSFER_TO_WAIT, WAIT_MODE_KEEP_IN_UPSTREAM)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor WAIT_PENALTY_DURATION = new PropertyDescriptor.Builder()
.name("wait-penalty-duration")
.displayName("Wait Penalty Duration")
.description("If configured, after a signal identifier got processed but did not meet the release criteria," +
" the signal identifier is penalized and FlowFiles having the signal identifier" +
" will not be processed again for the specified period of time," +
" so that the signal identifier will not block others to be processed." +
" This can be useful for use cases where a Wait processor is expected to process multiple signal identifiers," +
" and each signal identifier has multiple FlowFiles," +
" and also the order of releasing FlowFiles is important within a signal identifier." +
" The FlowFile order can be configured with Prioritizers." +
" IMPORTANT: There is a limitation of number of queued signals can be processed," +
" and Wait processor may not be able to check all queued signal ids. See additional details for the best practice.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile with a matching release signal in the cache will be routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship")
.build();
public static final Relationship REL_WAIT = new Relationship.Builder()
.name("wait")
.description("A FlowFile with no matching release signal in the cache will be routed to this relationship")
.build();
public static final Relationship REL_EXPIRED = new Relationship.Builder()
.name("expired")
.description("A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship")
.build();
private final Set<Relationship> relationships;
private final Map<String, Long> signalIdPenalties = new HashMap<>();
public Wait() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_WAIT);
rels.add(REL_EXPIRED);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
descriptors.add(TARGET_SIGNAL_COUNT);
descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(WAIT_BUFFER_COUNT);
descriptors.add(RELEASABLE_FLOWFILE_COUNT);
descriptors.add(EXPIRATION_DURATION);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_COPY_MODE);
descriptors.add(WAIT_MODE);
descriptors.add(WAIT_PENALTY_DURATION);
return descriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
// Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
final Integer bufferCount = context.getProperty(WAIT_BUFFER_COUNT).asInteger();
final Map<Relationship, List<FlowFile>> processedFlowFiles = new HashMap<>();
final Function<Relationship, List<FlowFile>> getFlowFilesFor = r -> processedFlowFiles.computeIfAbsent(r, k -> new ArrayList<>());
final AtomicReference<String> targetSignalId = new AtomicReference<>();
final AtomicInteger bufferedCount = new AtomicInteger(0);
final List<FlowFile> failedFilteringFlowFiles = new ArrayList<>();
final Supplier<FlowFileFilter.FlowFileFilterResult> acceptResultSupplier =
() -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE;
// Clear expired penalties.
if (!signalIdPenalties.isEmpty()) {
final Iterator<Entry<String, Long>> penaltyIterator = signalIdPenalties.entrySet().iterator();
final long now = System.currentTimeMillis();
while (penaltyIterator.hasNext()) {
final Entry<String, Long> penalty = penaltyIterator.next();
if (penalty.getValue() < now) {
penaltyIterator.remove();
}
}
}
final List<FlowFile> flowFiles = session.get(f -> {
final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue();
// if the computed value is null, or empty, we transfer the FlowFile to failure relationship
if (StringUtils.isBlank(fSignalId)) {
// We can't penalize f before getting it from session, so keep it in a temporal list.
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {f});
failedFilteringFlowFiles.add(f);
return ACCEPT_AND_CONTINUE;
}
if (signalIdPenalties.containsKey(fSignalId)) {
// This id is penalized.
return REJECT_AND_CONTINUE;
}
final String targetSignalIdStr = targetSignalId.get();
if (targetSignalIdStr == null) {
// This is the first one.
targetSignalId.set(fSignalId);
return acceptResultSupplier.get();
}
if (targetSignalIdStr.equals(fSignalId)) {
return acceptResultSupplier.get();
}
return REJECT_AND_CONTINUE;
});
final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
final AtomicReference<Signal> signalRef = new AtomicReference<>();
// This map contains original counts before those are consumed to release incoming FlowFiles.
final HashMap<String, Long> originalSignalCounts = new HashMap<>();
final Consumer<FlowFile> transferToFailure = flowFile -> {
flowFile = session.penalize(flowFile);
// This flowFile is now failed, our tracking is done, clear the timer
flowFile = clearWaitState(session, flowFile);
getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
};
final Consumer<Entry<Relationship, List<FlowFile>>> transferFlowFiles = routedFlowFiles -> {
Relationship relationship = routedFlowFiles.getKey();
if (REL_WAIT.equals(relationship)) {
final String waitMode = context.getProperty(WAIT_MODE).getValue();
if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
// Transfer to self.
relationship = Relationship.SELF;
}
}
final Relationship finalRelationship = relationship;
final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
.map(f -> {
if (REL_SUCCESS.equals(finalRelationship) || REL_EXPIRED.equals(finalRelationship)) {
// These flowFiles will be exiting the wait, clear the timer
f = clearWaitState(session, f);
}
return copySignalAttributes(session, f, signalRef.get(),
originalSignalCounts,
replaceOriginalAttributes);
})
.collect(Collectors.toList());
session.transfer(flowFilesWithSignalAttributes, relationship);
};
failedFilteringFlowFiles.forEach(f -> {
flowFiles.remove(f);
transferToFailure.accept(f);
});
if (flowFiles.isEmpty()) {
// If there was nothing but failed FlowFiles while filtering, transfer those and end immediately.
processedFlowFiles.entrySet().forEach(transferFlowFiles);
return;
}
// the cache client used to interact with the distributed cache
final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
final String signalId = targetSignalId.get();
final Signal signal;
// get notifying signal
try {
signal = protocol.getSignal(signalId);
if (signal != null) {
originalSignalCounts.putAll(signal.getCounts());
}
signalRef.set(signal);
} catch (final IOException e) {
throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
}
String targetCounterName = null;
long targetCount = 1;
int releasableFlowFileCount = 1;
final List<FlowFile> candidates = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
// Set wait start timestamp if it's not set yet
String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
if (waitStartTimestamp == null) {
waitStartTimestamp = String.valueOf(System.currentTimeMillis());
flowFile = session.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStartTimestamp);
}
long lWaitStartTimestamp;
try {
lWaitStartTimestamp = Long.parseLong(waitStartTimestamp);
} catch (NumberFormatException nfe) {
logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
transferToFailure.accept(flowFile);
continue;
}
// check for expiration
long expirationDuration = context.getProperty(EXPIRATION_DURATION)
.asTimePeriod(TimeUnit.MILLISECONDS);
long now = System.currentTimeMillis();
if (now > (lWaitStartTimestamp + expirationDuration)) {
logger.info("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)});
getFlowFilesFor.apply(REL_EXPIRED).add(flowFile);
continue;
}
// If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
if (signal == null) {
if (logger.isDebugEnabled()) {
logger.debug("No release signal found for {} on FlowFile {} yet", new Object[] {signalId, flowFile});
}
getFlowFilesFor.apply(REL_WAIT).add(flowFile);
continue;
}
// Fix target counter name and count from current FlowFile, if those are not set yet.
if (candidates.isEmpty()) {
targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
try {
targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
} catch (final NumberFormatException e) {
transferToFailure.accept(flowFile);
logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e}, e);
continue;
}
try {
releasableFlowFileCount = Integer.valueOf(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue());
} catch (final NumberFormatException e) {
transferToFailure.accept(flowFile);
logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", new Object[] {flowFile, e}, e);
continue;
}
}
// FlowFile is now validated and added to candidates.
candidates.add(flowFile);
}
boolean waitCompleted = false;
boolean waitProgressed = false;
if (signal != null && !candidates.isEmpty()) {
if (releasableFlowFileCount > 0) {
signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates,
released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0;
waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();
} else {
boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
? signal.isTotalCountReached(targetCount)
: signal.isCountReached(targetCounterName, targetCount);
if (reachedTargetCount) {
getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
} else {
getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
}
}
}
// Transfer FlowFiles.
processedFlowFiles.entrySet().forEach(transferFlowFiles);
// Penalize signal id if no FlowFile transferred to success.
final PropertyValue waitPenaltyDuration = context.getProperty(WAIT_PENALTY_DURATION);
if (waitPenaltyDuration.isSet() && getFlowFilesFor.apply(REL_SUCCESS).isEmpty()) {
signalIdPenalties.put(signalId, System.currentTimeMillis() + waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS));
}
// Update signal if needed.
try {
if (waitCompleted) {
protocol.complete(signalId);
} else if (waitProgressed) {
protocol.replace(signal);
}
} catch (final IOException e) {
session.rollback();
throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
}
}
private FlowFile clearWaitState(final ProcessSession session, final FlowFile flowFile) {
return session.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
}
private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) {
if (signal == null) {
return flowFile;
}
// copy over attributes from release signal FlowFile, if provided
final Map<String, String> attributesToCopy;
if (replaceOriginal) {
attributesToCopy = new HashMap<>(signal.getAttributes());
attributesToCopy.remove("uuid");
} else {
// if the current FlowFile does *not* have the cached attribute, copy it
attributesToCopy = signal.getAttributes().entrySet().stream()
.filter(e -> flowFile.getAttribute(e.getKey()) == null)
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
}
// Copy counter attributes
final long totalCount = originalCount.entrySet().stream().mapToLong(e -> {
final Long count = e.getValue();
attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
return count;
}).sum();
attributesToCopy.put("wait.counter.total", String.valueOf(totalCount));
return session.putAllAttributes(flowFile, attributesToCopy);
}
@OnStopped
public void onStopped(final ProcessContext context) {
signalIdPenalties.clear();
}
Map<String, Long> getSignalIdPenalties() {
return signalIdPenalties;
}
}