NIFI-8727: This closes #5418. Addressed bug in which ProcessSession doesn't properly decrement claimant count when a FlowFile is cloned and then the clone written to. Added automated tests to ensure that we are properly handling cases where a FlowFile is clone and then the contents modified
Signed-off-by: Joe Witt <joewitt@apache.org>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a17c06c..eba58ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -3075,7 +3075,7 @@
// If the content of the FlowFile has already been modified, we need to remove the newly created content (the working claim). However, if
// they are the same, we cannot just remove the claim because record.getWorkingClaim() will return
// the original claim if the record is "working" but the content has not been modified
- // (e.g., in the case of attributes only were updated)
+ // (e.g., in the case of attributes only were updated).
//
// In other words:
// If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
@@ -3083,7 +3083,14 @@
// that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
// because we will do that later, in the session.commit() and that would result in decrementing the count for
// the original claim twice.
- if (record.isContentModified()) {
+ //
+ // Additionally, If the Record Type is CREATE, that means any content that exists is a temporary claim.
+ // This will happen, for example, if a FlowFile is created and then written to multiple times or a FlowFile is cloned and then the clone's contents
+ // are overwritten. In such a case, we can confidently decrement the count/remove the claim because either the claim is null (in which case calling
+ // decrementClaimantCount/addTransientClaim will have no effect, or the claim points to the previously written content that is no longer relevant because
+ // the FlowFile's content is being overwritten.
+
+ if (record.isContentModified() || record.getType() == RepositoryRecordType.CREATE) {
// In this case, it's ok to decrement the claimant count for the content because we know that the working claim is going to be
// updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
// Therefore, we need to decrement the claimant count, and since the Working Claim is being changed, that means that
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 15f9d46..4e2e145 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -111,6 +111,7 @@
import static org.mockito.Mockito.when;
public class StandardProcessSessionIT {
+ private static final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
private StandardProcessSession session;
private MockContentRepository contentRepo;
@@ -123,8 +124,7 @@
private MockFlowFileRepository flowFileRepo;
private CounterRepository counterRepository;
private FlowFileEventRepository flowFileEventRepository;
- private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
- private static StandardResourceClaimManager resourceClaimManager;
+ private ResourceClaimManager resourceClaimManager;
@After
public void cleanup() {
@@ -2603,6 +2603,205 @@
stateManager.assertStateNotSet();
}
+ @Test
+ public void testCloneThenRollbackCountsClaimReferencesProperly() throws IOException {
+ final ContentClaim originalClaim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(originalClaim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .contentClaim(originalClaim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.clone(flowFile);
+ session.rollback();
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+ }
+
+ @Test
+ public void testCloneThenWriteThenRollbackCountsClaimReferencesProperly() throws IOException {
+ final ContentClaim originalClaim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(originalClaim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .contentClaim(originalClaim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.clone(flowFile);
+ clone = session.write(flowFile, out -> out.write("Bye".getBytes()));
+ assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+ session.rollback();
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+ assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+ }
+
+ @Test
+ public void testCloneThenAppendThenRollbackCountsClaimReferencesProperly() throws IOException {
+ final ContentClaim originalClaim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(originalClaim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .contentClaim(originalClaim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.clone(flowFile);
+ clone = session.append(flowFile, out -> out.write("Bye".getBytes()));
+ assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+ session.rollback();
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+ assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+ }
+
+ @Test
+ public void testCloneThenWriteCountsClaimReferencesProperly() throws IOException {
+ final ContentClaim originalClaim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(originalClaim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .contentClaim(originalClaim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.clone(flowFile);
+
+ // Expect claimant count of 2 because the clone() means that the new FlowFile points to the same content claim.
+ assertEquals(2, contentRepo.getClaimantCount(originalClaim));
+
+ // Should be able to write to the FlowFile any number of times, and each time it should leave us with a Content Claim Claimant Count of 1 for the original (because the new FlowFile will no
+ // longer point at the original claim) and 1 for the new Content Claim.
+ for (int i=0; i < 10; i++) {
+ final ContentClaim previousCloneClaim = getContentClaim(clone);
+ clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+ // After modifying the content of the FlowFile, the claimant count of the 'old' content claim should be 1, as should the claimant count of the updated content claim.
+ final ContentClaim updatedCloneClaim = getContentClaim(clone);
+ assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+ assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+ assertEquals(1, contentRepo.getClaimantCount(previousCloneClaim));
+ }
+ }
+
+ private ContentClaim getContentClaim(final FlowFile flowFile) {
+ return ((FlowFileRecord) flowFile).getContentClaim();
+ }
+
+ @Test
+ public void testCreateChildThenWriteCountsClaimReferencesProperly() throws IOException {
+ final ContentClaim claim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(claim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.create(flowFile);
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+
+ clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+ final ContentClaim updatedCloneClaim = getContentClaim(clone);
+ assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+ }
+
+ @Test
+ public void testCreateChildThenMultipleWriteCountsClaimReferencesProperly() throws IOException {
+ final ContentClaim claim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(claim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+
+ FlowFile clone = session.create(flowFile);
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+
+ for (int i=0; i < 100; i++) {
+ clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+ final ContentClaim updatedCloneClaim = getContentClaim(clone);
+ assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+ assertEquals(1, contentRepo.getClaimantCount(claim));
+ }
+ }
+
+ @Test
+ public void testCreateNewFlowFileWithoutParentThenMultipleWritesCountsClaimReferencesProperly() {
+ FlowFile flowFile = session.create();
+
+ for (int i=0; i < 100; i++) {
+ flowFile = session.write(flowFile, out -> out.write("bye".getBytes()));
+
+ final ContentClaim updatedCloneClaim = getContentClaim(flowFile);
+ assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+ }
+
+ session.rollback();
+ assertEquals(0, contentRepo.getClaimantCount(getContentClaim(flowFile)));
+ }
+
+
+
private static class MockFlowFileRepository implements FlowFileRepository {
private boolean failOnUpdate = false;