blob: 76424a2501efdeffab953edcf9707c1327d569bc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include "unit/Catch.h"
#include "unit/TestBase.h"
#include "unit/ProvenanceTestHelper.h"
#include "unit/TestUtils.h"
#include "utils/TimeUtil.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::testing {
using minifi::core::controller::StandardControllerServiceProvider;
class CountOnTriggersProcessor : public minifi::core::ProcessorImpl {
public:
using minifi::core::ProcessorImpl::ProcessorImpl;
static constexpr bool SupportsDynamicProperties = false;
static constexpr bool SupportsDynamicRelationships = false;
static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
void onTrigger(core::ProcessContext& context, core::ProcessSession&) override {
if (on_trigger_duration_ > 0ms)
std::this_thread::sleep_for(on_trigger_duration_);
++number_of_triggers;
if (should_yield_)
context.yield();
}
size_t getNumberOfTriggers() const { return number_of_triggers; }
void setOnTriggerDuration(std::chrono::steady_clock::duration on_trigger_duration) { on_trigger_duration_ = on_trigger_duration; }
void setShouldYield(bool should_yield) { should_yield_ = should_yield; }
private:
bool should_yield_ = false;
std::chrono::steady_clock::duration on_trigger_duration_ = 0ms;
std::atomic<size_t> number_of_triggers = 0;
};
#ifdef __GNUC__
// array-bounds warnings in GCC produce a lot of false positives: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=56456
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Warray-bounds"
#endif
class SchedulingAgentTestFixture {
public:
SchedulingAgentTestFixture() {
count_proc_->incrementActiveTasks();
count_proc_->setScheduledState(core::RUNNING);
#ifdef WIN32
minifi::utils::timeutils::dateSetInstall(TZ_DATA_DIR);
#endif
}
protected:
std::shared_ptr<core::Repository> test_repo_ = std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::ContentRepository> content_repo_ = std::make_shared<core::repository::VolatileContentRepository>();
TestController test_controller_;
std::shared_ptr<TestPlan> test_plan = test_controller_.createPlan();
std::shared_ptr<minifi::Configure> configuration_ = std::make_shared<minifi::ConfigureImpl>();
std::shared_ptr<StandardControllerServiceProvider> controller_services_provider_ = std::make_shared<StandardControllerServiceProvider>(
std::make_unique<minifi::core::controller::ControllerServiceNodeMap>(), configuration_);
utils::ThreadPool thread_pool_;
std::shared_ptr<core::Processor> count_proc_ = minifi::test::utils::make_processor<CountOnTriggersProcessor>("count_proc");
CountOnTriggersProcessor* count_proc_impl_ = &count_proc_->getImpl<CountOnTriggersProcessor>();
std::shared_ptr<core::ProcessContext> context_ = std::make_shared<core::ProcessContextImpl>(*count_proc_, nullptr, test_repo_, test_repo_, content_repo_);
std::shared_ptr<core::ProcessSessionFactory> factory_ = std::make_shared<core::ProcessSessionFactoryImpl>(context_);
};
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
TEST_CASE_METHOD(SchedulingAgentTestFixture, "TimerDrivenSchedulingAgent") {
count_proc_->setSchedulingPeriod(125ms);
auto timer_driven_agent = std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
timer_driven_agent->start();
auto first_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!first_task_reschedule_info.isFinished());
CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 125ms);
CHECK(count_proc_impl_->getNumberOfTriggers() == 1);
count_proc_impl_->setOnTriggerDuration(50ms);
auto second_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!second_task_reschedule_info.isFinished());
CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 75ms);
CHECK(count_proc_impl_->getNumberOfTriggers() == 2);
count_proc_impl_->setOnTriggerDuration(150ms);
auto third_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!third_task_reschedule_info.isFinished());
CHECK(first_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now());
CHECK(count_proc_impl_->getNumberOfTriggers() == 3);
}
TEST_CASE_METHOD(SchedulingAgentTestFixture, "EventDrivenSchedulingAgent") {
auto event_driven_agent = std::make_shared<EventDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
event_driven_agent->start();
auto first_task_reschedule_info = event_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!first_task_reschedule_info.isFinished());
CHECK(first_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now());
auto count_num_after_one_schedule = count_proc_impl_->getNumberOfTriggers();
CHECK(count_num_after_one_schedule > 100);
auto second_task_reschedule_info = event_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!second_task_reschedule_info.isFinished());
CHECK(second_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now());
auto count_num_after_two_schedule = count_proc_impl_->getNumberOfTriggers();
CHECK(count_num_after_two_schedule > count_num_after_one_schedule+100);
}
TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven every year") {
count_proc_->setCronPeriod("0 0 0 1 1 ?");
auto cron_driven_agent = std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
cron_driven_agent->start();
auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!first_task_reschedule_info.isFinished());
if (first_task_reschedule_info.getNextExecutionTime() > std::chrono::steady_clock::now() + 1min) { // To avoid possibly failing around dec 31 23:59:59
auto wait_time_till_next_execution_time = std::chrono::round<std::chrono::seconds>(first_task_reschedule_info.getNextExecutionTime() - std::chrono::steady_clock::now());
auto current_time = date::make_zoned<std::chrono::seconds>(date::current_zone(), std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()));
auto current_year_month_day = date::year_month_day(date::floor<date::days>(current_time.get_local_time()));
auto new_years_day = date::make_zoned<std::chrono::seconds>(date::current_zone(), date::local_days{date::year{current_year_month_day.year()+date::years(1)}/date::January/1});
auto time_until_new_years_day = new_years_day.get_local_time() - current_time.get_local_time();
CHECK(std::chrono::abs(time_until_new_years_day - wait_time_till_next_execution_time) < 1min);
CHECK(count_proc_impl_->getNumberOfTriggers() == 0);
auto second_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!second_task_reschedule_info.isFinished());
CHECK(std::chrono::abs(first_task_reschedule_info.getNextExecutionTime() - second_task_reschedule_info.getNextExecutionTime()) < 1min);
CHECK(count_proc_impl_->getNumberOfTriggers() == 0);
}
}
TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven every sec") {
count_proc_->setCronPeriod("* * * * * *");
auto cron_driven_agent = std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
cron_driven_agent->start();
auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!first_task_reschedule_info.isFinished());
CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 1s);
CHECK(count_proc_impl_->getNumberOfTriggers() == 0);
std::this_thread::sleep_until(first_task_reschedule_info.getNextExecutionTime());
auto second_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!second_task_reschedule_info.isFinished());
CHECK(second_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 1s);
CHECK(count_proc_impl_->getNumberOfTriggers() == 1);
}
TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven no future triggers") {
count_proc_->setCronPeriod("* * * * * * 2012");
auto cron_driven_agent = std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
cron_driven_agent->start();
auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(first_task_reschedule_info.isFinished());
}
TEST_CASE_METHOD(SchedulingAgentTestFixture, "Timer driven should respect both yield and run schedule") {
SECTION("Fast yield slow schedule") {
count_proc_->setSchedulingPeriod(1min);
count_proc_->setYieldPeriodMsec(10ms);
}
SECTION("Slow yield fast schedule") {
count_proc_->setSchedulingPeriod(10ms);
count_proc_->setYieldPeriodMsec(1min);
}
count_proc_impl_->setShouldYield(true);
auto timer_driven_agent = std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
timer_driven_agent->start();
auto first_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!first_task_reschedule_info.isFinished());
CHECK(first_task_reschedule_info.getNextExecutionTime() > std::chrono::steady_clock::now() + 100ms);
CHECK(count_proc_impl_->getNumberOfTriggers() == 1);
}
} // namespace org::apache::nifi::minifi::testing