NIFI-8727: This closes #5418. Addressed bug in which ProcessSession doesn't properly decrement claimant count when a FlowFile is cloned and then the clone written to. Added automated tests to ensure that we are properly handling cases where a FlowFile is clone and then the contents modified

Signed-off-by: Joe Witt <joewitt@apache.org>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a17c06c..eba58ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -3075,7 +3075,7 @@
         // If the content of the FlowFile has already been modified, we need to remove the newly created content (the working claim). However, if
         // they are the same, we cannot just remove the claim because record.getWorkingClaim() will return
         // the original claim if the record is "working" but the content has not been modified
-        // (e.g., in the case of attributes only were updated)
+        // (e.g., in the case of attributes only were updated).
         //
         // In other words:
         // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
@@ -3083,7 +3083,14 @@
         // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
         // because we will do that later, in the session.commit() and that would result in decrementing the count for
         // the original claim twice.
-        if (record.isContentModified()) {
+        //
+        // Additionally, If the Record Type is CREATE, that means any content that exists is a temporary claim.
+        // This will happen, for example, if a FlowFile is created and then written to multiple times or a FlowFile is cloned and then the clone's contents
+        // are overwritten. In such a case, we can confidently decrement the count/remove the claim because either the claim is null (in which case calling
+        // decrementClaimantCount/addTransientClaim will have no effect, or the claim points to the previously written content that is no longer relevant because
+        // the FlowFile's content is being overwritten.
+
+        if (record.isContentModified() || record.getType() == RepositoryRecordType.CREATE) {
             // In this case, it's ok to decrement the claimant count for the content because we know that the working claim is going to be
             // updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
             // Therefore, we need to decrement the claimant count, and since the Working Claim is being changed, that means that
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 15f9d46..4e2e145 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -111,6 +111,7 @@
 import static org.mockito.Mockito.when;
 
 public class StandardProcessSessionIT {
+    private static final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
 
     private StandardProcessSession session;
     private MockContentRepository contentRepo;
@@ -123,8 +124,7 @@
     private MockFlowFileRepository flowFileRepo;
     private CounterRepository counterRepository;
     private FlowFileEventRepository flowFileEventRepository;
-    private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
-    private static StandardResourceClaimManager resourceClaimManager;
+    private ResourceClaimManager resourceClaimManager;
 
     @After
     public void cleanup() {
@@ -2603,6 +2603,205 @@
         stateManager.assertStateNotSet();
     }
 
+    @Test
+    public void testCloneThenRollbackCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+    }
+
+    @Test
+    public void testCloneThenWriteThenRollbackCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        clone = session.write(flowFile, out -> out.write("Bye".getBytes()));
+        assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+        assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+    }
+
+    @Test
+    public void testCloneThenAppendThenRollbackCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        clone = session.append(flowFile, out -> out.write("Bye".getBytes()));
+        assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+        assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+    }
+
+    @Test
+    public void testCloneThenWriteCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+
+        // Expect claimant count of 2 because the clone() means that the new FlowFile points to the same content claim.
+        assertEquals(2, contentRepo.getClaimantCount(originalClaim));
+
+        // Should be able to write to the FlowFile any number of times, and each time it should leave us with a Content Claim Claimant Count of 1 for the original (because the new FlowFile will no
+        // longer point at the original claim) and 1 for the new Content Claim.
+        for (int i=0; i < 10; i++) {
+            final ContentClaim previousCloneClaim = getContentClaim(clone);
+            clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+            // After modifying the content of the FlowFile, the claimant count of the 'old' content claim should be 1, as should the claimant count of the updated content claim.
+            final ContentClaim updatedCloneClaim = getContentClaim(clone);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+            assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+            assertEquals(1, contentRepo.getClaimantCount(previousCloneClaim));
+        }
+    }
+
+    private ContentClaim getContentClaim(final FlowFile flowFile) {
+        return ((FlowFileRecord) flowFile).getContentClaim();
+    }
+
+    @Test
+    public void testCreateChildThenWriteCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(claim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.create(flowFile);
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+        final ContentClaim updatedCloneClaim = getContentClaim(clone);
+        assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+    }
+
+    @Test
+    public void testCreateChildThenMultipleWriteCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(claim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.create(flowFile);
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        for (int i=0; i < 100; i++) {
+            clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+            final ContentClaim updatedCloneClaim = getContentClaim(clone);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+            assertEquals(1, contentRepo.getClaimantCount(claim));
+        }
+    }
+
+    @Test
+    public void testCreateNewFlowFileWithoutParentThenMultipleWritesCountsClaimReferencesProperly() {
+        FlowFile flowFile = session.create();
+
+        for (int i=0; i < 100; i++) {
+            flowFile = session.write(flowFile, out -> out.write("bye".getBytes()));
+
+            final ContentClaim updatedCloneClaim = getContentClaim(flowFile);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+        }
+
+        session.rollback();
+        assertEquals(0, contentRepo.getClaimantCount(getContentClaim(flowFile)));
+    }
+
+
+
     private static class MockFlowFileRepository implements FlowFileRepository {
 
         private boolean failOnUpdate = false;