MINIFICPP-2692 Fix flow file removal for volatile repositories

Closes #2076

Signed-off-by: Marton Szasz <szaszm@apache.org>
diff --git a/behave_framework/src/minifi_test_framework/containers/minifi_container.py b/behave_framework/src/minifi_test_framework/containers/minifi_container.py
index 805b042..b2cfeae 100644
--- a/behave_framework/src/minifi_test_framework/containers/minifi_container.py
+++ b/behave_framework/src/minifi_test_framework/containers/minifi_container.py
@@ -15,6 +15,7 @@
 #  limitations under the License.
 #
 
+import logging
 from docker.models.networks import Network
 
 from minifi_test_framework.containers.file import File
@@ -82,3 +83,11 @@
     def _get_log_properties_file_content(self):
         lines = (f"{key}={value}" for key, value in self.log_properties.items())
         return "\n".join(lines)
+
+    def get_memory_usage(self) -> int | None:
+        exit_code, output = self.exec_run(["awk", "/VmRSS/ { printf \"%d\\n\", $2 }", "/proc/1/status"])
+        if exit_code != 0:
+            return None
+        memory_usage_in_bytes = int(output.strip()) * 1024
+        logging.info(f"MiNiFi memory usage: {memory_usage_in_bytes} bytes")
+        return memory_usage_in_bytes
diff --git a/behave_framework/src/minifi_test_framework/steps/checking_steps.py b/behave_framework/src/minifi_test_framework/steps/checking_steps.py
index 231a2be..418c49c 100644
--- a/behave_framework/src/minifi_test_framework/steps/checking_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/checking_steps.py
@@ -142,3 +142,13 @@
     assert wait_for_condition(
         condition=lambda: context.get_default_minifi_container().verify_path_with_json_content(directory, content),
         timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context)
+
+
+@then('MiNiFi\'s memory usage does not increase by more than {max_increase} after {duration}')
+def step_impl(context: MinifiTestContext, max_increase: str, duration: str):
+    time_in_seconds = humanfriendly.parse_timespan(duration)
+    max_increase_in_bytes = humanfriendly.parse_size(max_increase)
+    initial_memory_usage = context.get_default_minifi_container().get_memory_usage()
+    time.sleep(time_in_seconds)
+    final_memory_usage = context.get_default_minifi_container().get_memory_usage()
+    assert final_memory_usage - initial_memory_usage <= max_increase_in_bytes
diff --git a/behave_framework/src/minifi_test_framework/steps/core_steps.py b/behave_framework/src/minifi_test_framework/steps/core_steps.py
index 552ea2d..343e622 100644
--- a/behave_framework/src/minifi_test_framework/steps/core_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/core_steps.py
@@ -19,6 +19,7 @@
 import random
 import string
 import os
+import time
 
 import humanfriendly
 from behave import when, step, given
@@ -66,3 +67,9 @@
 @given('a host resource file "{filename}" is bound to the "{container_path}" path in the MiNiFi container')
 def step_impl(context: MinifiTestContext, filename: str, container_path: str):
     context.execute_steps(f"given a host resource file \"{filename}\" is bound to the \"{container_path}\" path in the MiNiFi container \"{DEFAULT_MINIFI_CONTAINER_NAME}\"")
+
+
+@step("after {duration} have passed")
+@step("after {duration} has passed")
+def step_impl(context, duration):
+    time.sleep(humanfriendly.parse_timespan(duration))
diff --git a/extensions/standard-processors/tests/features/repository.feature b/extensions/standard-processors/tests/features/repository.feature
new file mode 100644
index 0000000..5cc8c69
--- /dev/null
+++ b/extensions/standard-processors/tests/features/repository.feature
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+@CORE
+Feature: Flow file and content repositories work as expected
+
+  Scenario: Flow file content is removed from memory when terminated when using Volatile Content Repository
+    Given a GenerateFlowFile processor with the "File Size" property set to "20 MB"
+    And the scheduling period of the GenerateFlowFile processor is set to "1 sec"
+    And a LogAttribute processor
+    And LogAttribute is EVENT_DRIVEN
+    And the "success" relationship of the GenerateFlowFile processor is connected to the LogAttribute
+    And LogAttribute's success relationship is auto-terminated
+    And MiNiFi configuration "nifi.content.repository.class.name" is set to "VolatileContentRepository"
+    And MiNiFi configuration "nifi.flowfile.repository.class.name" is set to "NoOpRepository"
+
+    When the MiNiFi instance starts up
+    And after 5 seconds have passed
+
+    Then MiNiFi's memory usage does not increase by more than 50 MB after 30 seconds
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 1eeb5a0..a0d7bf8 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -1038,10 +1038,15 @@
     std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap,
     const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles) {
 
-  std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>> flowData;
-
   auto flowFileRepo = process_context_->getFlowFileRepository();
-  auto contentRepo = process_context_->getContentRepository();
+
+  // In case of a noop repository we do not persist anything, flow files are only stored in memory, so we do not need to adjust the owned count
+  // Otherwise the increase of the owned count would result in memory leaks, as the count is not decreased later in the noop repository
+  if (flowFileRepo->isNoop()) {
+    return;
+  }
+
+  std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>> flowData;
 
   enum class Type {
     Dropped, Transferred