blob: 1acf833a5a9096d18ff312d405c522dbddb0fb92 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include "core/Core.h"
#include "core/repository/AtomicRepoEntries.h"
#include "core/RepositoryFactory.h"
#include "FlowFileRecord.h"
#include "provenance/Provenance.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "CustomProcessors.h"
#include "unit/TestControllerWithFlow.h"
#include "unit/EmptyFlow.h"
#include "unit/TestUtils.h"
using namespace std::literals::chrono_literals;
const char* yamlConfig =
R"(
Flow Controller:
name: MiNiFi Flow
id: 2438e3c8-015a-1000-79ca-83af40ec1990
Processors:
- name: Generator
id: 2438e3c8-015a-1000-79ca-83af40ec1991
class: org.apache.nifi.processors.standard.TestFlowFileGenerator
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 100 ms
penalization period: 300 ms
yield period: 100 ms
run duration nanos: 0
auto-terminated relationships list:
Properties:
Batch Size: 3
- name: TestProcessor
id: 2438e3c8-015a-1000-79ca-83af40ec1992
class: org.apache.nifi.processors.standard.TestProcessor
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 100 ms
penalization period: 3 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list:
- apple
- banana
Connections:
- name: Gen
id: 2438e3c8-015a-1000-79ca-83af40ec1997
source name: Generator
source id: 2438e3c8-015a-1000-79ca-83af40ec1991
source relationship name: success
destination name: TestProcessor
destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
max work queue size: 0
max work queue data size: 1 MB
flowfile expiration: 60 sec
Remote Processing Groups:
Controller Services:
- name: defaultstatestorage
id: 2438e3c8-015a-1000-79ca-83af40ec1995
class: PersistentMapStateStorage
Properties:
Auto Persistence Interval:
- value: 0 sec
File:
- value: flowcontrollertests_state.txt
)";
template<typename Fn>
bool verifyWithBusyWait(std::chrono::milliseconds timeout, Fn&& fn) {
auto start = std::chrono::steady_clock::now();
while (std::chrono::steady_clock::now() - start < timeout) {
if (fn()) {
return true;
}
}
return false;
}
TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
TestControllerWithFlow testController(yamlConfig);
auto controller = testController.controller_;
auto root = testController.root_;
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "100 ms");
TypedProcessorWrapper<minifi::processors::TestProcessor> sinkProc = root->findProcessorByName("TestProcessor");
gsl_Assert(sinkProc);
// prevent execution of the consumer processor
sinkProc->yield(10s);
std::map<std::string, minifi::Connection*> connectionMap;
root->getConnections(connectionMap);
// adds the single connection to the map both by name and id
REQUIRE(connectionMap.size() == 2);
testController.startFlow();
// wait for the generator to create some files
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
for (auto& it : connectionMap) {
REQUIRE(it.second->getQueueSize() > 10);
}
controller->stop();
REQUIRE(sinkProc.get().trigger_count == 0);
for (auto& it : connectionMap) {
REQUIRE(it.second->isEmpty());
}
}
TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
TestControllerWithFlow testController(yamlConfig);
auto controller = testController.controller_;
auto root = testController.root_;
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "10 s");
TypedProcessorWrapper<minifi::processors::TestFlowFileGenerator> sourceProc = root->findProcessorByName("Generator");
TypedProcessorWrapper<minifi::processors::TestProcessor> sinkProc = root->findProcessorByName("TestProcessor");
std::promise<void> execSinkPromise;
std::future<void> execSinkFuture = execSinkPromise.get_future();
sinkProc.get().onTriggerCb_ = [&] {
execSinkFuture.wait();
};
testController.startFlow();
// wait for the source processor to enqueue its flowFiles
auto flowFilesEnqueued = [&] { return root->getTotalFlowFileCount() >= 3; };
REQUIRE(verifyWithBusyWait(std::chrono::milliseconds{500}, flowFilesEnqueued));
REQUIRE(sourceProc.get().trigger_count.load() >= 1);
execSinkPromise.set_value();
controller->stop();
REQUIRE(sourceProc.get().trigger_count.load() >= 1);
REQUIRE(sinkProc.get().trigger_count.load() >= 3);
}
TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
TestControllerWithFlow testController(yamlConfig);
auto controller = testController.controller_;
auto root = testController.root_;
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");
TypedProcessorWrapper<minifi::processors::TestFlowFileGenerator> sourceProc = root->findProcessorByName("Generator");
TypedProcessorWrapper<minifi::processors::TestProcessor> sinkProc = root->findProcessorByName("TestProcessor");
std::promise<void> execSinkPromise;
std::future<void> execSinkFuture = execSinkPromise.get_future();
sinkProc.get().onTriggerCb_ = [&]{
execSinkFuture.wait();
static std::atomic<bool> first_onTrigger{true};
bool isFirst = true;
// sleep only on the first trigger
if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
std::this_thread::sleep_for(std::chrono::milliseconds{1500});
}
};
testController.startFlow();
// wait for the source processor to enqueue its flowFiles
auto flowFilesEnqueued = [&] { return root->getTotalFlowFileCount() >= 3; };
REQUIRE(verifyWithBusyWait(std::chrono::milliseconds{500}, flowFilesEnqueued));
REQUIRE(sourceProc.get().trigger_count.load() >= 1);
execSinkPromise.set_value();
controller->stop();
REQUIRE(sourceProc.get().trigger_count.load() >= 1);
REQUIRE(sinkProc.get().trigger_count.load() >= 1);
}
TEST_CASE("Extend the waiting period during shutdown", "[TestFlow4]") {
TestControllerWithFlow testController(yamlConfig);
auto controller = testController.controller_;
auto root = testController.root_;
std::chrono::milliseconds timeout_ms = 1000ms;
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, fmt::format("{}", timeout_ms));
TypedProcessorWrapper<minifi::processors::TestFlowFileGenerator> sourceProc = root->findProcessorByName("Generator");
TypedProcessorWrapper<minifi::processors::TestProcessor> sinkProc = root->findProcessorByName("TestProcessor");
std::promise<void> execSinkPromise;
std::future<void> execSinkFuture = execSinkPromise.get_future();
sinkProc.get().onTriggerCb_ = [&]{
execSinkFuture.wait();
static std::atomic<bool> first_onTrigger{true};
bool isFirst = true;
// sleep only on the first trigger
if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
std::this_thread::sleep_for(std::chrono::milliseconds{1500});
}
};
testController.startFlow();
// wait for the source processor to enqueue its flowFiles
auto flowFilesEnqueued = [&] { return root->getTotalFlowFileCount() >= 3; };
REQUIRE(verifyWithBusyWait(std::chrono::milliseconds{500}, flowFilesEnqueued));
REQUIRE(sourceProc.get().trigger_count.load() >= 1);
std::thread shutdownThread([&]{
execSinkPromise.set_value();
controller->stop();
});
auto shutdownInitiated = std::chrono::steady_clock::now();
auto shutdownDuration = [&] {return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - shutdownInitiated);};
std::this_thread::sleep_for(std::chrono::milliseconds{500});
while (shutdownDuration() < std::chrono::milliseconds(5000) && controller->isRunning()) {
timeout_ms += 500ms;
testController.getLogger()->log_info("Controller still running after {}, extending the waiting period to {}, ff count: {}",
shutdownDuration(), timeout_ms, static_cast<unsigned int>(root->getTotalFlowFileCount()));
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, fmt::format("{}", timeout_ms));
std::this_thread::sleep_for(std::chrono::milliseconds{500});
}
REQUIRE(!controller->isRunning());
REQUIRE(shutdownDuration().count() > 1500);
shutdownThread.join();
REQUIRE(sourceProc.get().trigger_count.load() >= 1);
REQUIRE(sinkProc.get().trigger_count.load() >= 3);
}
TEST_CASE("FlowController destructor releases resources", "[TestFlow5]") {
TestControllerWithFlow controller(R"(
Flow Controller:
name: Banana Bread
Processors:
- name: GenFF
id: 00000000-0000-0000-0000-000000000001
class: GenerateFlowFile
scheduling strategy: TIMER_DRIVEN
scheduling period: 10 sec
Connections: []
Remote Processing Groups: []
Controller Services: []
)");
controller.startFlow();
REQUIRE(LogTestController::getInstance().countOccurrences("Creating scheduling agent") == 3);
LogTestController::getInstance().clear();
bool update_successful = controller.controller_->applyConfiguration("/flows/1", empty_flow).has_value();
REQUIRE(update_successful);
REQUIRE(LogTestController::getInstance().countOccurrences("Creating scheduling agent") == 3);
REQUIRE(LogTestController::getInstance().countOccurrences("Destroying scheduling agent") == 3);
LogTestController::getInstance().clear();
// manually destroy the controller
controller.controller_.reset();
REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(0s, "Destroying FlowController"));
REQUIRE(LogTestController::getInstance().countOccurrences("Destroying scheduling agent") == 3);
}