NIFI-938: If ResourceClaim is removed while a process has access to its stream, don't delete the claim
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index d06b462..724e26e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -108,6 +108,7 @@
// the OutputStream that we can use for writing to the claim.
private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
+ private final Set<ResourceClaim> activeResourceClaims = Collections.synchronizedSet(new HashSet<ResourceClaim>());
private final boolean archiveData;
private final long maxArchiveMillis;
@@ -600,11 +601,15 @@
// two conditions can be checked atomically.
synchronized (writableClaimQueue) {
final int claimantCount = resourceClaimManager.getClaimantCount(claim);
- if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+ if (claimantCount > 0) {
// if other content claims are claiming the same resource, we have nothing to destroy,
// so just consider the destruction successful.
return true;
}
+ if (activeResourceClaims.contains(claim) || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
+ // If we have an open OutputStream for the claim, we will not destroy the claim.
+ return false;
+ }
}
Path path = null;
@@ -827,6 +832,8 @@
throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to.");
}
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+
// we always append because there may be another ContentClaim using the same resource claim.
// However, we know that we will never write to the same claim from two different threads
// at the same time because we will call create() to get the claim before we write to it,
@@ -847,6 +854,7 @@
}
}
+ activeResourceClaims.add(resourceClaim);
final ByteCountingOutputStream bcos = claimStream;
final OutputStream out = new OutputStream() {
private long bytesWritten = 0L;
@@ -921,6 +929,7 @@
@Override
public synchronized void close() throws IOException {
closed = true;
+ activeResourceClaims.remove(resourceClaim);
if (alwaysSync) {
((FileOutputStream) bcos.getWrappedStream()).getFD().sync();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 5ffcb3d..88f572b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -348,6 +348,29 @@
}
@Test
+ public void testRemoveWhileWritingToClaim() throws IOException {
+ final ContentClaim claim = repository.create(false);
+ final OutputStream out = repository.write(claim);
+
+ // write at least 1 MB to the output stream so that when we close the output stream
+ // the repo won't keep the stream open.
+ final byte[] buff = new byte[1024 * 1024];
+ out.write(buff);
+ out.write(buff);
+
+ // true because claimant count is still 1.
+ assertTrue(repository.remove(claim));
+
+ assertEquals(0, repository.decrementClaimantCount(claim));
+
+ // false because claimant count is 0 but there is an 'active' stream for the claim
+ assertFalse(repository.remove(claim));
+
+ out.close();
+ assertTrue(repository.remove(claim));
+ }
+
+ @Test
public void testMergeWithHeaderFooterDemarcator() throws IOException {
testMerge("HEADER", "FOOTER", "DEMARCATOR");
}