/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.util;

import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.state.MockStateManager;
import org.junit.jupiter.api.Assertions;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class MockProcessSession implements ProcessSession {

    // FlowFiles grouped by Relationship. rollback() offers every FlowFile held here back to the processor queue.
    private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>();
    // The queue obtained from the shared session state; get() polls FlowFiles from it.
    private final MockFlowFileQueue processorQueue;
    // IDs of the FlowFiles this session currently holds. A commit fails while this set is non-empty.
    private final Set<Long> beingProcessed = new HashSet<>();
    // FlowFiles recorded as penalized, e.g. by rollback(true).
    private final List<MockFlowFile> penalized = new ArrayList<>();
    private final Processor processor;

    // The most recent MockFlowFile instance for each FlowFile ID known to this session.
    private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
    // The MockFlowFile instance as it was pulled from the queue, keyed by ID; rollback() re-queues these.
    private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
    private final SharedSessionState sharedState;
    // Counter adjustments that were not requested as 'immediate'; they are applied to the shared state on commit.
    private final Map<String, Long> counterMap = new HashMap<>();
    // Number of reads currently in progress for each FlowFile.
    private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>();
    // Consulted by migrate(); presumably the FlowFiles with a write in progress (the code that populates it is not in this part of the file).
    private final Set<FlowFile> writeRecursionSet = new HashSet<>();
    private final MockProvenanceReporter provenanceReporter;
    // Passed to closeStreams() on commit: when true, a stream that is still open at commit time is an error.
    private final boolean enforceStreamsClosed;

    // A Map, keyed by FlowFile, of the InputStreams that have been created by calls to {@link #read(FlowFile)} and have not yet been closed.
    private final Map<FlowFile, InputStream> openInputStreams = new HashMap<>();
    // A Map, keyed by FlowFile, of the OutputStreams that have been created by calls to {@link #write(FlowFile)} and have not yet been closed.
    private final Map<FlowFile, OutputStream> openOutputStreams = new HashMap<>();
    private final StateManager stateManager;
    // When false, commit() throws instead of committing; commitAsync() is unaffected.
    private final boolean allowSynchronousCommits;

    // Set by a successful commit; reset via clearCommitted().
    private boolean committed = false;
    // Set by rollback; reset via clearRollback().
    private boolean rolledback = false;
    // IDs of FlowFiles that were removed from this session via remove(FlowFile).
    private final Set<Long> removedFlowFiles = new HashSet<>();

    // Shared across all sessions: incremented each time updateLastQueuedDate() stamps a FlowFile.
    private static final AtomicLong enqueuedIndex = new AtomicLong(0L);

    /**
     * Creates a session that uses a new {@link MockStateManager} for the given processor and enforces that
     * streams are closed before commit.
     *
     * @param sharedState the state shared between sessions
     * @param processor the processor this session belongs to
     */
    public MockProcessSession(final SharedSessionState sharedState, final Processor processor) {
        this(sharedState, processor, true, new MockStateManager(processor));
    }

    /**
     * Creates a session with the given state manager that enforces that streams are closed before commit.
     *
     * @param sharedState the state shared between sessions
     * @param processor the processor this session belongs to
     * @param stateManager the state manager to associate with this session
     */
    public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final StateManager stateManager) {
        this(sharedState, processor, true, stateManager);
    }

    /**
     * Creates a session that does not allow synchronous commits, i.e. {@link #commit()} will throw.
     *
     * @param sharedState the state shared between sessions
     * @param processor the processor this session belongs to
     * @param enforceStreamsClosed whether a stream that is still open at commit time is treated as an error
     * @param stateManager the state manager to associate with this session
     */
    public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager) {
        this(sharedState, processor, enforceStreamsClosed, stateManager, false);
    }

    /**
     * Creates a fully configured session. The processor queue is taken from the shared state and a new
     * provenance reporter is created for this session.
     *
     * @param sharedState the state shared between sessions; supplies the FlowFile queue
     * @param processor the processor this session belongs to; its identifier and simple class name are handed to the provenance reporter
     * @param enforceStreamsClosed whether a stream that is still open at commit time is treated as an error
     * @param stateManager the state manager to associate with this session
     * @param allowSynchronousCommits whether {@link #commit()} is permitted; when false it throws
     */
    public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager,
                              final boolean allowSynchronousCommits) {
        this.processor = processor;
        this.enforceStreamsClosed = enforceStreamsClosed;
        this.sharedState = sharedState;
        this.processorQueue = sharedState.getFlowFileQueue();
        // NOTE: 'this' escapes to the reporter before construction has finished; keep the assignment order as is.
        this.provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
        this.stateManager = stateManager;
        this.allowSynchronousCommits = allowSynchronousCommits;
    }

    /**
     * Adjusts the named counter by the given delta.
     *
     * @param name the counter name
     * @param delta the amount to add to the counter
     * @param immediate if true the adjustment is applied to the shared state right away; otherwise it is accumulated
     *                  locally until the session is committed
     */
    @Override
    public void adjustCounter(final String name, final long delta, final boolean immediate) {
        if (!immediate) {
            // Accumulate locally: a missing entry starts at delta, an existing one is summed with it.
            counterMap.merge(name, delta, Long::sum);
            return;
        }

        sharedState.adjustCounter(name, delta);
    }

    /**
     * Migrates the given FlowFiles from this session to the given session.
     *
     * @param newOwner the session that should take ownership; must be another MockProcessSession
     * @param flowFiles the FlowFiles to hand over; must contain at least one element
     * @throws NullPointerException if newOwner is null
     * @throws IllegalArgumentException if newOwner is this session, is not a MockProcessSession, or no FlowFiles are supplied
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void migrate(final ProcessSession newOwner, final Collection<FlowFile> flowFiles) {
        Objects.requireNonNull(newOwner);
        if (newOwner == this) {
            throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself");
        }

        final boolean nothingToMigrate = flowFiles == null || flowFiles.isEmpty();
        if (nothingToMigrate) {
            throw new IllegalArgumentException("Must supply at least one FlowFile to migrate");
        }

        if (newOwner instanceof MockProcessSession) {
            // The raw intermediate cast is needed to view the Collection<FlowFile> as a Collection<MockFlowFile>.
            final Collection<MockFlowFile> mockFlowFiles = (Collection<MockFlowFile>) (Collection) flowFiles;
            migrate((MockProcessSession) newOwner, mockFlowFiles);
            return;
        }

        throw new IllegalArgumentException("Cannot migrate from a StandardProcessSession to a session of type " + newOwner.getClass());
    }

    /**
     * Moves all bookkeeping for the given FlowFiles from this session to the new owner.
     * Every FlowFile is validated first, so nothing is moved if any of them is not eligible.
     *
     * @param newOwner the session that takes ownership of the FlowFiles
     * @param flowFiles the FlowFiles to migrate
     * @throws IllegalStateException if a FlowFile has an open stream or is in use by an active read or write
     * @throws FlowFileHandlingException if a FlowFile is not known in this session
     */
    private void migrate(final MockProcessSession newOwner, final Collection<MockFlowFile> flowFiles) {
        // Pass 1: validate everything before touching any state.
        for (final FlowFile flowFile : flowFiles) {
            if (openInputStreams.containsKey(flowFile)) {
                throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
                    + "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
            }

            if (openOutputStreams.containsKey(flowFile)) {
                throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
                    + "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
            }

            if (readRecursionSet.containsKey(flowFile)) {
                throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
            }

            if (writeRecursionSet.contains(flowFile)) {
                throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
            }

            final FlowFile currentVersion = currentVersions.get(flowFile.getId());
            if (currentVersion == null) {
                throw new FlowFileHandlingException(flowFile + " is not known in this session");
            }
        }

        // Pass 2: move FlowFiles that were already associated with a relationship, keeping the same relationship in the new owner.
        for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) {
            final Relationship relationship = entry.getKey();
            final List<MockFlowFile> transferredFlowFiles = entry.getValue();

            for (final MockFlowFile flowFile : flowFiles) {
                if (transferredFlowFiles.remove(flowFile)) {
                    newOwner.transferMap.computeIfAbsent(relationship, rel -> new ArrayList<>()).add(flowFile);
                }
            }
        }

        // Pass 3: move the per-FlowFile bookkeeping; each entry is moved only if this session actually held it.
        for (final MockFlowFile flowFile : flowFiles) {
            if (beingProcessed.remove(flowFile.getId())) {
                newOwner.beingProcessed.add(flowFile.getId());
            }

            if (penalized.remove(flowFile)) {
                newOwner.penalized.add(flowFile);
            }

            if (currentVersions.containsKey(flowFile.getId())) {
                newOwner.currentVersions.put(flowFile.getId(), currentVersions.remove(flowFile.getId()));
            }

            if (originalVersions.containsKey(flowFile.getId())) {
                newOwner.originalVersions.put(flowFile.getId(), originalVersions.remove(flowFile.getId()));
            }

            if (removedFlowFiles.remove(flowFile.getId())) {
                newOwner.removedFlowFiles.add(flowFile.getId());
            }
        }

        // The provenance reporter is told which FlowFiles moved by their UUID attribute rather than by numeric ID.
        final Set<String> flowFileIds = flowFiles.stream()
            .map(ff -> ff.getAttribute(CoreAttributes.UUID.key()))
            .collect(Collectors.toSet());

        provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds);
    }

    /**
     * Creates a copy of the given FlowFile under a newly allocated FlowFile ID and registers the copy with this session.
     *
     * @param flowFile the FlowFile to copy
     * @return the new FlowFile, which is tracked as being processed by this session
     */
    @Override
    public MockFlowFile clone(FlowFile flowFile) {
        final FlowFile source = validateState(flowFile);
        final MockFlowFile copy = new MockFlowFile(sharedState.nextFlowFileId(), source);

        final Long copyId = copy.getId();
        currentVersions.put(copyId, copy);
        beingProcessed.add(copyId);
        return copy;
    }

    /**
     * Creates a copy of the given FlowFile whose content is the requested slice of the source content.
     *
     * @param flowFile the FlowFile to copy
     * @param offset index of the first content byte to include
     * @param size number of content bytes to include
     * @return the new FlowFile, which is tracked as being processed by this session
     * @throws FlowFileHandlingException if offset + size is larger than the size of the FlowFile
     */
    @Override
    public MockFlowFile clone(FlowFile flowFile, final long offset, final long size) {
        final FlowFile source = validateState(flowFile);

        final long endOffset = offset + size;
        if (endOffset > source.getSize()) {
            throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + source.toString());
        }

        final MockFlowFile copy = new MockFlowFile(sharedState.nextFlowFileId(), source);
        final byte[] sourceContent = ((MockFlowFile) source).getData();
        copy.setData(Arrays.copyOfRange(sourceContent, (int) offset, (int) endOffset));

        final Long copyId = copy.getId();
        currentVersions.put(copyId, copy);
        beingProcessed.add(copyId);
        return copy;
    }

    /**
     * Closes the streams in the given map. The map is iterated via a snapshot because a stream may remove itself
     * from the map when it is closed.
     *
     * @param streamMap the streams that are still open, keyed by the FlowFile they belong to
     * @param enforceClosed if true, finding a stream that is still open is an error: the first such stream is closed
     *                      and a FlowFileHandlingException is thrown; if false, all streams are closed quietly
     * @throws FlowFileAccessException if closing a stream fails
     * @throws FlowFileHandlingException if enforceClosed is true and the map was not empty
     */
    private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap, final boolean enforceClosed) {
        // Snapshot taken before any stream is closed: avoids a ConcurrentModificationException and keeps a complete
        // record of the streams that were left open.
        final Map<FlowFile, ? extends Closeable> openStreamCopy = new HashMap<>(streamMap);
        for (final Map.Entry<FlowFile, ? extends Closeable> entry : openStreamCopy.entrySet()) {
            final FlowFile flowFile = entry.getKey();
            final Closeable openStream = entry.getValue();

            try {
                openStream.close();
            } catch (IOException e) {
                throw new FlowFileAccessException("Failed to close stream for " + flowFile, e);
            }

            if (enforceClosed) {
                // Report the snapshot, not streamMap: the stream just closed may already have removed itself from
                // streamMap, which would hide the very stream that caused the failure.
                throw new FlowFileHandlingException("Cannot commit session because the following streams were created via "
                    + "calls to ProcessSession.read(FlowFile) or ProcessSession.write(FlowFile) and never closed: " + openStreamCopy);
            }
        }
    }


    /**
     * Commits the session synchronously, but only if synchronous commits were enabled when this session was created.
     *
     * @throws RuntimeException if synchronous commits are not allowed for this session
     */
    @Override
    public void commit() {
        if (allowSynchronousCommits) {
            commitInternal();
            return;
        }

        throw new RuntimeException("As of version 1.14.0, ProcessSession.commit() should be avoided when possible. See JavaDocs for explanations. Instead, use commitAsync(), " +
            "commitAsync(Runnable), or commitAsync(Runnable, Consumer<Throwable>). However, if this is not possible, ProcessSession.commit() may still be used, but this must be explicitly " +
            "enabled by calling TestRunner.");
    }

    /**
     * Performs the actual commit: verifies that no FlowFile is still held by the session, closes (or rejects) open
     * streams, marks the session committed and publishes the accumulated counters and provenance events to the
     * shared state.
     *
     * @throws FlowFileHandlingException if FlowFiles have been neither removed nor transferred, or if open streams
     *                                   are found while stream closing is enforced
     */
    private void commitInternal() {
        final boolean flowFilesOutstanding = !beingProcessed.isEmpty();
        if (flowFilesOutstanding) {
            throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed);
        }

        closeStreams(openInputStreams, enforceStreamsClosed);
        closeStreams(openOutputStreams, enforceStreamsClosed);

        committed = true;
        beingProcessed.clear();
        currentVersions.clear();
        originalVersions.clear();

        // Publish the counter adjustments that were deferred until commit.
        counterMap.forEach((counterName, delta) -> sharedState.adjustCounter(counterName, delta));

        sharedState.addProvenanceEvents(provenanceReporter.getEvents());
        provenanceReporter.clear();
        counterMap.clear();
    }

    /**
     * Commits the session. In this mock the commit is carried out immediately on the calling thread, and it is
     * permitted regardless of whether synchronous commits are allowed.
     */
    @Override
    public void commitAsync() {
        commitInternal();
    }

    /**
     * Commits the session and notifies the supplied callbacks. In this mock the commit is carried out immediately on
     * the calling thread. If the commit fails the session is rolled back, the failure callback is invoked and the
     * failure is rethrown.
     *
     * @param onSuccess invoked after a successful commit; may be null, in which case no success notification is made
     * @param onFailure invoked with the failure if the commit fails; may be null, in which case the failure is only rethrown
     */
    @Override
    public void commitAsync(final Runnable onSuccess, final Consumer<Throwable> onFailure) {
        try {
            commitInternal();
        } catch (final Throwable t) {
            rollback();
            // A missing failure callback must not turn the real commit failure into a NullPointerException.
            if (onFailure != null) {
                onFailure.accept(t);
            }
            throw t;
        }

        if (onSuccess != null) {
            onSuccess.run();
        }
    }

    /**
     * Clears the 'committed' flag so that we can test that the next iteration of
     * {@link org.apache.nifi.processor.Processor#onTrigger} commits or rolls back the
     * session. Note that {@link #rollback(boolean)} is a no-op while the flag is set.
     */
    public void clearCommitted() {
        committed = false;
    }

    /**
     * Clears the 'rolledback' flag so that we can test that the next iteration
     * of {@link org.apache.nifi.processor.Processor#onTrigger} commits or rolls back the
     * session.
     */
    public void clearRollback() {
        rolledback = false;
    }

    /**
     * Creates a new, empty FlowFile with a newly allocated ID and registers it with this session.
     *
     * @return the new FlowFile, which is tracked as being processed by this session
     */
    @Override
    public MockFlowFile create() {
        final MockFlowFile created = new MockFlowFile(sharedState.nextFlowFileId());

        final Long createdId = created.getId();
        currentVersions.put(createdId, created);
        beingProcessed.add(createdId);
        return created;
    }

    /**
     * Creates a new FlowFile that inherits attributes from the given parent FlowFile.
     *
     * @param flowFile the parent whose attributes are inherited
     * @return the new FlowFile, which is tracked as being processed by this session
     */
    @Override
    public MockFlowFile create(final FlowFile flowFile) {
        // create() registers the blank FlowFile; the version returned by inheritAttributes is registered again below.
        final MockFlowFile child = (MockFlowFile) inheritAttributes(flowFile, create());

        final Long childId = child.getId();
        currentVersions.put(childId, child);
        beingProcessed.add(childId);
        return child;
    }

    /**
     * Creates a new FlowFile that inherits attributes from the given collection of parent FlowFiles.
     *
     * @param flowFiles the parents whose attributes are inherited
     * @return the new FlowFile, which is tracked as being processed by this session
     */
    @Override
    public MockFlowFile create(final Collection<FlowFile> flowFiles) {
        // create() registers the blank FlowFile; the version returned by inheritAttributes is registered again below.
        final MockFlowFile child = (MockFlowFile) inheritAttributes(flowFiles, create());

        final Long childId = child.getId();
        currentVersions.put(childId, child);
        beingProcessed.add(childId);
        return child;
    }

    /**
     * Writes the content of the given FlowFile to the given stream. The stream is not closed.
     *
     * @param flowFile the FlowFile whose content is exported
     * @param out the stream to write the content to
     * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
     * @throws FlowFileAccessException if writing to the stream fails
     */
    @Override
    public void exportTo(FlowFile flowFile, final OutputStream out) {
        final FlowFile validated = validateState(flowFile);
        if (validated == null || out == null) {
            throw new IllegalArgumentException("arguments cannot be null");
        }

        if (!(validated instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot export a flow file that I did not create");
        }

        final byte[] content = ((MockFlowFile) validated).getData();
        try {
            out.write(content);
        } catch (final IOException ioe) {
            throw new FlowFileAccessException(ioe.toString(), ioe);
        }
    }

    /**
     * Writes the content of the given FlowFile to the file at the given path.
     *
     * @param flowFile the FlowFile whose content is exported
     * @param path the destination file; it is created if it does not exist
     * @param append if true the content is appended to the destination; otherwise the destination is replaced
     * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
     * @throws FlowFileAccessException if the file cannot be opened or written
     */
    @Override
    public void exportTo(FlowFile flowFile, final Path path, final boolean append) {
        flowFile = validateState(flowFile);
        if (flowFile == null || path == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        if (!(flowFile instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot export a flow file that I did not create");
        }

        final MockFlowFile mock = (MockFlowFile) flowFile;

        // CREATE on its own neither truncates nor appends, so overwriting a longer file would leave stale trailing
        // bytes; APPEND on its own fails when the file does not exist yet. Spell out the full option set for each mode.
        final OpenOption[] options = append
            ? new OpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.APPEND}
            : new OpenOption[] {StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE};

        try (final OutputStream out = Files.newOutputStream(path, options)) {
            out.write(mock.getData());
        } catch (final IOException e) {
            throw new FlowFileAccessException(e.toString(), e);
        }
    }

    /**
     * Polls the next FlowFile from the processor queue and starts tracking it in this session.
     *
     * @return the next FlowFile, or null if the queue yielded nothing
     */
    @Override
    public MockFlowFile get() {
        final MockFlowFile polled = processorQueue.poll();
        if (polled == null) {
            return null;
        }

        // The polled instance is both the current and the original version until the session modifies it.
        final Long polledId = polled.getId();
        beingProcessed.add(polledId);
        currentVersions.put(polledId, polled);
        originalVersions.put(polledId, polled);
        return polled;
    }

    /**
     * Polls up to maxResults FlowFiles from the processor queue, stopping early once the queue yields nothing.
     *
     * @param maxResults the maximum number of FlowFiles to return
     * @return the FlowFiles that were pulled, possibly empty
     */
    @Override
    public List<FlowFile> get(final int maxResults) {
        final List<FlowFile> pulled = new ArrayList<>(Math.min(500, maxResults));

        MockFlowFile next;
        while (pulled.size() < maxResults && (next = get()) != null) {
            pulled.add(next);
        }

        return pulled;
    }

    /**
     * Polls FlowFiles from the processor queue and keeps those the filter accepts. Polling stops when the queue
     * yields nothing or the filter signals that it does not want to continue. FlowFiles that were polled but not
     * accepted are put back on the queue afterwards.
     *
     * @param filter decides for each FlowFile whether it is accepted and whether polling continues
     * @return the accepted FlowFiles, which are now tracked by this session
     */
    @Override
    public List<FlowFile> get(final FlowFileFilter filter) {
        final List<FlowFile> accepted = new ArrayList<>();
        final List<MockFlowFile> rejected = new ArrayList<>();

        for (MockFlowFile candidate = processorQueue.poll(); candidate != null; candidate = processorQueue.poll()) {
            final FlowFileFilter.FlowFileFilterResult verdict = filter.filter(candidate);

            if (!verdict.isAccept()) {
                rejected.add(candidate);
            } else {
                accepted.add(candidate);

                final Long candidateId = candidate.getId();
                beingProcessed.add(candidateId);
                currentVersions.put(candidateId, candidate);
                originalVersions.put(candidateId, candidate);
            }

            if (!verdict.isContinue()) {
                break;
            }
        }

        processorQueue.addAll(rejected);
        return accepted;
    }

    /**
     * @return the size reported by the processor queue
     */
    @Override
    public QueueSize getQueueSize() {
        return processorQueue.size();
    }

    /**
     * Replaces the content of the given FlowFile with everything that can be read from the given stream.
     *
     * @param in the stream to read the new content from
     * @param flowFile the FlowFile whose content is replaced
     * @return a new version of the FlowFile (same ID) carrying the imported content
     * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
     * @throws FlowFileAccessException if reading from the stream fails
     */
    @Override
    public MockFlowFile importFrom(final InputStream in, FlowFile flowFile) {
        final FlowFile validated = validateState(flowFile);
        if (validated == null || in == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        if (!(validated instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot export a flow file that I did not create");
        }

        // The new version is registered before the stream is read, so it stays registered even if reading fails.
        final MockFlowFile updated = new MockFlowFile(((MockFlowFile) validated).getId(), validated);
        currentVersions.put(updated.getId(), updated);

        final byte[] imported;
        try {
            imported = readFully(in);
        } catch (final IOException ioe) {
            throw new FlowFileAccessException(ioe.toString(), ioe);
        }

        updated.setData(imported);
        return updated;
    }

    /**
     * Replaces the content of the given FlowFile with the content of the file at the given path and sets the
     * FlowFile's filename attribute to the name of that file.
     *
     * @param path the file to import
     * @param keepSourceFile not consulted by this mock; the source file is never modified here
     * @param flowFile the FlowFile whose content is replaced
     * @return a new version of the FlowFile (same ID) carrying the imported content and filename
     * @throws IllegalArgumentException if path or flowFile is null or the FlowFile is not a MockFlowFile
     * @throws FlowFileAccessException if the file cannot be read
     */
    @Override
    public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, FlowFile flowFile) {
        final FlowFile validated = validateState(flowFile);
        if (validated == null || path == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        if (!(validated instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot export a flow file that I did not create");
        }

        // The new version is registered before the file is read, so it stays registered even if reading fails.
        final MockFlowFile updated = new MockFlowFile(((MockFlowFile) validated).getId(), validated);
        currentVersions.put(updated.getId(), updated);

        final ByteArrayOutputStream fileContent = new ByteArrayOutputStream();
        try {
            Files.copy(path, fileContent);
        } catch (final IOException ioe) {
            throw new FlowFileAccessException(ioe.toString(), ioe);
        }
        updated.setData(fileContent.toByteArray());

        final String fileName = path.getFileName().toString();
        return putAttribute(updated, CoreAttributes.FILENAME.key(), fileName);
    }

    /**
     * Concatenates the content of the source FlowFiles, in iteration order, and makes the result the content of the
     * destination FlowFile.
     *
     * @param sources the FlowFiles whose content is concatenated
     * @param destination the FlowFile that receives the merged content
     * @return a new version of the destination FlowFile (same ID) carrying the merged content
     */
    @Override
    public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination) {
        sources = validateState(sources);
        destination = validateState(destination);
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        for (final FlowFile flowFile : sources) {
            final MockFlowFile mock = (MockFlowFile) flowFile;
            final byte[] data = mock.getData();
            try {
                baos.write(data);
            } catch (final IOException e) {
                // Not expected for an in-memory stream; keep the cause so the failure can be diagnosed if it happens.
                throw new AssertionError("Failed to write to BAOS", e);
            }
        }

        final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination);
        newFlowFile.setData(baos.toByteArray());
        currentVersions.put(newFlowFile.getId(), newFlowFile);

        return newFlowFile;
    }

    /**
     * Adds all of the given attributes to the FlowFile. An entry for the UUID attribute is silently left out.
     *
     * @param flowFile the FlowFile to update
     * @param attrs the attributes to add; the supplied map itself is not modified
     * @return a new version of the FlowFile (same ID) carrying the additional attributes
     * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
     */
    @Override
    public MockFlowFile putAllAttributes(FlowFile flowFile, final Map<String, String> attrs) {
        final FlowFile validated = validateState(flowFile);
        if (validated == null || attrs == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        if (!(validated instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create");
        }

        final MockFlowFile updated = new MockFlowFile(((MockFlowFile) validated).getId(), validated);
        currentVersions.put(updated.getId(), updated);

        // Work on a copy only when the UUID entry has to be dropped, so the caller's map is left untouched.
        final String uuidKey = CoreAttributes.UUID.key();
        Map<String, String> toApply = attrs;
        if (attrs.containsKey(uuidKey)) {
            toApply = new HashMap<>(attrs);
            toApply.remove(uuidKey);
        }

        updated.putAttributes(toApply);
        return updated;
    }

    /**
     * Sets a single attribute on the FlowFile. Attempting to set the "uuid" attribute fails the test.
     *
     * @param flowFile the FlowFile to update
     * @param attrName the attribute name
     * @param attrValue the attribute value
     * @return a new version of the FlowFile (same ID) carrying the attribute
     * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
     */
    @Override
    public MockFlowFile putAttribute(FlowFile flowFile, final String attrName, final String attrValue) {
        final FlowFile validated = validateState(flowFile);

        final boolean anyNull = attrName == null || attrValue == null || validated == null;
        if (anyNull) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        if (!(validated instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create");
        }

        if ("uuid".equals(attrName)) {
            Assertions.fail("Should not be attempting to set FlowFile UUID via putAttribute. This will be ignored in production");
        }

        final MockFlowFile updated = new MockFlowFile(((MockFlowFile) validated).getId(), validated);
        currentVersions.put(updated.getId(), updated);

        final Map<String, String> singleAttribute = new HashMap<>();
        singleAttribute.put(attrName, attrValue);
        updated.putAttributes(singleAttribute);
        return updated;
    }

    /**
     * Passes the content of the FlowFile to the callback; the stream handed to the callback is closed once the
     * callback returns (session stream management is not enabled).
     *
     * @param flowFile the FlowFile to read
     * @param callback receives the content stream
     */
    @Override
    public void read(final FlowFile flowFile, final InputStreamCallback callback) {
        read(flowFile, false, callback);
    }

    /**
     * Passes the content of the FlowFile to the callback as an in-memory stream. The FlowFile is counted as being
     * read for the duration of the callback.
     *
     * @param flowFile the FlowFile to read
     * @param allowSessionStreamManagement if false, the stream is closed once the callback returns normally
     * @param callback receives the content stream
     * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
     * @throws ProcessException if the callback (or closing the stream) throws an IOException
     */
    @Override
    public void read(FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) {
        if (flowFile == null || callback == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }

        final FlowFile validated = validateState(flowFile);
        if (!(validated instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot export a flow file that I did not create");
        }

        final byte[] content = ((MockFlowFile) validated).getData();
        final ByteArrayInputStream contentStream = new ByteArrayInputStream(content);

        incrementReadCount(validated);
        try {
            callback.process(contentStream);

            final boolean closeHere = !allowSessionStreamManagement;
            if (closeHere) {
                contentStream.close();
            }
        } catch (final IOException ioe) {
            throw new ProcessException(ioe.toString(), ioe);
        } finally {
            // Always release the read marker, whether or not the callback succeeded.
            decrementReadCount(validated);
        }
    }

    /**
     * Records one more active read of the given FlowFile.
     *
     * @param flowFile the FlowFile being read
     */
    private void incrementReadCount(final FlowFile flowFile) {
        // First read starts the count at 1; nested reads add to it.
        readRecursionSet.merge(flowFile, 1, Integer::sum);
    }

    /**
     * Records that one active read of the given FlowFile has finished. The entry is dropped entirely once no
     * reads remain; a FlowFile without an entry is left alone.
     *
     * @param flowFile the FlowFile that was being read
     */
    private void decrementReadCount(final FlowFile flowFile) {
        readRecursionSet.computeIfPresent(flowFile, (ff, activeReads) -> {
            final int remaining = activeReads - 1;
            if (remaining == 0) {
                // Returning null removes the mapping.
                return null;
            }
            return remaining;
        });
    }

    /**
     * Returns a stream over the content of the given FlowFile. The stream is registered as open with this session
     * and the FlowFile is counted as being read until the stream is closed.
     *
     * @param flowFile the FlowFile to read
     * @return a stream over the FlowFile's content; closing it unregisters it from this session
     * @throws IllegalArgumentException if flowFile is null
     */
    @Override
    public InputStream read(FlowFile flowFile) {
        if (flowFile == null) {
            throw new IllegalArgumentException("FlowFile cannot be null");
        }

        final MockFlowFile mock = validateState(flowFile);

        final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
        // NOTE(review): the read count is keyed by the FlowFile instance passed in, while the open-stream map below is keyed by the validated instance.
        incrementReadCount(flowFile);
        // Thin wrapper around the in-memory stream whose only extra behaviour is the session bookkeeping in close().
        final InputStream errorHandlingStream = new InputStream() {
            @Override
            public int read() throws IOException {
                return bais.read();
            }

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                return bais.read(b, off, len);
            }

            @Override
            public void close() throws IOException {
                // Release the read marker and unregister from the session before closing the underlying stream.
                decrementReadCount(flowFile);
                openInputStreams.remove(mock);
                bais.close();
            }

            @Override
            public void mark(final int readlimit) {
                bais.mark(readlimit);
            }

            @Override
            public void reset() {
                bais.reset();
            }

            @Override
            public int available() throws IOException {
                return bais.available();
            }

            @Override
            public String toString() {
                return "ErrorHandlingInputStream[flowFile=" + mock + "]";
            }
        };

        openInputStreams.put(mock, errorHandlingStream);
        return errorHandlingStream;
    }

    /**
     * Removes the given FlowFile from this session. A provenance drop event is reported only for FlowFiles that
     * have an original version, i.e. that were pulled from the queue rather than created in this session.
     *
     * @param flowFile the FlowFile to remove
     * @throws ProcessException if the FlowFile is not currently being processed by this session
     */
    @Override
    public void remove(FlowFile flowFile) {
        flowFile = validateState(flowFile);

        // Step 1: if the FlowFile is in the penalized list, take it out (first match by ID only).
        final Iterator<MockFlowFile> penalizedItr = penalized.iterator();
        while (penalizedItr.hasNext()) {
            final MockFlowFile ff = penalizedItr.next();
            if (Objects.equals(ff.getId(), flowFile.getId())) {
                penalizedItr.remove();
                // NOTE(review): the iterator already removed this entry; this second call would remove a further equal entry if the list held one.
                penalized.remove(ff);
                if (originalVersions.get(ff.getId()) != null) {
                    provenanceReporter.drop(ff, ff.getAttribute(CoreAttributes.DISCARD_REASON.key()));
                }
                break;
            }
        }

        // Step 2: stop tracking the FlowFile. Returning from inside the loop is the success path.
        final Iterator<Long> processedItr = beingProcessed.iterator();
        while (processedItr.hasNext()) {
            final Long ffId = processedItr.next();
            if (ffId != null && ffId.equals(flowFile.getId())) {
                processedItr.remove();
                beingProcessed.remove(ffId);
                removedFlowFiles.add(flowFile.getId());
                currentVersions.remove(ffId);
                if (originalVersions.get(flowFile.getId()) != null) {
                    provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
                }
                return;
            }
        }

        // Reached only when the FlowFile's ID was not in beingProcessed.
        throw new ProcessException(flowFile + " not found in queue");
    }

    /**
     * Removes each of the given FlowFiles from this session.
     *
     * @param flowFiles the FlowFiles to remove
     */
    @Override
    public void remove(Collection<FlowFile> flowFiles) {
        final Collection<FlowFile> validated = validateState(flowFiles);

        final Iterator<FlowFile> remaining = validated.iterator();
        while (remaining.hasNext()) {
            remove(remaining.next());
        }
    }

    /**
     * Removes the named attributes from the FlowFile.
     *
     * @param flowFile the FlowFile to update
     * @param attrNames the names of the attributes to remove
     * @return a new version of the FlowFile (same ID) without the named attributes
     * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
     */
    @Override
    public MockFlowFile removeAllAttributes(FlowFile flowFile, final Set<String> attrNames) {
        final FlowFile validated = validateState(flowFile);
        if (validated == null || attrNames == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        if (!(validated instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot export a flow file that I did not create");
        }

        final MockFlowFile updated = new MockFlowFile(((MockFlowFile) validated).getId(), validated);
        currentVersions.put(updated.getId(), updated);
        updated.removeAttributes(attrNames);
        return updated;
    }

    /**
     * Removes every attribute whose name matches the given pattern in full.
     *
     * @param flowFile the FlowFile to update
     * @param keyPattern the pattern attribute names are matched against; if null the FlowFile is returned unchanged
     * @return a new version of the FlowFile without the matching attributes, or the validated FlowFile itself when
     *         no pattern is given
     * @throws IllegalArgumentException if flowFile is null
     */
    @Override
    public MockFlowFile removeAllAttributes(FlowFile flowFile, final Pattern keyPattern) {
        final FlowFile validated = validateState(flowFile);
        if (validated == null) {
            throw new IllegalArgumentException("flowFile cannot be null");
        }
        if (keyPattern == null) {
            return (MockFlowFile) validated;
        }

        // matches() requires the whole attribute name to match, not just a part of it.
        final Set<String> matchingNames = validated.getAttributes().keySet().stream()
            .filter(attributeName -> keyPattern.matcher(attributeName).matches())
            .collect(Collectors.toCollection(HashSet::new));

        return removeAllAttributes(validated, matchingNames);
    }

    /**
     * Removes a single attribute from the FlowFile.
     *
     * @param flowFile the FlowFile to update
     * @param attrName the name of the attribute to remove
     * @return a new version of the FlowFile (same ID) without the attribute
     * @throws IllegalArgumentException if an argument is null or the FlowFile is not a MockFlowFile
     */
    @Override
    public MockFlowFile removeAttribute(FlowFile flowFile, final String attrName) {
        final FlowFile validated = validateState(flowFile);
        if (validated == null || attrName == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        if (!(validated instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot export a flow file that I did not create");
        }

        final MockFlowFile updated = new MockFlowFile(((MockFlowFile) validated).getId(), validated);
        currentVersions.put(updated.getId(), updated);

        final Set<String> singleName = new HashSet<>();
        singleName.add(attrName);
        updated.removeAttributes(singleName);
        return updated;
    }

    /**
     * Rolls back the session without penalizing the FlowFiles that are returned to the queue.
     */
    @Override
    public void rollback() {
        rollback(false);
    }

    /**
     * Rolls back the session: closes any open streams, returns FlowFiles to the processor queue and resets the
     * session's bookkeeping. Does nothing if the session has already been committed.
     *
     * @param penalize if true, every FlowFile returned to the queue is also recorded as penalized; if false, the
     *                 list of penalized FlowFiles is cleared
     */
    @Override
    public void rollback(final boolean penalize) {
        // Once committed there is nothing left to roll back.
        if (committed) {
            return;
        }

        closeStreams(openInputStreams, false);
        closeStreams(openOutputStreams, false);

        // Gather what goes back on the queue: first everything held in the transfer map (as is), then the original
        // version of each FlowFile still being processed. FlowFiles without an original version are dropped.
        final List<MockFlowFile> toRequeue = new ArrayList<>();
        for (final List<MockFlowFile> transferred : transferMap.values()) {
            toRequeue.addAll(transferred);
        }
        for (final Long heldId : beingProcessed) {
            final MockFlowFile original = originalVersions.get(heldId);
            if (original != null) {
                toRequeue.add(original);
            }
        }

        for (final MockFlowFile requeued : toRequeue) {
            processorQueue.offer(requeued);
            if (penalize) {
                penalized.add(requeued);
            }
        }

        rolledback = true;
        beingProcessed.clear();
        currentVersions.clear();
        originalVersions.clear();
        transferMap.clear();
        clearTransferState();
        if (!penalize) {
            penalized.clear();
        }
    }

    /**
     * Transfers the given FlowFile back to the processor's own queue.
     *
     * @param flowFile the FlowFile to put back on the queue
     * @throws IllegalArgumentException if the FlowFile was created in this session
     */
    @Override
    public void transfer(FlowFile flowFile) {
        final FlowFile current = validateState(flowFile);
        if (!(current instanceof MockFlowFile)) {
            throw new IllegalArgumentException("I only accept MockFlowFile");
        }
        final MockFlowFile mockFlowFile = (MockFlowFile) current;
        final long id = mockFlowFile.getId();

        // A FlowFile that has a current version but no original version was created in
        // this session; such FlowFiles may not be transferred back to self. This mimics
        // the same behavior in StandardProcessSession.
        final boolean createdInThisSession = currentVersions.get(id) != null && originalVersions.get(id) == null;
        if (createdInThisSession) {
            throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
        }

        beingProcessed.remove(id);
        processorQueue.offer(mockFlowFile);
        updateLastQueuedDate(mockFlowFile);
    }

    /**
     * Stamps the given FlowFile with the current time as its last-enqueued date and
     * with the next value of the session's enqueued index. Simulates
     * StandardProcessSession.updateLastQueuedDate, which is called when a flow file
     * is transferred to a relationship.
     *
     * @param mockFlowFile the FlowFile being enqueued
     */
    private void updateLastQueuedDate(MockFlowFile mockFlowFile) {
        final long now = System.currentTimeMillis();
        mockFlowFile.setLastEnqueuedDate(now);
        mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet());
    }

    /**
     * Transfers each of the given FlowFiles back to the processor's own queue, in
     * iteration order, by delegating to {@link #transfer(FlowFile)}.
     *
     * @param flowFiles the FlowFiles to put back on the queue
     */
    @Override
    public void transfer(final Collection<FlowFile> flowFiles) {
        for (final FlowFile each : flowFiles) {
            transfer(each);
        }
    }

    /**
     * Transfers the given FlowFile to the given relationship. A transfer to
     * {@link Relationship#SELF} is handled by {@link #transfer(FlowFile)}.
     *
     * @param flowFile the FlowFile to transfer
     * @param relationship the destination relationship
     * @throws IllegalArgumentException if the processor does not declare the relationship
     */
    @Override
    public void transfer(FlowFile flowFile, final Relationship relationship) {
        if (relationship == Relationship.SELF) {
            transfer(flowFile);
            return;
        }

        final boolean knownRelationship = processor.getRelationships().contains(relationship);
        if (!knownRelationship) {
            throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
        }

        final MockFlowFile current = validateState(flowFile);
        final List<MockFlowFile> routed = transferMap.computeIfAbsent(relationship, key -> new ArrayList<>());

        beingProcessed.remove(current.getId());
        routed.add(current);
        updateLastQueuedDate(current);
    }

    /**
     * Transfers every given FlowFile to the given relationship, in iteration order.
     * A transfer to {@link Relationship#SELF} is handled by
     * {@link #transfer(Collection)}.
     *
     * @param flowFiles the FlowFiles to transfer
     * @param relationship the destination relationship
     */
    @Override
    public void transfer(Collection<FlowFile> flowFiles, final Relationship relationship) {
        if (relationship == Relationship.SELF) {
            transfer(flowFiles);
        } else {
            for (final FlowFile each : flowFiles) {
                transfer(each, relationship);
            }
        }
    }

    /**
     * Replaces the content of the given FlowFile with whatever the callback writes.
     * While the callback runs the FlowFile is held in the write-recursion set, so
     * using it again through this session is rejected by state validation.
     *
     * @param flowFile the FlowFile whose content is to be replaced
     * @param callback writes the new content
     * @return the new current version of the FlowFile, holding the written bytes
     * @throws ProcessException if the callback throws an IOException
     */
    @Override
    public MockFlowFile write(FlowFile flowFile, final OutputStreamCallback callback) {
        final FlowFile current = validateState(flowFile);
        if (current == null || callback == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        if (!(current instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot export a flow file that I did not create");
        }

        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeRecursionSet.add(current);
        try {
            callback.process(buffer);
        } catch (final IOException ioe) {
            throw new ProcessException(ioe.toString(), ioe);
        } finally {
            // Always release the FlowFile, even when the callback fails.
            writeRecursionSet.remove(current);
        }

        final MockFlowFile updated = new MockFlowFile(current.getId(), current);
        currentVersions.put(updated.getId(), updated);

        updated.setData(buffer.toByteArray());
        return updated;
    }

    /**
     * Opens an in-memory OutputStream for the given FlowFile. The FlowFile is held
     * in the write-recursion set until the returned stream is closed; on close, a
     * new version of the FlowFile carrying the written bytes becomes the session's
     * current version.
     *
     * @param flowFile the FlowFile whose content is to be replaced
     * @return the stream to write the new content to; it must be closed for the
     *         content to take effect
     * @throws IllegalArgumentException if the FlowFile is null or not a MockFlowFile
     */
    @Override
    public OutputStream write(FlowFile flowFile) {
        if (!(flowFile instanceof MockFlowFile)) {
            throw new IllegalArgumentException("Cannot export a flow file that I did not create");
        }

        final MockFlowFile mockFlowFile = validateState(flowFile);
        // Add and remove the same object so the recursion guard is always released.
        writeRecursionSet.add(mockFlowFile);
        return new ByteArrayOutputStream() {
            @Override
            public void close() throws IOException {
                super.close();

                writeRecursionSet.remove(mockFlowFile);
                // Copy from the validated current version rather than the caller's
                // reference, which may be stale, so earlier updates are not lost.
                final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), mockFlowFile);
                currentVersions.put(newFlowFile.getId(), newFlowFile);

                newFlowFile.setData(toByteArray());
            }
        };
    }

    /**
     * Appends whatever the callback writes to the existing content of the given
     * FlowFile.
     *
     * @param flowFile the FlowFile to append to
     * @param callback writes the bytes to append
     * @return the new current version of the FlowFile, holding the old content
     *         followed by the appended bytes
     * @throws IllegalArgumentException if either argument is null
     * @throws ProcessException if the callback throws an IOException
     */
    @Override
    public FlowFile append(FlowFile flowFile, final OutputStreamCallback callback) {
        if (callback == null || flowFile == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        final MockFlowFile mock = validateState(flowFile);

        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            // Seed the buffer with the current content so the callback's output lands after it.
            baos.write(mock.getData());
            callback.process(baos);
        } catch (final IOException e) {
            throw new ProcessException(e.toString(), e);
        }

        // Copy from the validated current version rather than the caller's reference,
        // which may be stale, so earlier updates made in this session are not lost.
        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), mock);
        currentVersions.put(newFlowFile.getId(), newFlowFile);

        newFlowFile.setData(baos.toByteArray());
        return newFlowFile;
    }

    /**
     * Replaces the content of the given FlowFile with whatever the callback writes,
     * giving the callback the current content as input. While the callback runs the
     * FlowFile is held in the write-recursion set.
     *
     * @param flowFile the FlowFile whose content is to be transformed
     * @param callback reads the current content and writes the new content
     * @return the new current version of the FlowFile, holding the written bytes
     * @throws IllegalArgumentException if either argument is null
     * @throws ProcessException if the callback throws an IOException
     */
    @Override
    public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) {
        if (callback == null || flowFile == null) {
            throw new IllegalArgumentException("argument cannot be null");
        }
        final MockFlowFile mock = validateState(flowFile);

        final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData());
        final ByteArrayOutputStream out = new ByteArrayOutputStream();

        writeRecursionSet.add(flowFile);
        try {
            callback.process(in, out);
        } catch (final IOException e) {
            throw new ProcessException(e.toString(), e);
        } finally {
            // Always release the FlowFile, even when the callback fails.
            writeRecursionSet.remove(flowFile);
        }

        // Copy from the validated current version rather than the caller's reference,
        // which may be stale, so earlier updates made in this session are not lost.
        final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), mock);
        currentVersions.put(newFlowFile.getId(), newFlowFile);
        newFlowFile.setData(out.toByteArray());

        return newFlowFile;
    }

    /**
     * Reads the given stream until end-of-stream and returns everything read.
     * The stream is not closed.
     *
     * @param in the stream to drain
     * @return all bytes read from the stream
     * @throws IOException if reading from the stream fails
     */
    private byte[] readFully(final InputStream in) throws IOException {
        final ByteArrayOutputStream collected = new ByteArrayOutputStream();
        final byte[] chunk = new byte[4096];
        for (int read = in.read(chunk); read >= 0; read = in.read(chunk)) {
            collected.write(chunk, 0, read);
        }

        return collected.toByteArray();
    }

    /**
     * @param relationship to get flowfiles for
     * @return the list of FlowFiles recorded for the given relationship, or a new
     *         empty list if none is recorded
     */
    public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) {
        final List<MockFlowFile> transferred = this.transferMap.get(relationship);
        if (transferred != null) {
            return transferred;
        }

        return new ArrayList<>();
    }

    /**
     * @return the session's list of penalized FlowFiles (the list itself, not a copy)
     */
    public List<MockFlowFile> getPenalizedFlowFiles() {
        return this.penalized;
    }

    /**
     * Looks up transferred FlowFiles by relationship name.
     *
     * @param relationship name of the relationship to get flowfiles for
     * @return a List of FlowFiles in the order in which they were transferred
     *         to the given relationship
     */
    public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) {
        final Relationship.Builder builder = new Relationship.Builder().name(relationship);
        return getFlowFilesForRelationship(builder.build());
    }

    /**
     * Creates a new FlowFile whose content is the content of the given file.
     *
     * @param file the file to read the content from
     * @return the newly created FlowFile
     * @throws IOException if the file cannot be read
     */
    public MockFlowFile createFlowFile(final File file) throws IOException {
        final byte[] content = Files.readAllBytes(file.toPath());
        return createFlowFile(content);
    }

    /**
     * Creates a new FlowFile and sets the given bytes as its content.
     *
     * @param data the content for the new FlowFile
     * @return the newly created FlowFile
     */
    public MockFlowFile createFlowFile(final byte[] data) {
        final MockFlowFile created = create();
        created.setData(data);
        return created;
    }

    /**
     * Creates a new FlowFile with the given content and puts the given attributes
     * on it.
     *
     * @param data the content for the new FlowFile
     * @param attrs the attributes to put on the new FlowFile
     * @return the newly created FlowFile
     */
    public MockFlowFile createFlowFile(final byte[] data, final Map<String, String> attrs) {
        final MockFlowFile created = createFlowFile(data);
        created.putAttributes(attrs);
        return created;
    }

    /**
     * Concatenates the content of the source FlowFiles into a new version of the
     * destination FlowFile.
     *
     * @param sources the FlowFiles whose content is concatenated, in iteration order
     * @param destination the FlowFile that receives the merged content
     * @param header bytes written before the first source, or <code>null</code> for none
     * @param footer bytes written after the last source, or <code>null</code> for none
     * @param demarcator bytes written between consecutive sources (not after the
     *            last one), or <code>null</code> for none
     * @return the new current version of the destination, holding the merged content
     */
    @Override
    public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) {
        sources = validateState(sources);
        destination = validateState(destination);

        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            if (header != null) {
                baos.write(header);
            }

            int count = 0;
            for (final FlowFile flowFile : sources) {
                baos.write(((MockFlowFile) flowFile).getData());
                // count only advances when a demarcator is in use; skip it after the last source
                if (demarcator != null && ++count != sources.size()) {
                    baos.write(demarcator);
                }
            }

            if (footer != null) {
                baos.write(footer);
            }
        } catch (final IOException e) {
            // Keep the IOException as the cause so the failure can be diagnosed.
            throw new AssertionError("failed to write data to BAOS", e);
        }

        final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination);
        newFlowFile.setData(baos.toByteArray());
        currentVersions.put(newFlowFile.getId(), newFlowFile);

        return newFlowFile;
    }

    /**
     * Validates every FlowFile in the given collection, in iteration order.
     *
     * @param flowFiles the FlowFiles to validate
     * @return the session's current version of each given FlowFile, in the same order
     */
    private List<FlowFile> validateState(final Collection<FlowFile> flowFiles) {
        final List<FlowFile> validated = new ArrayList<>();
        for (final FlowFile candidate : flowFiles) {
            validated.add(validateState(candidate));
        }
        return validated;
    }

    /**
     * Verifies that the given FlowFile may be used in this session and returns the
     * session's current version of it.
     *
     * @param flowFile the FlowFile to validate
     * @return the current version registered in this session under the FlowFile's id
     * @throws NullPointerException if the FlowFile is null
     * @throws FlowFileHandlingException if the FlowFile is not known in this session
     * @throws IllegalStateException if the FlowFile is currently being read or
     *             written, or has already been transferred
     */
    private MockFlowFile validateState(final FlowFile flowFile) {
        Objects.requireNonNull(flowFile);

        final MockFlowFile known = currentVersions.get(flowFile.getId());
        if (known == null) {
            throw new FlowFileHandlingException(flowFile + " is not known in this session");
        }

        if (readRecursionSet.containsKey(flowFile)) {
            throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
        }

        if (writeRecursionSet.contains(flowFile)) {
            throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
        }

        final boolean alreadyTransferred = transferMap.values().stream()
            .anyMatch(transferred -> transferred.contains(flowFile));
        if (alreadyTransferred) {
            throw new IllegalStateException(flowFile + " has already been transferred");
        }

        return known;
    }


    /**
     * Copies all attributes of the given source flow file onto the destination
     * flow file and reports a fork of the source into the updated destination to
     * the provenance reporter. Nothing is done when either argument is null or
     * when both are the same object.
     *
     * @param source the FlowFile from which to copy attributes
     * @param destination the FlowFile to which to copy attributes
     * @return the updated destination, or the destination unchanged when there is
     *         nothing to inherit
     */
    private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) {
        final boolean nothingToInherit = source == null || destination == null || source == destination;
        if (nothingToInherit) {
            return destination; // don't need to inherit from ourselves
        }

        final Map<String, String> sourceAttributes = source.getAttributes();
        final FlowFile updated = putAllAttributes(destination, sourceAttributes);
        getProvenanceReporter().fork(source, Collections.singletonList(updated));
        return updated;
    }

    /**
     * Inherits the attributes from the given source flow files into the
     * destination flow file. Only attributes that are common to all source
     * items (same key and same value) are copied onto the destination. A join of
     * the sources into the updated destination is then reported to the
     * provenance reporter.
     *
     * @param sources to inherit common attributes from
     * @param destination the FlowFile to which the common attributes are copied
     * @return the updated destination FlowFile
     * @throws NullPointerException if sources is null
     */
    private FlowFile inheritAttributes(final Collection<FlowFile> sources, final FlowFile destination) {
        // The previous implementation assembled a comma-separated list of source UUIDs
        // that was never used; it is removed. Fail fast on null sources as before.
        Objects.requireNonNull(sources, "sources");

        final FlowFile updated = putAllAttributes(destination, intersectAttributes(sources));
        getProvenanceReporter().join(sources, updated);
        return updated;
    }

    /**
     * Returns the attributes that are common to every flow file given. The key
     * and value must match exactly. When more than one flow file is given, an
     * attribute whose value is null is never considered common.
     *
     * @param flowFileList a list of flow files
     *
     * @return the common attributes; empty if the list is null or empty, and a
     *         copy of all attributes if the list holds exactly one flow file
     */
    private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) {
        final Map<String, String> result = new HashMap<>();
        // trivial cases
        if (flowFileList == null || flowFileList.isEmpty()) {
            return result;
        }

        final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
        if (flowFileList.size() == 1) {
            // A single flow file shares every attribute with itself; return right away
            // instead of falling through and re-deriving the same map.
            result.putAll(firstMap);
            return result;
        }

        /*
         * Start with the first attribute map and only put an entry to the
         * resultant map if it is common to every map.
         */
        for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
            final String key = mapEntry.getKey();
            final String value = mapEntry.getValue();

            boolean common = true;
            for (final FlowFile flowFile : flowFileList) {
                final String curVal = flowFile.getAttributes().get(key);
                if (curVal == null || !curVal.equals(value)) {
                    common = false;
                    break;
                }
            }

            if (common) {
                result.put(key, value);
            }
        }

        return result;
    }

    /**
     * Fails the test unless this session has been committed.
     */
    public void assertCommitted() {
        final String failureMessage = "Session was not committed";
        Assertions.assertTrue(committed, failureMessage);
    }

    /**
     * Fails the test if this session has been committed.
     */
    public void assertNotCommitted() {
        final String failureMessage = "Session was committed";
        Assertions.assertFalse(committed, failureMessage);
    }

    /**
     * Fails the test unless this session has been rolled back via
     * {@link #rollback()} or {@link #rollback(boolean)}.
     */
    public void assertRolledBack() {
        final String failureMessage = "Session was not rolled back";
        Assertions.assertTrue(rolledback, failureMessage);
    }

    /**
     * Fails the test if this session has been rolled back via
     * {@link #rollback()} or {@link #rollback(boolean)}.
     */
    public void assertNotRolledBack() {
        final String failureMessage = "Session was rolled back";
        Assertions.assertFalse(rolledback, failureMessage);
    }

    /**
     * Fails the test unless exactly <code>count</code> FlowFiles were transferred
     * to the given relationship.
     *
     * @param relationship to validate transfer count of
     * @param count expected number of FlowFiles transferred to the relationship
     */
    public void assertTransferCount(final Relationship relationship, final int count) {
        final List<MockFlowFile> transferred = getFlowFilesForRelationship(relationship);
        final int transferCount = transferred.size();
        final String failureMessage = "Expected " + count + " FlowFiles to be transferred to "
            + relationship + " but actual transfer count was " + transferCount;
        Assertions.assertEquals(count, transferCount, failureMessage);
    }

    /**
     * Fails the test unless exactly <code>count</code> FlowFiles were transferred
     * to the relationship with the given name.
     *
     * @param relationship name of the relationship to validate transfer count of
     * @param count expected number of FlowFiles transferred to the relationship
     */
    public void assertTransferCount(final String relationship, final int count) {
        final Relationship namedRelationship = new Relationship.Builder().name(relationship).build();
        assertTransferCount(namedRelationship, count);
    }

    /**
     * Fails the test if any FlowFiles are left on the input queue.
     */
    public void assertQueueEmpty() {
        final boolean queueEmpty = this.processorQueue.isEmpty();
        final String failureMessage = "FlowFile Queue has " + this.processorQueue.size() + " items";
        Assertions.assertTrue(queueEmpty, failureMessage);
    }

    /**
     * Fails the test unless at least one FlowFile is on the input queue.
     */
    public void assertQueueNotEmpty() {
        final boolean queueEmpty = this.processorQueue.isEmpty();
        Assertions.assertFalse(queueEmpty, "FlowFile Queue is empty");
    }

    /**
     * Fails the test if any FlowFile was transferred to a relationship other
     * than the one with the given name.
     *
     * @param relationship name of the only relationship allowed to hold
     *            transferred flow files
     */
    public void assertAllFlowFilesTransferred(final String relationship) {
        final Relationship namedRelationship = new Relationship.Builder().name(relationship).build();
        assertAllFlowFilesTransferred(namedRelationship);
    }

    /**
     * Fails the test if any FlowFile was transferred to a relationship other
     * than the given one.
     *
     * @param relationship the only relationship allowed to hold transferred flow files
     */
    public void assertAllFlowFilesTransferred(final Relationship relationship) {
        for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) {
            final Relationship actual = entry.getKey();
            if (actual.equals(relationship)) {
                continue; // the expected destination
            }

            final List<MockFlowFile> routed = entry.getValue();
            if (routed == null || routed.isEmpty()) {
                continue; // nothing went to this other relationship
            }

            Assertions.fail("Expected all Transferred FlowFiles to go to " + relationship + " but " + routed.size() + " were routed to " + actual);
        }
    }

    /**
     * Runs the given validator against every FlowFile that was transferred,
     * regardless of relationship.
     *
     * @param validator validator to use
     */
    public void assertAllFlowFiles(FlowFileValidator validator) {
        for (final List<MockFlowFile> transferred : transferMap.values()) {
            for (final MockFlowFile each : transferred) {
                validator.assertFlowFile(each);
            }
        }
    }

    /**
     * Asserts that all FlowFiles that were transferred in the given relationship
     * are compliant with the given validator.
     *
     * @param relationship the relationship whose transferred FlowFiles are validated
     * @param validator validator to use
     */
    public void assertAllFlowFiles(Relationship relationship, FlowFileValidator validator) {
        for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) {
            // The relationship check is the same for every FlowFile in the entry,
            // so decide once per entry instead of once per FlowFile.
            if (!entry.getKey().equals(relationship)) {
                continue;
            }

            for (final MockFlowFile mockFlowFile : entry.getValue()) {
                validator.assertFlowFile(mockFlowFile);
            }
        }
    }

    /**
     * Forgets every FlowFile that has been transferred to a relationship by
     * emptying the transfer map.
     */
    public void clearTransferState() {
        transferMap.clear();
    }

    /**
     * Fails the test unless every transferred FlowFile went to the given
     * relationship and exactly <code>count</code> FlowFiles were transferred to it.
     *
     * @param relationship to validate
     * @param count number of items sent to that relationship (expected)
     */
    public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) {
        // First rule out transfers elsewhere, then check how many arrived here.
        this.assertAllFlowFilesTransferred(relationship);
        this.assertTransferCount(relationship, count);
    }

    /**
     * Fails the test unless every transferred FlowFile went to the relationship
     * with the given name and exactly <code>count</code> FlowFiles were
     * transferred to it.
     *
     * @param relationship name of the relationship to validate
     * @param count number of items sent to that relationship (expected)
     */
    public void assertAllFlowFilesTransferred(final String relationship, final int count) {
        final Relationship namedRelationship = new Relationship.Builder().name(relationship).build();
        assertAllFlowFilesTransferred(namedRelationship, count);
    }

    /**
     * @return the size of the session's collection of removed FlowFiles
     */
    public int getRemovedCount() {
        final int removedCount = removedFlowFiles.size();
        return removedCount;
    }

    /**
     * @return the provenance reporter held by this session
     */
    @Override
    public ProvenanceReporter getProvenanceReporter() {
        return this.provenanceReporter;
    }

    /**
     * Stores the given state by delegating to this session's state manager.
     *
     * @param state the state to store
     * @param scope the scope to store it under
     * @throws IOException if the state manager fails to store the state
     */
    @Override
    public void setState(final Map<String, String> state, final Scope scope) throws IOException {
        this.stateManager.setState(state, scope);
    }

    /**
     * Fetches the state for the given scope from this session's state manager.
     *
     * @param scope the scope to read
     * @return the state map returned by the state manager
     * @throws IOException if the state manager fails to read the state
     */
    @Override
    public StateMap getState(final Scope scope) throws IOException {
        final StateMap current = stateManager.getState(scope);
        return current;
    }

    /**
     * Asks this session's state manager to replace the given old state with the
     * given new values.
     *
     * @param oldValue the state expected to be current
     * @param newValue the values to store instead
     * @param scope the scope of the state
     * @return the result reported by the state manager's replace operation
     * @throws IOException if the state manager fails
     */
    @Override
    public boolean replaceState(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
        final boolean replaced = stateManager.replace(oldValue, newValue, scope);
        return replaced;
    }

    /**
     * Clears the state for the given scope by delegating to this session's state
     * manager.
     *
     * @param scope the scope to clear
     * @throws IOException if the state manager fails to clear the state
     */
    @Override
    public void clearState(final Scope scope) throws IOException {
        this.stateManager.clear(scope);
    }

    /**
     * Creates a new, penalized version of the given FlowFile, registers it as the
     * session's current version and adds it to the penalized list.
     *
     * @param flowFile the FlowFile to penalize
     * @return the new, penalized version of the FlowFile
     */
    @Override
    public MockFlowFile penalize(FlowFile flowFile) {
        final FlowFile current = validateState(flowFile);
        final MockFlowFile penalizedCopy = new MockFlowFile(current.getId(), current);
        currentVersions.put(penalizedCopy.getId(), penalizedCopy);
        penalizedCopy.setPenalized(true);
        penalized.add(penalizedCopy);
        return penalizedCopy;
    }

    /**
     * Creates a new, non-penalized version of the given FlowFile, registers it as
     * the session's current version and removes it from the penalized list.
     *
     * @param flowFile the FlowFile to unpenalize
     * @return the new, non-penalized version of the FlowFile
     */
    public MockFlowFile unpenalize(FlowFile flowFile) {
        final FlowFile current = validateState(flowFile);
        final MockFlowFile unpenalizedCopy = new MockFlowFile(current.getId(), current);
        currentVersions.put(unpenalizedCopy.getId(), unpenalizedCopy);
        unpenalizedCopy.setPenalized(false);
        penalized.remove(unpenalizedCopy);
        return unpenalizedCopy;
    }

    /**
     * Returns the data of the session's current version of the given FlowFile.
     *
     * @param flowFile the FlowFile whose content is requested
     * @return the data held by the current version of the FlowFile
     */
    public byte[] getContentAsByteArray(MockFlowFile flowFile) {
        final MockFlowFile current = validateState(flowFile);
        return current.getData();
    }

    /**
     * Checks if a FlowFile is known in this session. A FlowFile is known when
     * the session holds a current version under the same id and that version
     * carries the same UUID attribute as the given FlowFile.
     *
     * @param flowFile
     *            the FlowFile to check
     * @return <code>true</code> if the FlowFile is known in this session,
     *         <code>false</code> otherwise.
     */
    boolean isFlowFileKnown(final FlowFile flowFile) {
        final FlowFile curFlowFile = currentVersions.get(flowFile.getId());
        if (curFlowFile == null) {
            return false;
        }

        final String curUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key());
        // Read the UUID from the FlowFile that was passed in; comparing the session's
        // version with itself would make this check always succeed.
        final String providedUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
        return curUuid != null && curUuid.equals(providedUuid);
    }
}
