blob: fa46b7bcc5e20eda40f7fc03e9b07d29444e10c2 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#undef NDEBUG
#include <atomic>
#include <chrono>
#include <future>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include "core/Core.h"
#include "core/repository/AtomicRepoEntries.h"
#include "core/RepositoryFactory.h"
#include "FlowFileRecord.h"
#include "provenance/Provenance.h"
#include "properties/Configure.h"
#include "../unit/ProvenanceTestHelper.h"
#include "../TestBase.h"
#include "YamlConfiguration.h"
#include "CustomProcessors.h"
#include "TestControllerWithFlow.h"
// Flow definition handed to TestControllerWithFlow by every test case in this file.
// It declares two timer-driven processors with a 100 ms scheduling period:
//   - "Generator" (TestFlowFileGenerator, "Batch Size: 3"), the flow file source
//   - "TestProcessor" (TestProcessor), the sink, which auto-terminates "apple" and "banana"
// and a single connection "Gen" carrying the Generator's "success" relationship to the sink.
// The text between R"( and )" is consumed at runtime, so it must not be edited for cosmetic reasons.
// NOTE(review): "max work queue size: 0" presumably means "no limit on queued flow files" - confirm against the YAML schema.
const char* yamlConfig =
R"(
Flow Controller:
name: MiNiFi Flow
id: 2438e3c8-015a-1000-79ca-83af40ec1990
Processors:
- name: Generator
id: 2438e3c8-015a-1000-79ca-83af40ec1991
class: org.apache.nifi.processors.standard.TestFlowFileGenerator
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 100 ms
penalization period: 300 ms
yield period: 100 ms
run duration nanos: 0
auto-terminated relationships list:
Properties:
Batch Size: 3
- name: TestProcessor
id: 2438e3c8-015a-1000-79ca-83af40ec1992
class: org.apache.nifi.processors.standard.TestProcessor
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 100 ms
penalization period: 3 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list:
- apple
- banana
Connections:
- name: Gen
id: 2438e3c8-015a-1000-79ca-83af40ec1997
source name: Generator
source id: 2438e3c8-015a-1000-79ca-83af40ec1991
source relationship name: success
destination name: TestProcessor
destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
max work queue size: 0
max work queue data size: 1 MB
flowfile expiration: 60 sec
Remote Processing Groups:
)";
/**
 * Repeatedly evaluates a predicate until it holds or a timeout elapses.
 *
 * The predicate is always evaluated at least once, so a condition that is
 * already satisfied is reported even when `timeout` is zero or negative.
 * Between two evaluations the calling thread yields, so that the threads which
 * are expected to make the condition true are not starved by the polling loop.
 *
 * @param timeout how long to keep polling, measured on std::chrono::steady_clock
 * @param fn callable taking no arguments; its result is tested as a bool
 * @return true as soon as `fn()` evaluates to true, false if `timeout` elapsed first
 */
template<typename Fn>
bool verifyWithBusyWait(std::chrono::milliseconds timeout, Fn&& fn) {
  const auto start = std::chrono::steady_clock::now();
  do {
    if (fn()) {
      return true;
    }
    // still a busy wait, but let other runnable threads make progress between polls
    std::this_thread::yield();
  } while (std::chrono::steady_clock::now() - start < timeout);
  return false;
}
// Checks that stopping the flow leaves the connection empty even though the
// consuming processor never ran: the sink is yielded before the flow starts,
// the generator is given one second to fill the queue, and after stop() the
// sink must report zero triggers while every connection is empty.
TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
TestControllerWithFlow testController(yamlConfig);
auto controller = testController.controller_;
auto root = testController.root_;
// use a short (100 ms) drain timeout for this test
// NOTE(review): presumably the time stop() gives the flow to empty its queues on its own - confirm against FlowController
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "100 ms");
auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessorByName("TestProcessor"));
// prevent execution of the consumer processor
// NOTE(review): 10000 is presumably a duration in milliseconds - confirm against Processor::yield
sinkProc->yield(10000);
std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
root->getConnections(connectionMap);
// adds the single connection to the map both by name and id
REQUIRE(connectionMap.size() == 2);
testController.startFlow();
// wait for the generator to create some files
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
// the queue must have built up while the sink was held back
for (auto& it : connectionMap) {
REQUIRE(it.second->getQueueSize() > 10);
}
controller->stop();
// the sink never ran, yet nothing may be left in the connection after shutdown
REQUIRE(sinkProc->trigger_count == 0);
for (auto& it : connectionMap) {
REQUIRE(it.second->isEmpty());
}
}
// Checks that, with a generous (10 s) drain timeout, stop() lets the sink work
// through the flow files that are already queued: after shutdown the source
// must still have exactly one trigger and the sink exactly three.
TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
TestControllerWithFlow testController(yamlConfig);
auto controller = testController.controller_;
auto root = testController.root_;
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "10 s");
auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessorByName("Generator"));
auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessorByName("TestProcessor"));
// gate for the sink: its trigger callback blocks until the promise is fulfilled below
std::promise<void> execSinkPromise;
std::future<void> execSinkFuture = execSinkPromise.get_future();
sinkProc->onTriggerCb_ = [&] {
execSinkFuture.wait();
};
testController.startFlow();
// wait for the source processor to enqueue its flowFiles
auto flowFilesEnqueued = [&] {return root->getTotalFlowFileCount() == 3;};
REQUIRE(verifyWithBusyWait(std::chrono::milliseconds{50}, flowFilesEnqueued));
// the three flow files must come from a single run of the generator
REQUIRE(sourceProc->trigger_count.load() == 1);
// release the sink, then shut down right away
execSinkPromise.set_value();
controller->stop();
// no further generator run happened, and the sink was triggered three times before stop() returned
REQUIRE(sourceProc->trigger_count.load() == 1);
REQUIRE(sinkProc->trigger_count.load() == 3);
}
// Checks that shutdown does not wait beyond the drain timeout: the timeout is
// 1000 ms while the first sink trigger sleeps for 1500 ms, and after stop() the
// sink must have been triggered exactly once.
TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
TestControllerWithFlow testController(yamlConfig);
auto controller = testController.controller_;
auto root = testController.root_;
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");
auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessorByName("Generator"));
auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessorByName("TestProcessor"));
// gate for the sink: its trigger callback blocks until the promise is fulfilled below
std::promise<void> execSinkPromise;
std::future<void> execSinkFuture = execSinkPromise.get_future();
sinkProc->onTriggerCb_ = [&]{
execSinkFuture.wait();
// function-local static: one flag shared by every invocation of this callback
static std::atomic<bool> first_onTrigger{true};
bool isFirst = true;
// sleep only on the first trigger
// (the compare-exchange flips the flag from true to false, so it succeeds for one invocation only)
if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
std::this_thread::sleep_for(std::chrono::milliseconds{1500});
}
};
testController.startFlow();
// wait for the source processor to enqueue its flowFiles
auto flowFilesEnqueued = [&] {return root->getTotalFlowFileCount() == 3;};
REQUIRE(verifyWithBusyWait(std::chrono::milliseconds{50}, flowFilesEnqueued));
// the three flow files must come from a single run of the generator
REQUIRE(sourceProc->trigger_count.load() == 1);
// release the sink, then shut down right away
execSinkPromise.set_value();
controller->stop();
// the generator did not run again, and only the slow first sink trigger took place
REQUIRE(sourceProc->trigger_count.load() == 1);
REQUIRE(sinkProc->trigger_count.load() == 1);
}
// Checks that the drain timeout can be extended while a shutdown is already in
// progress. stop() runs on a helper thread; meanwhile this thread raises the
// configured timeout by 500 ms every 500 ms for as long as the controller
// reports that it is running (at most 2500 ms). The initial timeout is 1000 ms
// and the first sink trigger sleeps for 1500 ms, so the test expects the
// shutdown to take more than 1500 ms and the sink to be triggered three times.
TEST_CASE("Extend the waiting period during shutdown", "[TestFlow4]") {
  TestControllerWithFlow testController(yamlConfig);
  auto controller = testController.controller_;
  auto root = testController.root_;
  unsigned int timeout_ms = 1000;
  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, std::to_string(timeout_ms) + " ms");
  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessorByName("Generator"));
  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessorByName("TestProcessor"));

  // gate for the sink: its trigger callback blocks until the promise is fulfilled by the shutdown thread
  std::promise<void> execSinkPromise;
  std::future<void> execSinkFuture = execSinkPromise.get_future();
  sinkProc->onTriggerCb_ = [&]{
    execSinkFuture.wait();
    // function-local static: one flag shared by every invocation of this callback
    static std::atomic<bool> first_onTrigger{true};
    bool isFirst = true;
    // sleep only on the first trigger
    // (the compare-exchange flips the flag from true to false, so it succeeds for one invocation only)
    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
    }
  };

  testController.startFlow();
  // wait for the source processor to enqueue its flowFiles
  auto flowFilesEnqueued = [&] {return root->getTotalFlowFileCount() == 3;};
  REQUIRE(verifyWithBusyWait(std::chrono::milliseconds{50}, flowFilesEnqueued));
  // the three flow files must come from a single run of the generator
  REQUIRE(sourceProc->trigger_count.load() == 1);

  // release the sink and shut down on a separate thread, so that this thread can keep adjusting the configuration
  std::thread shutdownThread([&]{
    execSinkPromise.set_value();
    controller->stop();
  });
  // A failing REQUIRE leaves this scope by throwing. Destroying a std::thread that is
  // still joinable calls std::terminate, which would abort the whole test binary instead
  // of reporting the failure, so make sure the thread is joined on every exit path.
  struct JoinOnScopeExit {
    std::thread& thread;
    ~JoinOnScopeExit() {
      if (thread.joinable()) {
        thread.join();
      }
    }
  } joinShutdownThread{shutdownThread};

  auto shutdownInitiated = std::chrono::steady_clock::now();
  auto shutdownDuration = [&] {return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - shutdownInitiated);};
  std::this_thread::sleep_for(std::chrono::milliseconds{500});
  // keep pushing the timeout out in 500 ms steps while the controller has not stopped yet
  while (shutdownDuration() < std::chrono::milliseconds(2500) && controller->isRunning()) {
    timeout_ms += 500;
    testController.getLogger()->log_info("Controller still running after %u ms, extending the waiting period to %u ms, ff count: %u",
        static_cast<unsigned int>(shutdownDuration().count()), timeout_ms, static_cast<unsigned int>(root->getTotalFlowFileCount()));
    testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, std::to_string(timeout_ms) + " ms");
    std::this_thread::sleep_for(std::chrono::milliseconds{500});
  }

  // the controller must have stopped by itself, and only after outlasting the slow first sink trigger
  REQUIRE(!controller->isRunning());
  REQUIRE(shutdownDuration().count() > 1500);
  shutdownThread.join();
  // the generator did not run again, and the sink was triggered three times
  REQUIRE(sourceProc->trigger_count.load() == 1);
  REQUIRE(sinkProc->trigger_count.load() == 3);
}