| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.util; |
| |
| import org.apache.nifi.components.state.Scope; |
| import org.apache.nifi.components.state.StateManager; |
| import org.apache.nifi.components.state.StateMap; |
| import org.apache.nifi.controller.queue.QueueSize; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.processor.FlowFileFilter; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Processor; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.FlowFileAccessException; |
| import org.apache.nifi.processor.exception.FlowFileHandlingException; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.io.InputStreamCallback; |
| import org.apache.nifi.processor.io.OutputStreamCallback; |
| import org.apache.nifi.processor.io.StreamCallback; |
| import org.apache.nifi.provenance.ProvenanceReporter; |
| import org.apache.nifi.state.MockStateManager; |
| import org.junit.jupiter.api.Assertions; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.file.Files; |
| import java.nio.file.OpenOption; |
| import java.nio.file.Path; |
| import java.nio.file.StandardOpenOption; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.Consumer; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| |
| public class MockProcessSession implements ProcessSession { |
| |
| private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>(); |
| private final MockFlowFileQueue processorQueue; |
| private final Set<Long> beingProcessed = new HashSet<>(); |
| private final List<MockFlowFile> penalized = new ArrayList<>(); |
| private final Processor processor; |
| |
| private final Map<Long, MockFlowFile> currentVersions = new HashMap<>(); |
| private final Map<Long, MockFlowFile> originalVersions = new HashMap<>(); |
| private final SharedSessionState sharedState; |
| private final Map<String, Long> counterMap = new HashMap<>(); |
| private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>(); |
| private final Set<FlowFile> writeRecursionSet = new HashSet<>(); |
| private final MockProvenanceReporter provenanceReporter; |
| private final boolean enforceStreamsClosed; |
| |
| // A List of InputStreams that have been created by calls to {@link #read(FlowFile)} and have not yet been closed. |
| private final Map<FlowFile, InputStream> openInputStreams = new HashMap<>(); |
| // A List of OutputStreams that have been created by calls to {@link #write(FlowFile)} and have not yet been closed. |
| private final Map<FlowFile, OutputStream> openOutputStreams = new HashMap<>(); |
| private final StateManager stateManager; |
| private final boolean allowSynchronousCommits; |
| |
| private boolean committed = false; |
| private boolean rolledback = false; |
| private final Set<Long> removedFlowFiles = new HashSet<>(); |
| |
| private static final AtomicLong enqueuedIndex = new AtomicLong(0L); |
| |
| public MockProcessSession(final SharedSessionState sharedState, final Processor processor) { |
| this(sharedState, processor, true, new MockStateManager(processor)); |
| } |
| |
| public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final StateManager stateManager) { |
| this(sharedState, processor, true, stateManager); |
| } |
| |
| public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager) { |
| this(sharedState, processor, enforceStreamsClosed, stateManager, false); |
| } |
| |
| public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager, |
| final boolean allowSynchronousCommits) { |
| this.processor = processor; |
| this.enforceStreamsClosed = enforceStreamsClosed; |
| this.sharedState = sharedState; |
| this.processorQueue = sharedState.getFlowFileQueue(); |
| this.provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName()); |
| this.stateManager = stateManager; |
| this.allowSynchronousCommits = allowSynchronousCommits; |
| } |
| |
| @Override |
| public void adjustCounter(final String name, final long delta, final boolean immediate) { |
| if (immediate) { |
| sharedState.adjustCounter(name, delta); |
| return; |
| } |
| |
| Long counter = counterMap.get(name); |
| if (counter == null) { |
| counter = delta; |
| counterMap.put(name, counter); |
| return; |
| } |
| |
| counter = counter + delta; |
| counterMap.put(name, counter); |
| } |
| |
| @Override |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| public void migrate(final ProcessSession newOwner, final Collection<FlowFile> flowFiles) { |
| if (Objects.requireNonNull(newOwner) == this) { |
| throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself"); |
| } |
| if (flowFiles == null || flowFiles.isEmpty()) { |
| throw new IllegalArgumentException("Must supply at least one FlowFile to migrate"); |
| } |
| |
| if (!(newOwner instanceof MockProcessSession)) { |
| throw new IllegalArgumentException("Cannot migrate from a StandardProcessSession to a session of type " + newOwner.getClass()); |
| } |
| |
| migrate((MockProcessSession) newOwner, (Collection<MockFlowFile>) (Collection) flowFiles); |
| } |
| |
| private void migrate(final MockProcessSession newOwner, final Collection<MockFlowFile> flowFiles) { |
| for (final FlowFile flowFile : flowFiles) { |
| if (openInputStreams.containsKey(flowFile)) { |
| throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently " |
| + "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)"); |
| } |
| |
| if (openOutputStreams.containsKey(flowFile)) { |
| throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently " |
| + "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)"); |
| } |
| |
| if (readRecursionSet.containsKey(flowFile)) { |
| throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed"); |
| } |
| |
| if (writeRecursionSet.contains(flowFile)) { |
| throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed"); |
| } |
| |
| final FlowFile currentVersion = currentVersions.get(flowFile.getId()); |
| if (currentVersion == null) { |
| throw new FlowFileHandlingException(flowFile + " is not known in this session"); |
| } |
| } |
| |
| for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) { |
| final Relationship relationship = entry.getKey(); |
| final List<MockFlowFile> transferredFlowFiles = entry.getValue(); |
| |
| for (final MockFlowFile flowFile : flowFiles) { |
| if (transferredFlowFiles.remove(flowFile)) { |
| newOwner.transferMap.computeIfAbsent(relationship, rel -> new ArrayList<>()).add(flowFile); |
| } |
| } |
| } |
| |
| for (final MockFlowFile flowFile : flowFiles) { |
| if (beingProcessed.remove(flowFile.getId())) { |
| newOwner.beingProcessed.add(flowFile.getId()); |
| } |
| |
| if (penalized.remove(flowFile)) { |
| newOwner.penalized.add(flowFile); |
| } |
| |
| if (currentVersions.containsKey(flowFile.getId())) { |
| newOwner.currentVersions.put(flowFile.getId(), currentVersions.remove(flowFile.getId())); |
| } |
| |
| if (originalVersions.containsKey(flowFile.getId())) { |
| newOwner.originalVersions.put(flowFile.getId(), originalVersions.remove(flowFile.getId())); |
| } |
| |
| if (removedFlowFiles.remove(flowFile.getId())) { |
| newOwner.removedFlowFiles.add(flowFile.getId()); |
| } |
| } |
| |
| final Set<String> flowFileIds = flowFiles.stream() |
| .map(ff -> ff.getAttribute(CoreAttributes.UUID.key())) |
| .collect(Collectors.toSet()); |
| |
| provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds); |
| } |
| |
| @Override |
| public MockFlowFile clone(FlowFile flowFile) { |
| flowFile = validateState(flowFile); |
| final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| beingProcessed.add(newFlowFile.getId()); |
| return newFlowFile; |
| } |
| |
| @Override |
| public MockFlowFile clone(FlowFile flowFile, final long offset, final long size) { |
| flowFile = validateState(flowFile); |
| if (offset + size > flowFile.getSize()) { |
| throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + flowFile.toString()); |
| } |
| |
| final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); |
| final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) flowFile).getData(), (int) offset, (int) (offset + size)); |
| newFlowFile.setData(newContent); |
| |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| beingProcessed.add(newFlowFile.getId()); |
| return newFlowFile; |
| } |
| |
| private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap, final boolean enforceClosed) { |
| final Map<FlowFile, ? extends Closeable> openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List |
| for (final Map.Entry<FlowFile, ? extends Closeable> entry : openStreamCopy.entrySet()) { |
| final FlowFile flowFile = entry.getKey(); |
| final Closeable openStream = entry.getValue(); |
| |
| try { |
| openStream.close(); |
| } catch (IOException e) { |
| throw new FlowFileAccessException("Failed to close stream for " + flowFile, e); |
| } |
| |
| if (enforceClosed) { |
| throw new FlowFileHandlingException("Cannot commit session because the following streams were created via " |
| + "calls to ProcessSession.read(FlowFile) or ProcessSession.write(FlowFile) and never closed: " + streamMap); |
| } |
| } |
| } |
| |
| |
| @Override |
| public void commit() { |
| if (!allowSynchronousCommits) { |
| throw new RuntimeException("As of version 1.14.0, ProcessSession.commit() should be avoided when possible. See JavaDocs for explanations. Instead, use commitAsync(), " + |
| "commitAsync(Runnable), or commitAsync(Runnable, Consumer<Throwable>). However, if this is not possible, ProcessSession.commit() may still be used, but this must be explicitly " + |
| "enabled by calling TestRunner."); |
| } |
| |
| commitInternal(); |
| } |
| |
| private void commitInternal() { |
| if (!beingProcessed.isEmpty()) { |
| throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed); |
| } |
| |
| closeStreams(openInputStreams, enforceStreamsClosed); |
| closeStreams(openOutputStreams, enforceStreamsClosed); |
| |
| committed = true; |
| beingProcessed.clear(); |
| currentVersions.clear(); |
| originalVersions.clear(); |
| |
| for (final Map.Entry<String, Long> entry : counterMap.entrySet()) { |
| sharedState.adjustCounter(entry.getKey(), entry.getValue()); |
| } |
| |
| sharedState.addProvenanceEvents(provenanceReporter.getEvents()); |
| provenanceReporter.clear(); |
| counterMap.clear(); |
| } |
| |
| @Override |
| public void commitAsync() { |
| commitInternal(); |
| } |
| |
| @Override |
| public void commitAsync(final Runnable onSuccess, final Consumer<Throwable> onFailure) { |
| try { |
| commitInternal(); |
| } catch (final Throwable t) { |
| rollback(); |
| onFailure.accept(t); |
| throw t; |
| } |
| |
| onSuccess.run(); |
| } |
| |
| /** |
| * Clear the 'committed' flag so that we can test that the next iteration of |
| * {@link org.apache.nifi.processor.Processor#onTrigger} commits or rolls back the |
| * session |
| */ |
| public void clearCommitted() { |
| committed = false; |
| } |
| |
| /** |
| * Clear the 'rolledBack' flag so that we can test that the next iteration |
| * of {@link org.apache.nifi.processor.Processor#onTrigger} commits or rolls back the |
| * session |
| */ |
| public void clearRollback() { |
| rolledback = false; |
| } |
| |
| @Override |
| public MockFlowFile create() { |
| final MockFlowFile flowFile = new MockFlowFile(sharedState.nextFlowFileId()); |
| currentVersions.put(flowFile.getId(), flowFile); |
| beingProcessed.add(flowFile.getId()); |
| return flowFile; |
| } |
| |
| @Override |
| public MockFlowFile create(final FlowFile flowFile) { |
| MockFlowFile newFlowFile = create(); |
| newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| beingProcessed.add(newFlowFile.getId()); |
| return newFlowFile; |
| } |
| |
| @Override |
| public MockFlowFile create(final Collection<FlowFile> flowFiles) { |
| MockFlowFile newFlowFile = create(); |
| newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| beingProcessed.add(newFlowFile.getId()); |
| return newFlowFile; |
| } |
| |
| @Override |
| public void exportTo(FlowFile flowFile, final OutputStream out) { |
| flowFile = validateState(flowFile); |
| if (flowFile == null || out == null) { |
| throw new IllegalArgumentException("arguments cannot be null"); |
| } |
| |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot export a flow file that I did not create"); |
| } |
| |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| |
| try { |
| out.write(mock.getData()); |
| } catch (final IOException e) { |
| throw new FlowFileAccessException(e.toString(), e); |
| } |
| } |
| |
| @Override |
| public void exportTo(FlowFile flowFile, final Path path, final boolean append) { |
| flowFile = validateState(flowFile); |
| if (flowFile == null || path == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot export a flow file that I did not create"); |
| } |
| |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| |
| final OpenOption mode = append ? StandardOpenOption.APPEND : StandardOpenOption.CREATE; |
| |
| try (final OutputStream out = Files.newOutputStream(path, mode)) { |
| out.write(mock.getData()); |
| } catch (final IOException e) { |
| throw new FlowFileAccessException(e.toString(), e); |
| } |
| } |
| |
| @Override |
| public MockFlowFile get() { |
| final MockFlowFile flowFile = processorQueue.poll(); |
| if (flowFile != null) { |
| beingProcessed.add(flowFile.getId()); |
| currentVersions.put(flowFile.getId(), flowFile); |
| originalVersions.put(flowFile.getId(), flowFile); |
| } |
| return flowFile; |
| } |
| |
| @Override |
| public List<FlowFile> get(final int maxResults) { |
| final List<FlowFile> flowFiles = new ArrayList<>(Math.min(500, maxResults)); |
| for (int i = 0; i < maxResults; i++) { |
| final MockFlowFile nextFlowFile = get(); |
| if (nextFlowFile == null) { |
| return flowFiles; |
| } |
| |
| flowFiles.add(nextFlowFile); |
| } |
| |
| return flowFiles; |
| } |
| |
| @Override |
| public List<FlowFile> get(final FlowFileFilter filter) { |
| final List<FlowFile> flowFiles = new ArrayList<>(); |
| final List<MockFlowFile> unselected = new ArrayList<>(); |
| |
| while (true) { |
| final MockFlowFile flowFile = processorQueue.poll(); |
| if (flowFile == null) { |
| break; |
| } |
| |
| final FlowFileFilter.FlowFileFilterResult filterResult = filter.filter(flowFile); |
| if (filterResult.isAccept()) { |
| flowFiles.add(flowFile); |
| |
| beingProcessed.add(flowFile.getId()); |
| currentVersions.put(flowFile.getId(), flowFile); |
| originalVersions.put(flowFile.getId(), flowFile); |
| } else { |
| unselected.add(flowFile); |
| } |
| |
| if (!filterResult.isContinue()) { |
| break; |
| } |
| } |
| |
| processorQueue.addAll(unselected); |
| return flowFiles; |
| } |
| |
| @Override |
| public QueueSize getQueueSize() { |
| return processorQueue.size(); |
| } |
| |
| @Override |
| public MockFlowFile importFrom(final InputStream in, FlowFile flowFile) { |
| flowFile = validateState(flowFile); |
| if (in == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot export a flow file that I did not create"); |
| } |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| |
| final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| try { |
| final byte[] data = readFully(in); |
| newFlowFile.setData(data); |
| return newFlowFile; |
| } catch (final IOException e) { |
| throw new FlowFileAccessException(e.toString(), e); |
| } |
| } |
| |
| @Override |
| public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, FlowFile flowFile) { |
| flowFile = validateState(flowFile); |
| if (path == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot export a flow file that I did not create"); |
| } |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try { |
| Files.copy(path, baos); |
| } catch (final IOException e) { |
| throw new FlowFileAccessException(e.toString(), e); |
| } |
| |
| newFlowFile.setData(baos.toByteArray()); |
| newFlowFile = putAttribute(newFlowFile, CoreAttributes.FILENAME.key(), path.getFileName().toString()); |
| return newFlowFile; |
| } |
| |
| @Override |
| public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination) { |
| sources = validateState(sources); |
| destination = validateState(destination); |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| for (final FlowFile flowFile : sources) { |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| final byte[] data = mock.getData(); |
| try { |
| baos.write(data); |
| } catch (final IOException e) { |
| throw new AssertionError("Failed to write to BAOS"); |
| } |
| } |
| |
| final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination); |
| newFlowFile.setData(baos.toByteArray()); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| return newFlowFile; |
| } |
| |
| @Override |
| public MockFlowFile putAllAttributes(FlowFile flowFile, final Map<String, String> attrs) { |
| flowFile = validateState(flowFile); |
| if (attrs == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create"); |
| } |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| final Map<String, String> updatedAttributes; |
| if (attrs.containsKey(CoreAttributes.UUID.key())) { |
| updatedAttributes = new HashMap<>(attrs); |
| updatedAttributes.remove(CoreAttributes.UUID.key()); |
| } else { |
| updatedAttributes = attrs; |
| } |
| newFlowFile.putAttributes(updatedAttributes); |
| return newFlowFile; |
| } |
| |
| @Override |
| public MockFlowFile putAttribute(FlowFile flowFile, final String attrName, final String attrValue) { |
| flowFile = validateState(flowFile); |
| if (attrName == null || attrValue == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create"); |
| } |
| |
| if ("uuid".equals(attrName)) { |
| Assertions.fail("Should not be attempting to set FlowFile UUID via putAttribute. This will be ignored in production"); |
| } |
| |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| final Map<String, String> attrs = new HashMap<>(); |
| attrs.put(attrName, attrValue); |
| newFlowFile.putAttributes(attrs); |
| return newFlowFile; |
| } |
| |
| @Override |
| public void read(final FlowFile flowFile, final InputStreamCallback callback) { |
| read(flowFile, false, callback); |
| } |
| |
| @Override |
| public void read(FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) { |
| if (callback == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| |
| flowFile = validateState(flowFile); |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot export a flow file that I did not create"); |
| } |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| |
| final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); |
| incrementReadCount(flowFile); |
| try { |
| callback.process(bais); |
| if(!allowSessionStreamManagement){ |
| bais.close(); |
| } |
| } catch (final IOException e) { |
| throw new ProcessException(e.toString(), e); |
| } finally { |
| decrementReadCount(flowFile); |
| } |
| } |
| |
| private void incrementReadCount(final FlowFile flowFile) { |
| readRecursionSet.compute(flowFile, (ff, count) -> count == null ? 1 : count + 1); |
| } |
| |
| private void decrementReadCount(final FlowFile flowFile) { |
| final Integer count = readRecursionSet.get(flowFile); |
| if (count == null) { |
| return; |
| } |
| |
| final int updatedCount = count - 1; |
| if (updatedCount == 0) { |
| readRecursionSet.remove(flowFile); |
| } else { |
| readRecursionSet.put(flowFile, updatedCount); |
| } |
| } |
| |
| @Override |
| public InputStream read(FlowFile flowFile) { |
| if (flowFile == null) { |
| throw new IllegalArgumentException("FlowFile cannot be null"); |
| } |
| |
| final MockFlowFile mock = validateState(flowFile); |
| |
| final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); |
| incrementReadCount(flowFile); |
| final InputStream errorHandlingStream = new InputStream() { |
| @Override |
| public int read() throws IOException { |
| return bais.read(); |
| } |
| |
| @Override |
| public int read(byte[] b, int off, int len) throws IOException { |
| return bais.read(b, off, len); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| decrementReadCount(flowFile); |
| openInputStreams.remove(mock); |
| bais.close(); |
| } |
| |
| @Override |
| public void mark(final int readlimit) { |
| bais.mark(readlimit); |
| } |
| |
| @Override |
| public void reset() { |
| bais.reset(); |
| } |
| |
| @Override |
| public int available() throws IOException { |
| return bais.available(); |
| } |
| |
| @Override |
| public String toString() { |
| return "ErrorHandlingInputStream[flowFile=" + mock + "]"; |
| } |
| }; |
| |
| openInputStreams.put(mock, errorHandlingStream); |
| return errorHandlingStream; |
| } |
| |
| @Override |
| public void remove(FlowFile flowFile) { |
| flowFile = validateState(flowFile); |
| |
| final Iterator<MockFlowFile> penalizedItr = penalized.iterator(); |
| while (penalizedItr.hasNext()) { |
| final MockFlowFile ff = penalizedItr.next(); |
| if (Objects.equals(ff.getId(), flowFile.getId())) { |
| penalizedItr.remove(); |
| penalized.remove(ff); |
| if (originalVersions.get(ff.getId()) != null) { |
| provenanceReporter.drop(ff, ff.getAttribute(CoreAttributes.DISCARD_REASON.key())); |
| } |
| break; |
| } |
| } |
| |
| final Iterator<Long> processedItr = beingProcessed.iterator(); |
| while (processedItr.hasNext()) { |
| final Long ffId = processedItr.next(); |
| if (ffId != null && ffId.equals(flowFile.getId())) { |
| processedItr.remove(); |
| beingProcessed.remove(ffId); |
| removedFlowFiles.add(flowFile.getId()); |
| currentVersions.remove(ffId); |
| if (originalVersions.get(flowFile.getId()) != null) { |
| provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key())); |
| } |
| return; |
| } |
| } |
| |
| throw new ProcessException(flowFile + " not found in queue"); |
| } |
| |
| @Override |
| public void remove(Collection<FlowFile> flowFiles) { |
| flowFiles = validateState(flowFiles); |
| |
| for (final FlowFile flowFile : flowFiles) { |
| remove(flowFile); |
| } |
| } |
| |
| @Override |
| public MockFlowFile removeAllAttributes(FlowFile flowFile, final Set<String> attrNames) { |
| flowFile = validateState(flowFile); |
| if (attrNames == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot export a flow file that I did not create"); |
| } |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| |
| final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| newFlowFile.removeAttributes(attrNames); |
| return newFlowFile; |
| } |
| |
| @Override |
| public MockFlowFile removeAllAttributes(FlowFile flowFile, final Pattern keyPattern) { |
| flowFile = validateState(flowFile); |
| if (flowFile == null) { |
| throw new IllegalArgumentException("flowFile cannot be null"); |
| } |
| if (keyPattern == null) { |
| return (MockFlowFile) flowFile; |
| } |
| |
| final Set<String> attrsToRemove = new HashSet<>(); |
| for (final String key : flowFile.getAttributes().keySet()) { |
| if (keyPattern.matcher(key).matches()) { |
| attrsToRemove.add(key); |
| } |
| } |
| |
| return removeAllAttributes(flowFile, attrsToRemove); |
| } |
| |
| @Override |
| public MockFlowFile removeAttribute(FlowFile flowFile, final String attrName) { |
| flowFile = validateState(flowFile); |
| if (attrName == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot export a flow file that I did not create"); |
| } |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| final Set<String> attrNames = new HashSet<>(); |
| attrNames.add(attrName); |
| newFlowFile.removeAttributes(attrNames); |
| return newFlowFile; |
| } |
| |
| @Override |
| public void rollback() { |
| rollback(false); |
| } |
| |
| @Override |
| public void rollback(final boolean penalize) { |
| //if we've already committed then rollback is basically a no-op |
| if(committed){ |
| return; |
| } |
| |
| closeStreams(openInputStreams, false); |
| closeStreams(openOutputStreams, false); |
| |
| for (final List<MockFlowFile> list : transferMap.values()) { |
| for (final MockFlowFile flowFile : list) { |
| processorQueue.offer(flowFile); |
| if (penalize) { |
| penalized.add(flowFile); |
| } |
| } |
| } |
| |
| for (final Long flowFileId : beingProcessed) { |
| final MockFlowFile flowFile = originalVersions.get(flowFileId); |
| if (flowFile != null) { |
| processorQueue.offer(flowFile); |
| if (penalize) { |
| penalized.add(flowFile); |
| } |
| } |
| } |
| |
| rolledback = true; |
| beingProcessed.clear(); |
| currentVersions.clear(); |
| originalVersions.clear(); |
| transferMap.clear(); |
| clearTransferState(); |
| if (!penalize) { |
| penalized.clear(); |
| } |
| } |
| |
| @Override |
| public void transfer(FlowFile flowFile) { |
| flowFile = validateState(flowFile); |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("I only accept MockFlowFile"); |
| } |
| |
| // if the flowfile provided was created in this session (i.e. it's in currentVersions and not in original versions), |
| // then throw an exception indicating that you can't transfer flowfiles back to self. |
| // this mimics the same behavior in StandardProcessSession |
| if(currentVersions.get(flowFile.getId()) != null && originalVersions.get(flowFile.getId()) == null) { |
| throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self"); |
| } |
| |
| final MockFlowFile mockFlowFile = (MockFlowFile) flowFile; |
| beingProcessed.remove(flowFile.getId()); |
| processorQueue.offer(mockFlowFile); |
| updateLastQueuedDate(mockFlowFile); |
| |
| } |
| |
| private void updateLastQueuedDate(MockFlowFile mockFlowFile) { |
| // Simulate StandardProcessSession.updateLastQueuedDate, |
| // which is called when a flow file is transferred to a relationship. |
| mockFlowFile.setLastEnqueuedDate(System.currentTimeMillis()); |
| mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet()); |
| } |
| |
| @Override |
| public void transfer(final Collection<FlowFile> flowFiles) { |
| for (final FlowFile flowFile : flowFiles) { |
| transfer(flowFile); |
| } |
| } |
| |
| @Override |
| public void transfer(FlowFile flowFile, final Relationship relationship) { |
| if (relationship == Relationship.SELF) { |
| transfer(flowFile); |
| return; |
| } |
| if(!processor.getRelationships().contains(relationship)){ |
| throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known"); |
| } |
| |
| flowFile = validateState(flowFile); |
| List<MockFlowFile> list = transferMap.computeIfAbsent(relationship, r -> new ArrayList<>()); |
| |
| beingProcessed.remove(flowFile.getId()); |
| list.add((MockFlowFile) flowFile); |
| updateLastQueuedDate((MockFlowFile) flowFile); |
| } |
| |
| @Override |
| public void transfer(Collection<FlowFile> flowFiles, final Relationship relationship) { |
| if (relationship == Relationship.SELF) { |
| transfer(flowFiles); |
| return; |
| } |
| for (final FlowFile flowFile : flowFiles) { |
| transfer(flowFile, relationship); |
| } |
| } |
| |
| @Override |
| public MockFlowFile write(FlowFile flowFile, final OutputStreamCallback callback) { |
| flowFile = validateState(flowFile); |
| if (callback == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot export a flow file that I did not create"); |
| } |
| final MockFlowFile mock = (MockFlowFile) flowFile; |
| |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| writeRecursionSet.add(flowFile); |
| try { |
| callback.process(baos); |
| } catch (final IOException e) { |
| throw new ProcessException(e.toString(), e); |
| } finally { |
| writeRecursionSet.remove(flowFile); |
| } |
| |
| final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| newFlowFile.setData(baos.toByteArray()); |
| return newFlowFile; |
| } |
| |
| @Override |
| public OutputStream write(FlowFile flowFile) { |
| if (!(flowFile instanceof MockFlowFile)) { |
| throw new IllegalArgumentException("Cannot export a flow file that I did not create"); |
| } |
| |
| final MockFlowFile mockFlowFile = validateState(flowFile); |
| writeRecursionSet.add(flowFile); |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream() { |
| @Override |
| public void close() throws IOException { |
| super.close(); |
| |
| writeRecursionSet.remove(mockFlowFile); |
| final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| newFlowFile.setData(toByteArray()); |
| } |
| }; |
| |
| return baos; |
| } |
| |
| @Override |
| public FlowFile append(FlowFile flowFile, final OutputStreamCallback callback) { |
| if (callback == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| final MockFlowFile mock = validateState(flowFile); |
| |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try { |
| baos.write(mock.getData()); |
| callback.process(baos); |
| } catch (final IOException e) { |
| throw new ProcessException(e.toString(), e); |
| } |
| |
| final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| newFlowFile.setData(baos.toByteArray()); |
| return newFlowFile; |
| } |
| |
| @Override |
| public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) { |
| if (callback == null || flowFile == null) { |
| throw new IllegalArgumentException("argument cannot be null"); |
| } |
| final MockFlowFile mock = validateState(flowFile); |
| |
| final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData()); |
| final ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| |
| writeRecursionSet.add(flowFile); |
| try { |
| callback.process(in, out); |
| } catch (final IOException e) { |
| throw new ProcessException(e.toString(), e); |
| } finally { |
| writeRecursionSet.remove(flowFile); |
| } |
| |
| final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| newFlowFile.setData(out.toByteArray()); |
| |
| return newFlowFile; |
| } |
| |
| private byte[] readFully(final InputStream in) throws IOException { |
| final byte[] buffer = new byte[4096]; |
| int len; |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| while ((len = in.read(buffer)) >= 0) { |
| baos.write(buffer, 0, len); |
| } |
| |
| return baos.toByteArray(); |
| } |
| |
| public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) { |
| List<MockFlowFile> list = this.transferMap.get(relationship); |
| if (list == null) { |
| list = new ArrayList<>(); |
| } |
| |
| return list; |
| } |
| |
| public List<MockFlowFile> getPenalizedFlowFiles() { |
| return penalized; |
| } |
| |
| /** |
| * @param relationship to get flowfiles for |
| * @return a List of FlowFiles in the order in which they were transferred |
| * to the given relationship |
| */ |
| public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) { |
| final Relationship procRel = new Relationship.Builder().name(relationship).build(); |
| return getFlowFilesForRelationship(procRel); |
| } |
| |
| public MockFlowFile createFlowFile(final File file) throws IOException { |
| return createFlowFile(Files.readAllBytes(file.toPath())); |
| } |
| |
| public MockFlowFile createFlowFile(final byte[] data) { |
| final MockFlowFile flowFile = create(); |
| flowFile.setData(data); |
| return flowFile; |
| } |
| |
| public MockFlowFile createFlowFile(final byte[] data, final Map<String, String> attrs) { |
| final MockFlowFile ff = createFlowFile(data); |
| ff.putAttributes(attrs); |
| return ff; |
| } |
| |
| @Override |
| public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) { |
| sources = validateState(sources); |
| destination = validateState(destination); |
| |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try { |
| if (header != null) { |
| baos.write(header); |
| } |
| |
| int count = 0; |
| for (final FlowFile flowFile : sources) { |
| baos.write(((MockFlowFile) flowFile).getData()); |
| if (demarcator != null && ++count != sources.size()) { |
| baos.write(demarcator); |
| } |
| } |
| |
| if (footer != null) { |
| baos.write(footer); |
| } |
| } catch (final IOException e) { |
| throw new AssertionError("failed to write data to BAOS"); |
| } |
| |
| final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination); |
| newFlowFile.setData(baos.toByteArray()); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| |
| return newFlowFile; |
| } |
| |
| private List<FlowFile> validateState(final Collection<FlowFile> flowFiles) { |
| return flowFiles.stream() |
| .map(ff -> validateState(ff)) |
| .collect(Collectors.toList()); |
| } |
| |
| private MockFlowFile validateState(final FlowFile flowFile) { |
| Objects.requireNonNull(flowFile); |
| |
| final MockFlowFile currentVersion = currentVersions.get(flowFile.getId()); |
| if (currentVersion == null) { |
| throw new FlowFileHandlingException(flowFile + " is not known in this session"); |
| } |
| |
| if (readRecursionSet.containsKey(flowFile)) { |
| throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed"); |
| } |
| |
| if (writeRecursionSet.contains(flowFile)) { |
| throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed"); |
| } |
| |
| for (final List<MockFlowFile> flowFiles : transferMap.values()) { |
| if (flowFiles.contains(flowFile)) { |
| throw new IllegalStateException(flowFile + " has already been transferred"); |
| } |
| } |
| |
| return currentVersion; |
| } |
| |
| |
| /** |
| * Inherits the attributes from the given source flow file into another flow |
| * file. The UUID of the source becomes the parent UUID of this flow file. |
| * If a parent uuid had previously been established it will be replaced by |
| * the uuid of the given source |
| * |
| * @param source the FlowFile from which to copy attributes |
| * @param destination the FlowFile to which to copy attributes |
| */ |
| private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) { |
| if (source == null || destination == null || source == destination) { |
| return destination; // don't need to inherit from ourselves |
| } |
| final FlowFile updated = putAllAttributes(destination, source.getAttributes()); |
| getProvenanceReporter().fork(source, Collections.singletonList(updated)); |
| return updated; |
| } |
| |
| /** |
| * Inherits the attributes from the given source flow files into the |
| * destination flow file. The UUIDs of the sources becomes the parent UUIDs |
| * of the destination flow file. Only attributes which is common to all |
| * source items is copied into this flow files attributes. Any previously |
| * established parent UUIDs will be replaced by the UUIDs of the sources. It |
| * will capture the uuid of a certain number of source objects and may not |
| * capture all of them. How many it will capture is unspecified. |
| * |
| * @param sources to inherit common attributes from |
| */ |
| private FlowFile inheritAttributes(final Collection<FlowFile> sources, final FlowFile destination) { |
| final StringBuilder parentUuidBuilder = new StringBuilder(); |
| int uuidsCaptured = 0; |
| for (final FlowFile source : sources) { |
| if (source == destination) { |
| continue; // don't want to capture parent uuid of this. Something can't be a child of itself |
| } |
| final String sourceUuid = source.getAttribute(CoreAttributes.UUID.key()); |
| if (sourceUuid != null && !sourceUuid.trim().isEmpty()) { |
| uuidsCaptured++; |
| if (parentUuidBuilder.length() > 0) { |
| parentUuidBuilder.append(","); |
| } |
| parentUuidBuilder.append(sourceUuid); |
| } |
| |
| if (uuidsCaptured > 100) { |
| break; |
| } |
| } |
| |
| final FlowFile updated = putAllAttributes(destination, intersectAttributes(sources)); |
| getProvenanceReporter().join(sources, updated); |
| return updated; |
| } |
| |
| /** |
| * Returns the attributes that are common to every flow file given. The key |
| * and value must match exactly. |
| * |
| * @param flowFileList a list of flow files |
| * |
| * @return the common attributes |
| */ |
| private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) { |
| final Map<String, String> result = new HashMap<>(); |
| // trivial cases |
| if (flowFileList == null || flowFileList.isEmpty()) { |
| return result; |
| } else if (flowFileList.size() == 1) { |
| result.putAll(flowFileList.iterator().next().getAttributes()); |
| } |
| |
| /* |
| * Start with the first attribute map and only put an entry to the |
| * resultant map if it is common to every map. |
| */ |
| final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes(); |
| |
| outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { |
| final String key = mapEntry.getKey(); |
| final String value = mapEntry.getValue(); |
| for (final FlowFile flowFile : flowFileList) { |
| final Map<String, String> currMap = flowFile.getAttributes(); |
| final String curVal = currMap.get(key); |
| if (curVal == null || !curVal.equals(value)) { |
| continue outer; |
| } |
| } |
| result.put(key, value); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * Assert that the session has been committed |
| */ |
| public void assertCommitted() { |
| Assertions.assertTrue(committed, "Session was not committed"); |
| } |
| |
| /** |
| * Assert that the session has not been committed |
| */ |
| public void assertNotCommitted() { |
| Assertions.assertFalse(committed, "Session was committed"); |
| } |
| |
| /** |
| * Assert that {@link #rollback()} has been called |
| */ |
| public void assertRolledBack() { |
| Assertions.assertTrue(rolledback, "Session was not rolled back"); |
| } |
| |
| /** |
| * Assert that {@link #rollback()} has not been called |
| */ |
| public void assertNotRolledBack() { |
| Assertions.assertFalse(rolledback, "Session was rolled back"); |
| } |
| |
| /** |
| * Assert that the number of FlowFiles transferred to the given relationship |
| * is equal to the given count |
| * |
| * @param relationship to validate transfer count of |
| * @param count items transfer to given relationship |
| */ |
| public void assertTransferCount(final Relationship relationship, final int count) { |
| final int transferCount = getFlowFilesForRelationship(relationship).size(); |
| Assertions.assertEquals(count, transferCount, "Expected " + count + " FlowFiles to be transferred to " |
| + relationship + " but actual transfer count was " + transferCount); |
| } |
| |
| /** |
| * Assert that the number of FlowFiles transferred to the given relationship |
| * is equal to the given count |
| * |
| * @param relationship to validate transfer count of |
| * @param count items transfer to given relationship |
| */ |
| public void assertTransferCount(final String relationship, final int count) { |
| assertTransferCount(new Relationship.Builder().name(relationship).build(), count); |
| } |
| |
| /** |
| * Assert that there are no FlowFiles left on the input queue. |
| */ |
| public void assertQueueEmpty() { |
| Assertions.assertTrue(this.processorQueue.isEmpty(), "FlowFile Queue has " + this.processorQueue.size() + " items"); |
| } |
| |
| /** |
| * Assert that at least one FlowFile is on the input queue |
| */ |
| public void assertQueueNotEmpty() { |
| Assertions.assertFalse(this.processorQueue.isEmpty(), "FlowFile Queue is empty"); |
| } |
| |
| /** |
| * Asserts that all FlowFiles that were transferred were transferred to the |
| * given relationship |
| * |
| * @param relationship to check for transferred flow files |
| */ |
| public void assertAllFlowFilesTransferred(final String relationship) { |
| assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build()); |
| } |
| |
| /** |
| * Asserts that all FlowFiles that were transferred were transferred to the |
| * given relationship |
| * |
| * @param relationship to validate |
| */ |
| public void assertAllFlowFilesTransferred(final Relationship relationship) { |
| for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) { |
| final Relationship rel = entry.getKey(); |
| final List<MockFlowFile> flowFiles = entry.getValue(); |
| |
| if (!rel.equals(relationship) && flowFiles != null && !flowFiles.isEmpty()) { |
| Assertions.fail("Expected all Transferred FlowFiles to go to " + relationship + " but " + flowFiles.size() + " were routed to " + rel); |
| } |
| } |
| } |
| |
| /** |
| * Asserts that all FlowFiles that were transferred are compliant with the |
| * given validator. |
| * |
| * @param validator validator to use |
| */ |
| public void assertAllFlowFiles(FlowFileValidator validator) { |
| for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) { |
| final List<MockFlowFile> flowFiles = entry.getValue(); |
| for (MockFlowFile mockFlowFile : flowFiles) { |
| validator.assertFlowFile(mockFlowFile); |
| } |
| } |
| } |
| |
| /** |
| * Asserts that all FlowFiles that were transferred in the given relationship |
| * are compliant with the given validator. |
| * |
| * @param validator validator to use |
| */ |
| public void assertAllFlowFiles(Relationship relationship, FlowFileValidator validator) { |
| for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) { |
| final List<MockFlowFile> flowFiles = entry.getValue(); |
| final Relationship rel = entry.getKey(); |
| for (MockFlowFile mockFlowFile : flowFiles) { |
| if(rel.equals(relationship)) { |
| validator.assertFlowFile(mockFlowFile); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Removes all state information about FlowFiles that have been transferred |
| */ |
| public void clearTransferState() { |
| this.transferMap.clear(); |
| } |
| |
| /** |
| * Asserts that all FlowFiles that were transferred were transferred to the |
| * given relationship and that the number of FlowFiles transferred is equal |
| * to <code>count</code> |
| * |
| * @param relationship to validate |
| * @param count number of items sent to that relationship (expected) |
| */ |
| public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) { |
| assertAllFlowFilesTransferred(relationship); |
| assertTransferCount(relationship, count); |
| } |
| |
| /** |
| * Asserts that all FlowFiles that were transferred were transferred to the |
| * given relationship and that the number of FlowFiles transferred is equal |
| * to <code>count</code> |
| * |
| * @param relationship to validate |
| * @param count number of items sent to that relationship (expected) |
| */ |
| public void assertAllFlowFilesTransferred(final String relationship, final int count) { |
| assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build(), count); |
| } |
| |
| /** |
| * @return the number of FlowFiles that were removed |
| */ |
| public int getRemovedCount() { |
| return removedFlowFiles.size(); |
| } |
| |
| @Override |
| public ProvenanceReporter getProvenanceReporter() { |
| return provenanceReporter; |
| } |
| |
| @Override |
| public void setState(final Map<String, String> state, final Scope scope) throws IOException { |
| stateManager.setState(state, scope); |
| } |
| |
| @Override |
| public StateMap getState(final Scope scope) throws IOException { |
| return stateManager.getState(scope); |
| } |
| |
| @Override |
| public boolean replaceState(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException { |
| return stateManager.replace(oldValue, newValue, scope); |
| } |
| |
| @Override |
| public void clearState(final Scope scope) throws IOException { |
| stateManager.clear(scope); |
| } |
| |
| @Override |
| public MockFlowFile penalize(FlowFile flowFile) { |
| flowFile = validateState(flowFile); |
| final MockFlowFile mockFlowFile = (MockFlowFile) flowFile; |
| final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| newFlowFile.setPenalized(true); |
| penalized.add(newFlowFile); |
| return newFlowFile; |
| } |
| |
| public MockFlowFile unpenalize(FlowFile flowFile) { |
| flowFile = validateState(flowFile); |
| final MockFlowFile newFlowFile = new MockFlowFile(flowFile.getId(), flowFile); |
| currentVersions.put(newFlowFile.getId(), newFlowFile); |
| newFlowFile.setPenalized(false); |
| penalized.remove(newFlowFile); |
| return newFlowFile; |
| } |
| |
| public byte[] getContentAsByteArray(MockFlowFile flowFile) { |
| flowFile = validateState(flowFile); |
| return flowFile.getData(); |
| } |
| |
| /** |
| * Checks if a FlowFile is known in this session. |
| * |
| * @param flowFile |
| * the FlowFile to check |
| * @return <code>true</code> if the FlowFile is known in this session, |
| * <code>false</code> otherwise. |
| */ |
| boolean isFlowFileKnown(final FlowFile flowFile) { |
| final FlowFile curFlowFile = currentVersions.get(flowFile.getId()); |
| if (curFlowFile == null) { |
| return false; |
| } |
| |
| final String curUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key()); |
| final String providedUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key()); |
| return curUuid.equals(providedUuid); |
| } |
| } |