blob: cfb60518804f33bbf75163cef552ddff9416d9f2 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <stddef.h>
#include <algorithm>
#include <chrono>
#include <random>
#include <thread>
#include <type_traits>
#include <utility>
#include "runtime/node_scoper.h"
#include "runtime/service_app.h"
#include "runtime/simulator.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_queue.h"
#include "runtime/task/task_spec.h"
#include "scheduler.h"
#include "utils/fmt_logging.h"
#include "utils/join_point.h"
#include "utils/rand.h"
namespace dsn {
namespace tools {
void event_wheel::add_event(uint64_t ts, task *t)
{
utils::auto_lock<::dsn::utils::ex_lock> l(_lock);
std::vector<event_entry> *evts;
auto itr = _events.find(ts);
if (itr != _events.end())
evts = itr->second;
else {
evts = new std::vector<event_entry>();
_events.insert(std::make_pair(ts, evts));
}
event_entry entry;
entry.app_task = t;
evts->push_back(entry);
}
void event_wheel::add_system_event(uint64_t ts, std::function<void()> t)
{
utils::auto_lock<::dsn::utils::ex_lock> l(_lock);
std::vector<event_entry> *evts;
auto itr = _events.find(ts);
if (itr != _events.end())
evts = itr->second;
else {
evts = new std::vector<event_entry>();
_events.insert(std::make_pair(ts, evts));
}
event_entry entry;
entry.system_task = std::move(t);
entry.app_task = nullptr;
evts->push_back(entry);
}
std::vector<event_entry> *event_wheel::pop_next_events(/*out*/ uint64_t &ts)
{
utils::auto_lock<::dsn::utils::ex_lock> l(_lock);
std::vector<event_entry> *evts = NULL;
auto itr = _events.begin();
if (itr != _events.end()) {
evts = itr->second;
ts = itr->first;
_events.erase(itr);
}
return evts;
}
void event_wheel::clear()
{
utils::auto_lock<::dsn::utils::ex_lock> l(_lock);
_events.clear();
}
//////////////////////////////////////////////////////////////////////////////////////////////
__thread bool scheduler::_is_scheduling = false;
scheduler::scheduler(void)
{
_time_ns = 0;
_running = false;
_running_thread = nullptr;
task_worker::on_create.put_back(on_task_worker_create, "simulation.on_task_worker_create");
task_worker::on_start.put_back(on_task_worker_start, "simulation.on_task_worker_start");
for (int i = 0; i <= dsn::task_code::max(); i++) {
task_spec::get(i)->on_task_wait_pre.put_back(scheduler::on_task_wait,
"simulation.on_task_wait");
task_spec::get(i)->on_task_wait_notified.put_back(scheduler::on_task_wait_notified,
"simulation.on_task_wait_notified");
}
task_ext::register_ext(task_state_ext::deletor);
task_worker_ext::register_ext(sim_worker_state::deletor);
}
scheduler::~scheduler(void) {}
/*static*/ void scheduler::on_task_worker_start(task_worker *worker)
{
while (!scheduler::instance()._running) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
/*static*/ void scheduler::on_task_worker_create(task_worker *worker)
{
auto s = task_worker_ext::get_inited(worker);
s->worker = worker;
s->first_time_schedule = true;
s->in_continuation = false;
s->index = static_cast<int>(scheduler::instance()._threads.size());
scheduler::instance()._threads.push_back(s);
}
/*static*/ void scheduler::on_task_wait(task *waitor, task *waitee, uint32_t timeout_milliseconds)
{
if (waitor == nullptr)
return;
if (waitee->state() < task_state::TASK_STATE_FINISHED) {
auto ts = task_ext::get_inited(waitee);
auto wks = task_worker_ext::get(task::get_current_worker());
ts->wait_threads.push_back(wks);
scheduler::instance().wait_schedule(true, false);
} else {
scheduler::instance().wait_schedule(true, true);
}
}
/*static*/ void scheduler::on_task_wait_notified(task *task)
{
auto ts = task_ext::get(task);
if (ts != nullptr) {
for (auto &w : ts->wait_threads) {
w->is_continuation_ready = true;
}
}
}
void scheduler::add_task(task *tsk, task_queue *q)
{
auto ts = task_ext::get_inited(tsk);
ts->queue = q;
auto delay = (uint64_t)tsk->delay_milliseconds() * 1000000;
tsk->set_delay(0);
_wheel.add_event(now_ns() + delay, tsk);
}
void scheduler::add_system_event(uint64_t ts_ns, std::function<void()> t)
{
_wheel.add_system_event(ts_ns, t);
}
void scheduler::start()
{
// init all checkers
std::vector<service_app *> apps;
service_app::get_all_service_apps(&apps);
for (checker_info &c : _checkers) {
checker *a_checker = c.creator();
a_checker->initialize(c.name, apps);
c.instance.reset(a_checker);
}
// set flag
_running = true;
}
void scheduler::add_checker(const std::string &name, checker::factory f)
{
checker_info info;
info.name = name;
info.creator = f;
_checkers.emplace_back(std::move(info));
}
void scheduler::check()
{
for (checker_info &c : _checkers) {
if (c.instance != nullptr)
c.instance->check();
}
}
void scheduler::wait_schedule(bool in_continue, bool is_continue_ready /*= false*/)
{
auto s = task_worker_ext::get(task::get_current_worker());
s->in_continuation = in_continue;
s->is_continuation_ready = is_continue_ready;
if (s->first_time_schedule) {
s->first_time_schedule = false;
if (s->index == 0)
schedule();
} else {
schedule();
}
s->runnable.wait();
}
void scheduler::schedule()
{
_is_scheduling = true;
check(); // check before schedule
while (true) {
// run ready workers whenever possible
std::vector<int> ready_workers;
for (auto &s : _threads) {
if ((s->in_continuation && s->is_continuation_ready) ||
(!s->in_continuation && s->worker->queue()->count() > 0)) {
ready_workers.push_back(s->index);
}
}
if (ready_workers.size() > 0) {
int i = rand::next_u32(0, (uint32_t)ready_workers.size() - 1);
_running_thread = _threads[ready_workers[i]];
_running_thread->runnable.release();
_is_scheduling = false;
return;
}
// otherwise, run the timed tasks
uint64_t ts = 0;
auto events = _wheel.pop_next_events(ts);
if (events) {
{
utils::auto_lock<::dsn::utils::ex_lock> l(_lock);
_time_ns = ts;
}
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(events->begin(), events->end(), g);
for (auto e : *events) {
if (e.app_task != nullptr) {
task *t = e.app_task;
{
node_scoper ns(t->node());
t->enqueue();
}
t->release_ref(); // added by previous t->enqueue from app
} else {
CHECK(e.system_task, "app and system tasks cannot be both empty");
e.system_task();
}
}
delete events;
continue;
}
// wait a moment
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
_is_scheduling = false;
}
}
} // end namespace