MINIFICPP-1273 - Drain connections on flow shutdown
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #827
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 700b95f..3479070 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -766,6 +766,8 @@
## Add KeyValueStorageService tests
registerTest("${TEST_DIR}/keyvalue-tests")
+registerTest("${TEST_DIR}/flow-tests")
+
include(BuildDocs)
include(DockerConfig)
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 64a805e..5a40d87 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -168,7 +168,7 @@
// Poll the flow file from queue, the expired flow file record also being returned
std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords);
// Drain the flow records
- void drain();
+ void drain(bool delete_permanently);
void yield() override {}
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index fcbb78a..9944658 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -228,6 +228,8 @@
void getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap);
+ void drainConnections();
+
protected:
void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 9038e91..06cfcce 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -262,15 +262,17 @@
return NULL;
}
-void Connection::drain() {
+void Connection::drain(bool delete_permanently) {
std::lock_guard<std::mutex> lock(mutex_);
while (!queue_.empty()) {
std::shared_ptr<core::FlowFile> item = queue_.front();
queue_.pop();
- logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
- if (flow_repository_->Delete(item->getUUIDStr())) {
- item->setStoredToRepository(false);
+ logger_->log_debug("Delete flow file UUID %s from connection %s", item->getUUIDStr(), name_);
+ if (delete_permanently) {
+ if (flow_repository_->Delete(item->getUUIDStr())) {
+ item->setStoredToRepository(false);
+ }
}
}
queued_data_size_ = 0;
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 3976936..669285b 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -250,6 +250,9 @@
* -Stopping the schedulers doesn't actually quit the onTrigger functions of processors
* -They only guarantee that the processors are not scheduled any more
* -After the threadpool is stopped we can make sure that processors don't need repos and controllers anymore */
+ if (this->root_) {
+ this->root_->drainConnections();
+ }
this->flow_file_repo_->stop();
this->provenance_repo_->stop();
// stop the ControllerServices
@@ -923,7 +926,7 @@
auto conn = connections.find(connection);
if (conn != connections.end()) {
logger_->log_info("Clearing connection %s", connection);
- conn->second->drain();
+ conn->second->drain(true);
}
}
return -1;
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 9eec44c..ee88d79 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -92,13 +92,12 @@
onScheduleTimer_->stop();
}
- for (auto &&connection : connections_) {
- connection->drain();
+ for (auto&& connection : connections_) {
+ connection->drain(false);
}
- for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
- ProcessGroup *processGroup(*it);
- delete processGroup;
+ for (ProcessGroup* childGroup : child_process_groups_) {
+ delete childGroup;
}
}
@@ -403,6 +402,16 @@
}
}
+void ProcessGroup::drainConnections() {
+ for (auto&& connection : connections_) {
+ connection->drain(false);
+ }
+
+ for (ProcessGroup* childGroup : child_process_groups_) {
+ childGroup->drainConnections();
+ }
+}
+
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/libminifi/test/flow-tests/CMakeLists.txt b/libminifi/test/flow-tests/CMakeLists.txt
new file mode 100644
index 0000000..a17cf23
--- /dev/null
+++ b/libminifi/test/flow-tests/CMakeLists.txt
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB FLOW_TESTS "*.cpp")
+SET(FLOW_TEST_COUNT 0)
+FOREACH(testfile ${FLOW_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)
+ add_executable("${testfilename}" "${testfile}")
+ createTests("${testfilename}")
+ target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+ target_wholearchive_library(${testfilename} minifi-standard-processors)
+ MATH(EXPR FLOW_TEST_COUNT "${FLOW_TEST_COUNT}+1")
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${FLOW_TEST_COUNT} flow related test file(s)...")
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
new file mode 100644
index 0000000..eb444f6
--- /dev/null
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -0,0 +1,125 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <chrono>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "core/Core.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/RepositoryFactory.h"
+#include "FlowFileRecord.h"
+#include "provenance/Provenance.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "../TestBase.h"
+#include "YamlConfiguration.h"
+
+const char* yamlConfig =
+R"(
+Flow Controller:
+ name: MiNiFi Flow
+ id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+ - name: Generator
+ id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ class: org.apache.nifi.processors.standard.GenerateFlowFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 100 ms
+ penalization period: 300 ms
+ yield period: 100 ms
+ run duration nanos: 0
+ auto-terminated relationships list:
+ Properties:
+ Batch Size: 10
+ - name: LogAttribute
+ id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ class: org.apache.nifi.processors.standard.LogAttribute
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1000 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+
+Connections:
+ - name: Gen
+ id: 2438e3c8-015a-1000-79ca-83af40ec1997
+ source name: Generator
+ source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+ source relationship name: success
+ destination name: LogAttribute
+ destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+ max work queue size: 0
+ max work queue data size: 1 MB
+ flowfile expiration: 60 sec
+
+Remote Processing Groups:
+
+)";
+
+TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
+ TestController testController;
+ char format[] = "/tmp/flowTest.XXXXXX";
+ std::string dir = testController.createTempDirectory(format);
+
+ std::string yamlPath = utils::file::FileUtils::concat_path(dir, "config.yml");
+ std::ofstream{yamlPath} << yamlConfig;
+
+ std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+ std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> ff_repo = std::make_shared<TestFlowRepository>();
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+ configuration->set(minifi::Configure::nifi_flow_configuration_file, yamlPath);
+
+ REQUIRE(content_repo->initialize(configuration));
+ std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+
+ std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
+ std::unique_ptr<core::FlowConfiguration> flow = utils::make_unique<core::YamlConfiguration>(prov_repo, ff_repo, content_repo, stream_factory, configuration, yamlPath);
+ std::shared_ptr<core::ProcessGroup> root = flow->getRoot();
+ std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(
+ prov_repo, ff_repo, configuration,
+ std::move(flow),
+ content_repo, DEFAULT_ROOT_GROUP_NAME, true);
+
+
+ root->getConnections(connectionMap);
+ // adds the single connection to the map both by name and id
+ REQUIRE(connectionMap.size() == 2);
+ controller->load(root);
+ controller->start();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds{1000});
+
+ for (auto& it : connectionMap) {
+ REQUIRE(it.second->getQueueSize() > 10);
+ }
+
+ controller->stop(true);
+
+ for (auto& it : connectionMap) {
+ REQUIRE(it.second->isEmpty());
+ }
+}