MINIFICPP-2692 Fix flow file removal for volatile repositories
Closes #2076
Signed-off-by: Marton Szasz <szaszm@apache.org>
diff --git a/behave_framework/src/minifi_test_framework/containers/minifi_container.py b/behave_framework/src/minifi_test_framework/containers/minifi_container.py
index 805b042..b2cfeae 100644
--- a/behave_framework/src/minifi_test_framework/containers/minifi_container.py
+++ b/behave_framework/src/minifi_test_framework/containers/minifi_container.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import logging
from docker.models.networks import Network
from minifi_test_framework.containers.file import File
@@ -82,3 +83,17 @@
def _get_log_properties_file_content(self):
lines = (f"{key}={value}" for key, value in self.log_properties.items())
return "\n".join(lines)
+
+    def get_memory_usage(self) -> int | None:
+        """Return the resident set size (VmRSS) of PID 1 in the container in bytes.
+
+        Returns None if the awk command fails or prints no value.
+        """
+        exit_code, output = self.exec_run(["awk", "/VmRSS/ { printf \"%d\\n\", $2 }", "/proc/1/status"])
+        # awk exits with 0 and prints nothing when the status file has no VmRSS line, which int() cannot parse
+        if exit_code != 0 or not output.strip():
+            return None
+        # /proc/<pid>/status reports VmRSS in kB, hence the multiplication by 1024
+        memory_usage_in_bytes = int(output.strip()) * 1024
+        logging.info(f"MiNiFi memory usage: {memory_usage_in_bytes} bytes")
+        return memory_usage_in_bytes
diff --git a/behave_framework/src/minifi_test_framework/steps/checking_steps.py b/behave_framework/src/minifi_test_framework/steps/checking_steps.py
index 231a2be..418c49c 100644
--- a/behave_framework/src/minifi_test_framework/steps/checking_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/checking_steps.py
@@ -142,3 +142,18 @@
assert wait_for_condition(
condition=lambda: context.get_default_minifi_container().verify_path_with_json_content(directory, content),
timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context)
+
+
+@then('MiNiFi\'s memory usage does not increase by more than {max_increase} after {duration}')
+def step_impl(context: MinifiTestContext, max_increase: str, duration: str):
+    """Sample MiNiFi's memory usage, wait for {duration}, sample again and assert the growth is at most {max_increase}."""
+    time_in_seconds = humanfriendly.parse_timespan(duration)
+    max_increase_in_bytes = humanfriendly.parse_size(max_increase)
+    initial_memory_usage = context.get_default_minifi_container().get_memory_usage()
+    # get_memory_usage() returns None when the measurement fails; report that clearly instead of hitting a TypeError below
+    assert initial_memory_usage is not None, "Could not read the initial memory usage of MiNiFi"
+    time.sleep(time_in_seconds)
+    final_memory_usage = context.get_default_minifi_container().get_memory_usage()
+    assert final_memory_usage is not None, "Could not read the final memory usage of MiNiFi"
+    increase_in_bytes = final_memory_usage - initial_memory_usage
+    assert increase_in_bytes <= max_increase_in_bytes, f"MiNiFi's memory usage increased by {increase_in_bytes} bytes, which exceeds the allowed {max_increase_in_bytes} bytes"
diff --git a/behave_framework/src/minifi_test_framework/steps/core_steps.py b/behave_framework/src/minifi_test_framework/steps/core_steps.py
index 552ea2d..343e622 100644
--- a/behave_framework/src/minifi_test_framework/steps/core_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/core_steps.py
@@ -19,6 +19,7 @@
import random
import string
import os
+import time
import humanfriendly
from behave import when, step, given
@@ -66,3 +67,11 @@
@given('a host resource file "{filename}" is bound to the "{container_path}" path in the MiNiFi container')
def step_impl(context: MinifiTestContext, filename: str, container_path: str):
context.execute_steps(f"given a host resource file \"{filename}\" is bound to the \"{container_path}\" path in the MiNiFi container \"{DEFAULT_MINIFI_CONTAINER_NAME}\"")
+
+
+@step("after {duration} have passed")
+@step("after {duration} has passed")
+def step_impl(context, duration):
+    """Pause the scenario for the timespan written in the step text, e.g. "5 seconds"."""
+    seconds_to_wait = humanfriendly.parse_timespan(duration)
+    time.sleep(seconds_to_wait)
diff --git a/extensions/standard-processors/tests/features/repository.feature b/extensions/standard-processors/tests/features/repository.feature
new file mode 100644
index 0000000..5cc8c69
--- /dev/null
+++ b/extensions/standard-processors/tests/features/repository.feature
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+@CORE
+Feature: Flow file and content repositories work as expected
+
+ Scenario: Flow file content is removed from memory when terminated when using Volatile Content Repository
+ Given a GenerateFlowFile processor with the "File Size" property set to "20 MB"
+ And the scheduling period of the GenerateFlowFile processor is set to "1 sec"
+ And a LogAttribute processor
+ And LogAttribute is EVENT_DRIVEN
+ And the "success" relationship of the GenerateFlowFile processor is connected to the LogAttribute
+ And LogAttribute's success relationship is auto-terminated
+ And MiNiFi configuration "nifi.content.repository.class.name" is set to "VolatileContentRepository"
+ And MiNiFi configuration "nifi.flowfile.repository.class.name" is set to "NoOpRepository"
+
+ When the MiNiFi instance starts up
+ And after 5 seconds have passed
+
+ Then MiNiFi's memory usage does not increase by more than 50 MB after 30 seconds
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 1eeb5a0..a0d7bf8 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -1038,10 +1038,15 @@
std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap,
const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles) {
- std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>> flowData;
-
auto flowFileRepo = process_context_->getFlowFileRepository();
- auto contentRepo = process_context_->getContentRepository();
+
+  // With a noop flow file repository nothing is persisted and flow files are only stored in memory, so the owned count does not need to be adjusted.
+  // Increasing the owned count here would result in memory leaks, because the noop repository never decreases it later.
+ if (flowFileRepo->isNoop()) {
+ return;
+ }
+
+ std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>> flowData;
enum class Type {
Dropped, Transferred