/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;

import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;

@EventDriven
@SupportsBatching
@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal "
        + "is stored in the distributed cache from a corresponding Notify processor. "
        + "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship. "
        + "The release signal entry is then removed from the cache. "
        + "The attributes of the FlowFile that produced the release signal are copied to the waiting FlowFile if the Attribute Cache Regex "
        + "property of the corresponding Notify processor is set properly. "
        + "If there are multiple release signals in the cache identified by the Release Signal Identifier, "
        + "and the Notify processor is configured to copy the FlowFile attributes to the cache, "
        + "then the FlowFile passing the Wait processor receives the union of the attributes of the FlowFiles "
        + "that produced the release signals in the cache (identified by Release Signal Identifier). "
        + "Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "

        + "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
        + "This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. "
        + "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to "
        + "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value "
        + "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor."

        + "It is recommended to use a prioritizer (for instance First In First Out) when using the 'wait' relationship as a loop."
)
@WritesAttributes({
        @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
        + "initial epoch timestamp when the file first entered this processor.  This is used to determine the expiration time of the FlowFile.  "
        + "This attribute is not written when the FlowFile is transferred to failure, expired or success"),
        @WritesAttribute(attribute = "wait.counter.<counterName>", description = "The name of each counter for which at least one signal "
        + "has been present in the cache since the last time the cache was empty "
        + "gets copied to the current FlowFile as an attribute.")
})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
        "org.apache.nifi.processors.standard.Notify"})
public class Wait extends AbstractProcessor {

    public static final String WAIT_START_TIMESTAMP = "wait.start.timestamp";

    // Identifies the distributed map cache client
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("distributed-cache-service")
            .displayName("Distributed Cache Service")
            .description("The Controller Service that is used to check for release signals from a corresponding Notify processor")
            .required(true)
            .identifiesControllerService(AtomicDistributedMapCacheClient.class)
            .build();

    // Selects the FlowFile attribute or expression, whose value is used as cache key
    public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("release-signal-id")
            .displayName("Release Signal Identifier")
            .description("A value that specifies the key to a specific release signal cache. "
                + "To decide whether the FlowFile that is being processed by the Wait processor should be sent to the 'success' "
                + "or the 'wait' relationship, the processor checks the signals in the cache specified by this key.")
            .required(true)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
            .name("target-signal-count")
            .displayName("Target Signal Count")
            .description("The number of signals that need to be in the cache (specified by the Release Signal Identifier) "
                + "in order for the FlowFile processed by the Wait processor to be sent to the ‘success’ relationship. "
                + "If the number of signals in the cache has reached this number, the FlowFile is routed to the "
                + "'success' relationship and the number of signals in the cache is decreased by this value. "
                + "If Signal Counter Name is specified, this processor checks a particular counter, "
                + "otherwise checks against the total number of signals in the cache.")
            .required(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("1")
            .build();

    public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
            .name("signal-counter-name")
            .displayName("Signal Counter Name")
            .description("Within the cache (specified by the Release Signal Identifier) the signals may belong to different counters. "
                + "If this property is specified, the processor checks the number of signals in the cache that belong to this particular counter. "
                + "If not specified, the processor checks the total number of signals in the cache.")
            .required(false)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final PropertyDescriptor WAIT_BUFFER_COUNT = new PropertyDescriptor.Builder()
            .name("wait-buffer-count")
            .displayName("Wait Buffer Count")
            .description("Specify the maximum number of incoming FlowFiles that can be buffered to check whether it can move forward. " +
                    "The more buffer can provide the better performance, as it reduces the number of interactions with cache service " +
                    "by grouping FlowFiles by signal identifier. " +
                    "Only a signal identifier can be processed at a processor execution.")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("1")
            .build();

    public static final PropertyDescriptor RELEASABLE_FLOWFILE_COUNT = new PropertyDescriptor.Builder()
            .name("releasable-flowfile-count")
            .displayName("Releasable FlowFile Count")
            .description("A value, or the results of an Attribute Expression Language statement, which will " +
                    "be evaluated against a FlowFile in order to determine the releasable FlowFile count. " +
                    "This specifies how many FlowFiles can be released when a target count reaches target signal count. " +
                    "Zero (0) has a special meaning, any number of FlowFiles can be released as long as signal count matches target.")
            .required(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("1")
            .build();

    // Selects the FlowFile attribute or expression, whose value is used as cache key
    public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
            .name("expiration-duration")
            .displayName("Expiration Duration")
            .description("Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship")
            .required(true)
            .defaultValue("10 min")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new AllowableValue("replace", "Replace if present",
            "When cached attributes are copied onto released FlowFiles, they replace any matching attributes.");

    public static final AllowableValue ATTRIBUTE_COPY_KEEP_ORIGINAL = new AllowableValue("keeporiginal", "Keep original",
            "Attributes on released FlowFiles are not overwritten by copied cached attributes.");

    public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder()
            .name("attribute-copy-mode")
            .displayName("Attribute Copy Mode")
            .description("Specifies how to handle attributes copied from FlowFiles entering the Notify processor")
            .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
            .required(true)
            .allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final AllowableValue WAIT_MODE_TRANSFER_TO_WAIT = new AllowableValue("wait", "Transfer to wait relationship",
            "Transfer a FlowFile to the 'wait' relationship when whose release signal has not been notified yet." +
                    " This mode allows other incoming FlowFiles to be enqueued by moving FlowFiles into the wait relationship." +
                    " It is recommended to set a prioritizer (for instance First In First Out) on the 'wait' relationship.");

    public static final AllowableValue WAIT_MODE_KEEP_IN_UPSTREAM = new AllowableValue("keep", "Keep in the upstream connection",
            "Transfer a FlowFile to the upstream connection where it comes from when whose release signal has not been notified yet." +
                    " This mode helps keeping upstream connection being full so that the upstream source processor" +
                    " will not be scheduled while back-pressure is active and limit incoming FlowFiles. ");

    public static final PropertyDescriptor WAIT_MODE = new PropertyDescriptor.Builder()
            .name("wait-mode")
            .displayName("Wait Mode")
            .description("Specifies how to handle a FlowFile waiting for a notify signal")
            .defaultValue(WAIT_MODE_TRANSFER_TO_WAIT.getValue())
            .required(true)
            .allowableValues(WAIT_MODE_TRANSFER_TO_WAIT, WAIT_MODE_KEEP_IN_UPSTREAM)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final PropertyDescriptor WAIT_PENALTY_DURATION = new PropertyDescriptor.Builder()
        .name("wait-penalty-duration")
        .displayName("Wait Penalty Duration")
        .description("If configured, after a signal identifier got processed but did not meet the release criteria," +
            " the signal identifier is penalized and FlowFiles having the signal identifier" +
            " will not be processed again for the specified period of time," +
            " so that the signal identifier will not block others to be processed." +
            " This can be useful for use cases where a Wait processor is expected to process multiple signal identifiers," +
            " and each signal identifier has multiple FlowFiles," +
            " and also the order of releasing FlowFiles is important within a signal identifier." +
            " The FlowFile order can be configured with Prioritizers." +
            " IMPORTANT: There is a limitation of number of queued signals can be processed," +
            " and Wait processor may not be able to check all queued signal ids. See additional details for the best practice.")
        .required(false)
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
        .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("A FlowFile with a matching release signal in the cache will be routed to this relationship")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship")
            .build();

    public static final Relationship REL_WAIT = new Relationship.Builder()
            .name("wait")
            .description("A FlowFile with no matching release signal in the cache will be routed to this relationship")
            .build();

    public static final Relationship REL_EXPIRED = new Relationship.Builder()
            .name("expired")
            .description("A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship")
            .build();

    private final Set<Relationship> relationships;

    private final Map<String, Long> signalIdPenalties = new HashMap<>();

    public Wait() {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_WAIT);
        rels.add(REL_EXPIRED);
        rels.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(rels);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
        descriptors.add(TARGET_SIGNAL_COUNT);
        descriptors.add(SIGNAL_COUNTER_NAME);
        descriptors.add(WAIT_BUFFER_COUNT);
        descriptors.add(RELEASABLE_FLOWFILE_COUNT);
        descriptors.add(EXPIRATION_DURATION);
        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
        descriptors.add(ATTRIBUTE_COPY_MODE);
        descriptors.add(WAIT_MODE);
        descriptors.add(WAIT_PENALTY_DURATION);
        return descriptors;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        final ComponentLog logger = getLogger();

        // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
        final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
        final Integer bufferCount = context.getProperty(WAIT_BUFFER_COUNT).asInteger();

        final Map<Relationship, List<FlowFile>> processedFlowFiles = new HashMap<>();
        final Function<Relationship, List<FlowFile>> getFlowFilesFor = r -> processedFlowFiles.computeIfAbsent(r, k -> new ArrayList<>());

        final AtomicReference<String> targetSignalId = new AtomicReference<>();
        final AtomicInteger bufferedCount = new AtomicInteger(0);
        final List<FlowFile> failedFilteringFlowFiles = new ArrayList<>();
        final Supplier<FlowFileFilter.FlowFileFilterResult> acceptResultSupplier =
                () -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE;

        // Clear expired penalties.
        if (!signalIdPenalties.isEmpty()) {
            final Iterator<Entry<String, Long>> penaltyIterator = signalIdPenalties.entrySet().iterator();
            final long now = System.currentTimeMillis();
            while (penaltyIterator.hasNext()) {
                final Entry<String, Long> penalty = penaltyIterator.next();
                if (penalty.getValue() < now) {
                    penaltyIterator.remove();
                }
            }
        }

        final List<FlowFile> flowFiles = session.get(f -> {

            final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue();

            // if the computed value is null, or empty, we transfer the FlowFile to failure relationship
            if (StringUtils.isBlank(fSignalId)) {
                // We can't penalize f before getting it from session, so keep it in a temporal list.
                logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {f});
                failedFilteringFlowFiles.add(f);
                return ACCEPT_AND_CONTINUE;
            }

            if (signalIdPenalties.containsKey(fSignalId)) {
                // This id is penalized.
                return REJECT_AND_CONTINUE;
            }

            final String targetSignalIdStr = targetSignalId.get();
            if (targetSignalIdStr == null) {
                // This is the first one.
                targetSignalId.set(fSignalId);
                return acceptResultSupplier.get();
            }

            if (targetSignalIdStr.equals(fSignalId)) {
                return acceptResultSupplier.get();
            }

            return REJECT_AND_CONTINUE;

        });

        final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
        final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
        final AtomicReference<Signal> signalRef = new AtomicReference<>();
        // This map contains original counts before those are consumed to release incoming FlowFiles.
        final HashMap<String, Long> originalSignalCounts = new HashMap<>();

        final Consumer<FlowFile> transferToFailure = flowFile -> {
            flowFile = session.penalize(flowFile);
            // This flowFile is now failed, our tracking is done, clear the timer
            flowFile = clearWaitState(session, flowFile);
            getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
        };

        final Consumer<Entry<Relationship, List<FlowFile>>> transferFlowFiles = routedFlowFiles -> {
            Relationship relationship = routedFlowFiles.getKey();

            if (REL_WAIT.equals(relationship)) {
                final String waitMode = context.getProperty(WAIT_MODE).getValue();

                if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
                    // Transfer to self.
                    relationship = Relationship.SELF;
                }
            }
            final Relationship finalRelationship = relationship;
            final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
                    .map(f -> {
                        if (REL_SUCCESS.equals(finalRelationship) || REL_EXPIRED.equals(finalRelationship)) {
                            // These flowFiles will be exiting the wait, clear the timer
                            f = clearWaitState(session, f);
                        }
                        return copySignalAttributes(session, f, signalRef.get(),
                            originalSignalCounts,
                            replaceOriginalAttributes);
                    })
                    .collect(Collectors.toList());

            session.transfer(flowFilesWithSignalAttributes, relationship);
        };

        failedFilteringFlowFiles.forEach(f -> {
            flowFiles.remove(f);
            transferToFailure.accept(f);
        });

        if (flowFiles.isEmpty()) {
            // If there was nothing but failed FlowFiles while filtering, transfer those and end immediately.
            processedFlowFiles.entrySet().forEach(transferFlowFiles);
            return;
        }

        // the cache client used to interact with the distributed cache
        final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);

        final String signalId = targetSignalId.get();
        final Signal signal;

        // get notifying signal
        try {
            signal = protocol.getSignal(signalId);
            if (signal != null) {
                originalSignalCounts.putAll(signal.getCounts());
            }
            signalRef.set(signal);
        } catch (final IOException e) {
            throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
        }

        String targetCounterName = null;
        long targetCount = 1;
        int releasableFlowFileCount = 1;

        final List<FlowFile> candidates = new ArrayList<>();

        for (FlowFile flowFile : flowFiles) {
            // Set wait start timestamp if it's not set yet
            String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
            if (waitStartTimestamp == null) {
                waitStartTimestamp = String.valueOf(System.currentTimeMillis());
                flowFile = session.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStartTimestamp);
            }

            long lWaitStartTimestamp;
            try {
                lWaitStartTimestamp = Long.parseLong(waitStartTimestamp);
            } catch (NumberFormatException nfe) {
                logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
                transferToFailure.accept(flowFile);
                continue;
            }

            // check for expiration
            long expirationDuration = context.getProperty(EXPIRATION_DURATION)
                    .asTimePeriod(TimeUnit.MILLISECONDS);
            long now = System.currentTimeMillis();
            if (now > (lWaitStartTimestamp + expirationDuration)) {
                logger.info("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)});
                getFlowFilesFor.apply(REL_EXPIRED).add(flowFile);
                continue;
            }

            // If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
            if (signal == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("No release signal found for {} on FlowFile {} yet", new Object[] {signalId, flowFile});
                }
                getFlowFilesFor.apply(REL_WAIT).add(flowFile);
                continue;
            }

            // Fix target counter name and count from current FlowFile, if those are not set yet.
            if (candidates.isEmpty()) {
                targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
                try {
                    targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
                } catch (final NumberFormatException e) {
                    transferToFailure.accept(flowFile);
                    logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e}, e);
                    continue;
                }
                try {
                    releasableFlowFileCount = Integer.valueOf(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue());
                } catch (final NumberFormatException e) {
                    transferToFailure.accept(flowFile);
                    logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", new Object[] {flowFile, e}, e);
                    continue;
                }
            }

            // FlowFile is now validated and added to candidates.
            candidates.add(flowFile);
        }

        boolean waitCompleted = false;
        boolean waitProgressed = false;
        if (signal != null && !candidates.isEmpty()) {

            if (releasableFlowFileCount > 0) {
                signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates,
                        released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
                        waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
                waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0;
                waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();

            } else {
                boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
                        ? signal.isTotalCountReached(targetCount)
                        : signal.isCountReached(targetCounterName, targetCount);

                if (reachedTargetCount) {
                    getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
                } else {
                    getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
                }
            }
        }

        // Transfer FlowFiles.
        processedFlowFiles.entrySet().forEach(transferFlowFiles);

        // Penalize signal id if no FlowFile transferred to success.
        final PropertyValue waitPenaltyDuration = context.getProperty(WAIT_PENALTY_DURATION);
        if (waitPenaltyDuration.isSet() && getFlowFilesFor.apply(REL_SUCCESS).isEmpty()) {
            signalIdPenalties.put(signalId, System.currentTimeMillis() + waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS));
        }

        // Update signal if needed.
        try {
            if (waitCompleted) {
                protocol.complete(signalId);
            } else if (waitProgressed) {
                protocol.replace(signal);
            }

        } catch (final IOException e) {
            session.rollback();
            throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
        }

    }

    private FlowFile clearWaitState(final ProcessSession session, final FlowFile flowFile) {
        return session.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
    }

    private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) {
        if (signal == null) {
            return flowFile;
        }

        // copy over attributes from release signal FlowFile, if provided
        final Map<String, String> attributesToCopy;
        if (replaceOriginal) {
            attributesToCopy = new HashMap<>(signal.getAttributes());
            attributesToCopy.remove("uuid");
        } else {
            // if the current FlowFile does *not* have the cached attribute, copy it
            attributesToCopy = signal.getAttributes().entrySet().stream()
                    .filter(e -> flowFile.getAttribute(e.getKey()) == null)
                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
        }

        // Copy counter attributes
        final long totalCount = originalCount.entrySet().stream().mapToLong(e -> {
            final Long count = e.getValue();
            attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
            return count;
        }).sum();
        attributesToCopy.put("wait.counter.total", String.valueOf(totalCount));

        return session.putAllAttributes(flowFile, attributesToCopy);
    }

    @OnStopped
    public void onStopped(final ProcessContext context) {
        signalIdPenalties.clear();
    }

    Map<String, Long> getSignalIdPenalties() {
        return signalIdPenalties;
    }
}
