NIFI-938: If ResourceClaim is removed while a process has access to its stream, don't delete the claim
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index d06b462..724e26e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -108,6 +108,7 @@
     // the OutputStream that we can use for writing to the claim.
     private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
     private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
+    private final Set<ResourceClaim> activeResourceClaims = Collections.synchronizedSet(new HashSet<ResourceClaim>());
 
     private final boolean archiveData;
     private final long maxArchiveMillis;
@@ -600,11 +601,15 @@
         // two conditions can be checked atomically.
         synchronized (writableClaimQueue) {
             final int claimantCount = resourceClaimManager.getClaimantCount(claim);
-            if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+            if (claimantCount > 0) {
                 // if other content claims are claiming the same resource, we have nothing to destroy,
                 // so just consider the destruction successful.
                 return true;
             }
+            if (activeResourceClaims.contains(claim) || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+                // If we have an open OutputStream for the claim, we will not destroy the claim.
+                return false;
+            }
         }
 
         Path path = null;
@@ -827,6 +832,8 @@
             throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to.");
         }
 
+        final ResourceClaim resourceClaim = claim.getResourceClaim();
+
         // we always append because there may be another ContentClaim using the same resource claim.
         // However, we know that we will never write to the same claim from two different threads
         // at the same time because we will call create() to get the claim before we write to it,
@@ -847,6 +854,7 @@
             }
         }
 
+        activeResourceClaims.add(resourceClaim);
         final ByteCountingOutputStream bcos = claimStream;
         final OutputStream out = new OutputStream() {
             private long bytesWritten = 0L;
@@ -921,6 +929,7 @@
             @Override
             public synchronized void close() throws IOException {
                 closed = true;
+                activeResourceClaims.remove(resourceClaim);
 
                 if (alwaysSync) {
                     ((FileOutputStream) bcos.getWrappedStream()).getFD().sync();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 5ffcb3d..88f572b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -348,6 +348,29 @@
     }
 
     @Test
+    public void testRemoveWhileWritingToClaim() throws IOException {
+        final ContentClaim claim = repository.create(false);
+        final OutputStream out = repository.write(claim);
+
+        // write at least 1 MB to the output stream so that when we close the output stream
+        // the repo won't keep the stream open.
+        final byte[] buff = new byte[1024 * 1024];
+        out.write(buff);
+        out.write(buff);
+
+        // true because claimant count is still 1.
+        assertTrue(repository.remove(claim));
+
+        assertEquals(0, repository.decrementClaimantCount(claim));
+
+        // false because claimant count is 0 but there is an 'active' stream for the claim
+        assertFalse(repository.remove(claim));
+
+        out.close();
+        assertTrue(repository.remove(claim));
+    }
+
+    @Test
     public void testMergeWithHeaderFooterDemarcator() throws IOException {
         testMerge("HEADER", "FOOTER", "DEMARCATOR");
     }