// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// bthread - An M:N threading library to make applications more concurrent.
// Date: Tue Jul 10 17:40:58 CST 2012
#include <sys/types.h>
#include <stddef.h> // size_t
#include <gflags/gflags.h>
#include "butil/compat.h" // OS_MACOSX
#include "butil/macros.h" // ARRAY_SIZE
#include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
#include "butil/fast_rand.h"
#include "butil/unique_ptr.h"
#include "butil/third_party/murmurhash3/murmurhash3.h" // fmix64
#include "bthread/errno.h" // ESTOP
#include "bthread/butex.h" // butex_*
#include "bthread/sys_futex.h" // futex_wake_private
#include "bthread/processor.h" // cpu_relax
#include "bthread/task_control.h"
#include "bthread/task_group.h"
#include "bthread/timer_thread.h"
#include "bthread/errno.h"
namespace bthread {
static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_INVALID };
static bool pass_bool(const char*, bool) { return true; }
DEFINE_bool(show_bthread_creation_in_vars, false, "When this flag is on, the time "
"from bthread creation to first run will be recorded and shown "
"in /vars");
const bool ALLOW_UNUSED dummy_show_bthread_creation_in_vars =
::GFLAGS_NS::RegisterFlagValidator(&FLAGS_show_bthread_creation_in_vars,
pass_bool);
DEFINE_bool(show_per_worker_usage_in_vars, false,
"Show per-worker usage in /vars/bthread_per_worker_usage_<tid>");
const bool ALLOW_UNUSED dummy_show_per_worker_usage_in_vars =
::GFLAGS_NS::RegisterFlagValidator(&FLAGS_show_per_worker_usage_in_vars,
pass_bool);
BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group, NULL);
// Synced with TaskMeta::local_storage when a bthread is created or destroyed.
// While a bthread is running, the two fields may be inconsistent; use
// tls_bls as the ground truth.
__thread LocalStorage tls_bls = BTHREAD_LOCAL_STORAGE_INITIALIZER;
// defined in bthread/key.cpp
extern void return_keytable(bthread_keytable_pool_t*, KeyTable*);
// [Hacky] This is a special TLS set by bthread-rpc privately... to save the
// overhead of creating a keytable; may be removed later.
BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
const TaskStatistics EMPTY_STAT = { 0, 0 };
const size_t OFFSET_TABLE[] = {
#include "bthread/offset_inl.list"
};
void* (*g_create_span_func)() = NULL;
void* run_create_span_func() {
if (g_create_span_func) {
return g_create_span_func();
}
return tls_bls.rpcz_parent_span;
}
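// Illustrative sketch (names are hypothetical): a tracing module may install
// g_create_span_func so that bthreads created with BTHREAD_INHERIT_SPAN pick
// up a parent span through run_create_span_func():
//   static void* MyCreateSpan() { return NewSpanAttachedToCurrent(); }
//   ...
//   bthread::g_create_span_func = MyCreateSpan;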
int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
TaskMeta* const m = address_meta(tid);
if (m != NULL) {
const uint32_t given_ver = get_version(tid);
BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) {
*out = m->attr;
return 0;
}
}
errno = EINVAL;
return -1;
}
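// Illustrative usage through the public wrapper (bthread_getattr() in
// bthread/bthread.h forwards here; th is hypothetical):
//   bthread_attr_t attr;
//   if (bthread_getattr(th, &attr) == 0) {
//       // th was still alive and attr is a consistent snapshot.
//   }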
void TaskGroup::set_stopped(bthread_t tid) {
TaskMeta* const m = address_meta(tid);
if (m != NULL) {
const uint32_t given_ver = get_version(tid);
BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) {
m->stop = true;
}
}
}
bool TaskGroup::is_stopped(bthread_t tid) {
TaskMeta* const m = address_meta(tid);
if (m != NULL) {
const uint32_t given_ver = get_version(tid);
BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) {
return m->stop;
}
}
// If the tid does not exist or version does not match, it's intuitive
// to treat the thread as "stopped".
return true;
}
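// Illustrative stop/stopped pairing through the public API (a sketch; th is
// hypothetical):
//   bthread_stop(th);           // marks th stopped via set_stopped(), then interrupts it
//   if (bthread_stopped(th)) {  // also true if th quit or never existed
//       ...
//   }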
bool TaskGroup::wait_task(bthread_t* tid) {
do {
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
if (_last_pl_state.stopped()) {
return false;
}
_pl->wait(_last_pl_state);
if (steal_task(tid)) {
return true;
}
#else
const ParkingLot::State st = _pl->get_state();
if (st.stopped()) {
return false;
}
if (steal_task(tid)) {
return true;
}
_pl->wait(st);
#endif
} while (true);
}
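// Note on the ordering above: the parking-lot state is read before trying to
// steal, so a task queued after an unsuccessful steal still changes the state
// and keeps _pl->wait() from blocking, the same check-then-sleep protocol
// used with futexes.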
static double get_cumulated_cputime_from_this(void* arg) {
return static_cast<TaskGroup*>(arg)->cumulated_cputime_ns() / 1000000000.0;
}
void TaskGroup::run_main_task() {
bvar::PassiveStatus<double> cumulated_cputime(
get_cumulated_cputime_from_this, this);
std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
TaskGroup* dummy = this;
bthread_t tid;
while (wait_task(&tid)) {
TaskGroup::sched_to(&dummy, tid);
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
TaskGroup::task_runner(1/*skip remained*/);
}
if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
char name[32];
#if defined(OS_MACOSX)
snprintf(name, sizeof(name), "bthread_worker_usage_%" PRIu64,
pthread_numeric_id());
#else
snprintf(name, sizeof(name), "bthread_worker_usage_%ld",
(long)syscall(SYS_gettid));
#endif
usage_bvar.reset(new bvar::PerSecond<bvar::PassiveStatus<double> >
(name, &cumulated_cputime, 1));
}
}
// Don't forget to add the elapsed time of the last wait_task.
current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}
TaskGroup::TaskGroup(TaskControl* c)
:
_cur_meta(NULL)
, _control(c)
, _num_nosignal(0)
, _nsignaled(0)
, _last_run_ns(butil::cpuwide_time_ns())
, _cumulated_cputime_ns(0)
, _nswitch(0)
, _last_context_remained(NULL)
, _last_context_remained_arg(NULL)
, _pl(NULL)
, _main_stack(NULL)
, _main_tid(0)
, _remote_num_nosignal(0)
, _remote_nsignaled(0)
#ifndef NDEBUG
, _sched_recursive_guard(0)
#endif
, _tag(BTHREAD_TAG_DEFAULT)
{
_steal_seed = butil::fast_rand();
_steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
CHECK(c);
}
TaskGroup::~TaskGroup() {
if (_main_tid) {
TaskMeta* m = address_meta(_main_tid);
CHECK(_main_stack == m->stack);
return_stack(m->release_stack());
return_resource(get_slot(_main_tid));
_main_tid = 0;
}
}
int TaskGroup::init(size_t runqueue_capacity) {
if (_rq.init(runqueue_capacity) != 0) {
LOG(FATAL) << "Fail to init _rq";
return -1;
}
if (_remote_rq.init(runqueue_capacity / 2) != 0) {
LOG(FATAL) << "Fail to init _remote_rq";
return -1;
}
ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
if (NULL == stk) {
LOG(FATAL) << "Fail to get main stack container";
return -1;
}
butil::ResourceId<TaskMeta> slot;
TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
if (NULL == m) {
LOG(FATAL) << "Fail to get TaskMeta";
return -1;
}
m->sleep_failed = false;
m->stop = false;
m->interrupted = false;
m->about_to_quit = false;
m->fn = NULL;
m->arg = NULL;
m->local_storage = LOCAL_STORAGE_INIT;
m->cpuwide_start_ns = butil::cpuwide_time_ns();
m->stat = EMPTY_STAT;
m->attr = BTHREAD_ATTR_TASKGROUP;
m->tid = make_tid(*m->version_butex, slot);
m->set_stack(stk);
_cur_meta = m;
_main_tid = m->tid;
_main_stack = stk;
_last_run_ns = butil::cpuwide_time_ns();
return 0;
}
void TaskGroup::task_runner(intptr_t skip_remained) {
// NOTE: tls_task_group is volatile since tasks are moved around
// different groups.
TaskGroup* g = tls_task_group;
if (!skip_remained) {
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
#ifndef NDEBUG
--g->_sched_recursive_guard;
#endif
}
do {
// A task can be stopped before it gets to run, in which case we could
// skip the user function, but that may confuse users: most tasks have
// variables remembering the task's result, often initialized to values
// indicating success. If the user function is never called, those
// variables stay unchanged even though they should reflect failure,
// because the task was stopped abnormally.
// The meta and identifier of the task are persistent across this run.
TaskMeta* const m = g->_cur_meta;
if (FLAGS_show_bthread_creation_in_vars) {
// NOTE: the thread triggering exposure of pending time may spend
// considerable time because a single bvar::LatencyRecorder
// contains many bvars.
g->_control->exposed_pending_time() <<
(butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
}
// Don't catch exceptions except ExitException, which is used to implement
// bthread_exit(). User code is expected to crash when an exception is
// not caught explicitly. This is consistent with other threading
// libraries.
void* thread_return;
try {
thread_return = m->fn(m->arg);
} catch (ExitException& e) {
thread_return = e.value();
}
// TODO: Save thread_return
(void)thread_return;
// Logging must be done before returning the keytable, since the logging
// library uses bthread local storage internally; doing it later would
// leak memory.
// FIXME: the time from quitting fn to here is not counted into cputime.
if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Finished bthread " << m->tid << ", cputime="
<< m->stat.cputime_ns / 1000000.0 << "ms";
}
// Clean up tls variables. This must be done before changing version_butex;
// otherwise a thread that just joined this bthread may not see the side
// effects of destructing the tls variables.
KeyTable* kt = tls_bls.keytable;
if (kt != NULL) {
return_keytable(m->attr.keytable_pool, kt);
// After deletion: tls may be set during deletion.
tls_bls.keytable = NULL;
m->local_storage.keytable = NULL; // optional
}
// While running the function in TaskMeta and deleting the KeyTable in
// return_keytable(), the group may have changed.
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
// Increase the version and wake up all joiners. If the resulting version
// is 0, change it to 1 so that a bthread_t is never 0. Any access to or
// join on the bthread after the version change will be rejected.
// The spinlock is for visibility of TaskGroup::get_attr.
{
BAIDU_SCOPED_LOCK(m->version_lock);
if (0 == ++*m->version_butex) {
++*m->version_butex;
}
}
butex_wake_except(m->version_butex, 0);
g->_control->_nbthreads << -1;
g->_control->tag_nbthreads(g->tag()) << -1;
g->set_remained(TaskGroup::_release_last_context, m);
ending_sched(&g);
} while (g->_cur_meta->tid != g->_main_tid);
// We were called from a pthread and there are no BTHREAD_STACKTYPE_PTHREAD
// tasks left to run; quit to wait for more tasks.
}
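// Illustrative: bthread_exit() from the public API unwinds back to the
// try-block above by throwing ExitException (sketch):
//   static void* fn(void*) {
//       bthread_exit((void*)1);  // caught in task_runner(), becomes thread_return
//       return NULL;             // never reached
//   }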
void TaskGroup::_release_last_context(void* arg) {
TaskMeta* m = static_cast<TaskMeta*>(arg);
if (m->stack_type() != STACK_TYPE_PTHREAD) {
return_stack(m->release_stack()/*may be NULL*/);
} else {
// It's the _main_stack; don't return it.
m->set_stack(NULL);
}
return_resource(get_slot(m->tid));
}
int TaskGroup::start_foreground(TaskGroup** pg,
bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
if (__builtin_expect(!fn, 0)) {
return EINVAL;
}
const int64_t start_ns = butil::cpuwide_time_ns();
const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
butil::ResourceId<TaskMeta> slot;
TaskMeta* m = butil::get_resource(&slot);
if (__builtin_expect(!m, 0)) {
return ENOMEM;
}
CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
m->sleep_failed = false;
m->stop = false;
m->interrupted = false;
m->about_to_quit = false;
m->fn = fn;
m->arg = arg;
CHECK(m->stack == NULL);
m->attr = using_attr;
m->local_storage = LOCAL_STORAGE_INIT;
if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
m->local_storage.rpcz_parent_span = run_create_span_func();
}
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
}
TaskGroup* g = *pg;
g->_control->_nbthreads << 1;
g->_control->tag_nbthreads(g->tag()) << 1;
if (g->is_current_pthread_task()) {
// Never run a foreground task in place from a pthread task; just queue it.
g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
// NOSIGNAL affects current task, not the new task.
RemainedFn fn = NULL;
if (g->current_task()->about_to_quit) {
fn = ready_to_run_in_worker_ignoresignal;
} else {
fn = ready_to_run_in_worker;
}
ReadyToRunArgs args = {
g->current_tid(),
(bool)(using_attr.flags & BTHREAD_NOSIGNAL)
};
g->set_remained(fn, &args);
TaskGroup::sched_to(pg, m->tid);
}
return 0;
}
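// Illustrative: the public bthread_start_urgent() reaches start_foreground();
// when called from a bthread worker, the new bthread is switched to at once
// and the caller is re-queued by the remained callback (sketch; fn/arg are
// hypothetical):
//   bthread_t th;
//   bthread_start_urgent(&th, NULL, fn, arg);  // fn may run before this returns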
template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
if (__builtin_expect(!fn, 0)) {
return EINVAL;
}
const int64_t start_ns = butil::cpuwide_time_ns();
const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
butil::ResourceId<TaskMeta> slot;
TaskMeta* m = butil::get_resource(&slot);
if (__builtin_expect(!m, 0)) {
return ENOMEM;
}
CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
m->sleep_failed = false;
m->stop = false;
m->interrupted = false;
m->about_to_quit = false;
m->fn = fn;
m->arg = arg;
CHECK(m->stack == NULL);
m->attr = using_attr;
m->local_storage = LOCAL_STORAGE_INIT;
if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
m->local_storage.rpcz_parent_span = run_create_span_func();
}
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
}
_control->_nbthreads << 1;
_control->tag_nbthreads(tag()) << 1;
if (REMOTE) {
ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
}
return 0;
}
// Explicit instantiations.
template int
TaskGroup::start_background<true>(bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg);
template int
TaskGroup::start_background<false>(bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg);
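// Illustrative usage of the background path through the public API in
// bthread/bthread.h (a minimal sketch; work and th are hypothetical):
//   static void* work(void* arg) {
//       // ... runs concurrently with the caller ...
//       return NULL;
//   }
//   bthread_t th;
//   if (bthread_start_background(&th, NULL, work, NULL) != 0) {
//       // EINVAL or ENOMEM; e.g. fall back to running work() inline
//   }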
int TaskGroup::join(bthread_t tid, void** return_value) {
if (__builtin_expect(!tid, 0)) { // tid of bthread is never 0.
return EINVAL;
}
TaskMeta* m = address_meta(tid);
if (__builtin_expect(!m, 0)) {
// The bthread is not created yet; this join is definitely wrong.
return EINVAL;
}
TaskGroup* g = tls_task_group;
if (g != NULL && g->current_tid() == tid) {
// joining self causes indefinite waiting.
return EINVAL;
}
const uint32_t expected_version = get_version(tid);
while (*m->version_butex == expected_version) {
if (butex_wait(m->version_butex, expected_version, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return errno;
}
}
if (return_value) {
*return_value = NULL;
}
return 0;
}
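// Illustrative join through the public API (note the TODO in task_runner():
// thread_return is not saved yet, so *return_value is always set to NULL):
//   void* ret = NULL;
//   const int rc = bthread_join(th, &ret);  // 0, or EINVAL on bad/self tid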
bool TaskGroup::exists(bthread_t tid) {
if (tid != 0) { // tid of bthread is never 0.
TaskMeta* m = address_meta(tid);
if (m != NULL) {
return (*m->version_butex == get_version(tid));
}
}
return false;
}
TaskStatistics TaskGroup::main_stat() const {
TaskMeta* m = address_meta(_main_tid);
return m ? m->stat : EMPTY_STAT;
}
void TaskGroup::ending_sched(TaskGroup** pg) {
TaskGroup* g = *pg;
bthread_t next_tid = 0;
// Find the next task to run; if none, switch to the idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
// When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
// WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
// to 2.9%
const bool popped = g->_rq.pop(&next_tid);
#else
const bool popped = g->_rq.steal(&next_tid);
#endif
if (!popped && !g->steal_task(&next_tid)) {
// Jump to main task if there's no task to run.
next_tid = g->_main_tid;
}
TaskMeta* const cur_meta = g->_cur_meta;
TaskMeta* next_meta = address_meta(next_tid);
if (next_meta->stack == NULL) {
if (next_meta->stack_type() == cur_meta->stack_type()) {
// also works when a pthread_task schedules to another pthread_task; the
// transferred stack is just _main_stack.
next_meta->set_stack(cur_meta->release_stack());
} else {
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
// stack_type is BTHREAD_STACKTYPE_PTHREAD or we're out of memory.
// In the latter case, attr is forced to BTHREAD_STACKTYPE_PTHREAD.
// This basically means that if we can't allocate a stack, we run the
// task directly in the pthread.
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack(g->_main_stack);
}
}
}
sched_to(pg, next_meta);
}
void TaskGroup::sched(TaskGroup** pg) {
TaskGroup* g = *pg;
bthread_t next_tid = 0;
// Find the next task to run; if none, switch to the idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
const bool popped = g->_rq.pop(&next_tid);
#else
const bool popped = g->_rq.steal(&next_tid);
#endif
if (!popped && !g->steal_task(&next_tid)) {
// Jump to main task if there's no task to run.
next_tid = g->_main_tid;
}
sched_to(pg, next_tid);
}
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
TaskGroup* g = *pg;
#ifndef NDEBUG
if ((++g->_sched_recursive_guard) > 1) {
LOG(FATAL) << "Recursively(" << g->_sched_recursive_guard - 1
<< ") call sched_to(" << g << ")";
}
#endif
// Save errno so that errno is bthread-specific.
const int saved_errno = errno;
void* saved_unique_user_ptr = tls_unique_user_ptr;
TaskMeta* const cur_meta = g->_cur_meta;
const int64_t now = butil::cpuwide_time_ns();
const int64_t elp_ns = now - g->_last_run_ns;
g->_last_run_ns = now;
cur_meta->stat.cputime_ns += elp_ns;
if (cur_meta->tid != g->main_tid()) {
g->_cumulated_cputime_ns += elp_ns;
}
++cur_meta->stat.nswitch;
++ g->_nswitch;
// Switch to the task
if (__builtin_expect(next_meta != cur_meta, 1)) {
g->_cur_meta = next_meta;
// Switch tls_bls
cur_meta->local_storage = tls_bls;
tls_bls = next_meta->local_storage;
// Logging must be done after switching the local storage, since the
// logging library uses bthread local storage internally; doing it before
// would leak memory.
if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
(next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
<< next_meta->tid;
}
if (cur_meta->stack != NULL) {
if (next_meta->stack != cur_meta->stack) {
jump_stack(cur_meta->stack, next_meta->stack);
// probably went to another group, need to assign g again.
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
#ifndef NDEBUG
else {
// else a pthread_task is switching to another pthread_task; the stacks
// can only be equal when both are _main_stack.
CHECK(cur_meta->stack == g->_main_stack);
}
#endif
}
// else cur_meta->stack is NULL because of ending_sched() (including
// pthread_task -> pthread_task).
} else {
LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
}
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
// Restore errno
errno = saved_errno;
// tls_unique_user_ptr probably changed.
BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_unique_user_ptr, saved_unique_user_ptr);
#ifndef NDEBUG
--g->_sched_recursive_guard;
#endif
*pg = g;
}
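// Because errno and tls_unique_user_ptr are saved and restored around the
// stack jump above, each bthread effectively keeps its own errno even when
// it resumes on a different pthread worker, e.g. (sketch):
//   errno = 0;
//   bthread_usleep(1000);  // other bthreads may run here and set errno
//   // back here, errno is still whatever this bthread last set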
void TaskGroup::destroy_self() {
if (_control) {
_control->_destroy_group(this);
_control = NULL;
} else {
CHECK(false);
}
}
void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
push_rq(tid);
if (nosignal) {
++_num_nosignal;
} else {
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_nsignaled += 1 + additional_signal;
_control->signal_task(1 + additional_signal, _tag);
}
}
void TaskGroup::flush_nosignal_tasks() {
const int val = _num_nosignal;
if (val) {
_num_nosignal = 0;
_nsignaled += val;
_control->signal_task(val, _tag);
}
}
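// Illustrative batching with BTHREAD_NOSIGNAL through the public API: queue
// many bthreads without signaling workers, then flush once (sketch; tids, fn
// and args are hypothetical):
//   bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
//   attr.flags |= BTHREAD_NOSIGNAL;
//   for (size_t i = 0; i < n; ++i) {
//       bthread_start_background(&tids[i], &attr, fn, args[i]);
//   }
//   bthread_flush();  // wakes workers for all queued tasks at once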
void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
_remote_rq._mutex.lock();
while (!_remote_rq.push_locked(tid)) {
flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
<< _remote_rq.capacity();
::usleep(1000);
_remote_rq._mutex.lock();
}
if (nosignal) {
++_remote_num_nosignal;
_remote_rq._mutex.unlock();
} else {
const int additional_signal = _remote_num_nosignal;
_remote_num_nosignal = 0;
_remote_nsignaled += 1 + additional_signal;
_remote_rq._mutex.unlock();
_control->signal_task(1 + additional_signal, _tag);
}
}
void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
const int val = _remote_num_nosignal;
if (!val) {
locked_mutex.unlock();
return;
}
_remote_num_nosignal = 0;
_remote_nsignaled += val;
locked_mutex.unlock();
_control->signal_task(val, _tag);
}
void TaskGroup::ready_to_run_general(bthread_t tid, bool nosignal) {
if (tls_task_group == this) {
return ready_to_run(tid, nosignal);
}
return ready_to_run_remote(tid, nosignal);
}
void TaskGroup::flush_nosignal_tasks_general() {
if (tls_task_group == this) {
return flush_nosignal_tasks();
}
return flush_nosignal_tasks_remote();
}
void TaskGroup::ready_to_run_in_worker(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
return tls_task_group->ready_to_run(args->tid, args->nosignal);
}
void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
return tls_task_group->push_rq(args->tid);
}
struct SleepArgs {
uint64_t timeout_us;
bthread_t tid;
TaskMeta* meta;
TaskGroup* group;
};
static void ready_to_run_from_timer_thread(void* arg) {
CHECK(tls_task_group == NULL);
const SleepArgs* e = static_cast<const SleepArgs*>(arg);
auto g = e->group;
auto tag = g->tag();
g->control()->choose_one_group(tag)->ready_to_run_remote(e->tid);
}
void TaskGroup::_add_sleep_event(void* void_args) {
// Must copy SleepArgs. After calling TimerThread::schedule(), previous
// thread may be stolen by a worker immediately and the on-stack SleepArgs
// will be gone.
SleepArgs e = *static_cast<SleepArgs*>(void_args);
TaskGroup* g = e.group;
TimerThread::TaskId sleep_id;
sleep_id = get_global_timer_thread()->schedule(
ready_to_run_from_timer_thread, void_args,
butil::microseconds_from_now(e.timeout_us));
if (!sleep_id) {
e.meta->sleep_failed = true;
// Fail to schedule timer, go back to previous thread.
g->ready_to_run(e.tid);
return;
}
// Set TaskMeta::current_sleep which is for interruption.
const uint32_t given_ver = get_version(e.tid);
{
BAIDU_SCOPED_LOCK(e.meta->version_lock);
if (given_ver == *e.meta->version_butex && !e.meta->interrupted) {
e.meta->current_sleep = sleep_id;
return;
}
}
// The thread is stopped or interrupted.
// interrupt() always sees current_sleep == 0 and will not schedule the
// calling thread; the race is between the current thread and the timer thread.
if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
// The sleep was added to the timer: the previous thread may already have
// been woken up by the timer, and may even have stopped. It's safe to
// schedule the previous thread when unschedule() returns 0, which means
// "the not-yet-run sleep_id was removed". If the sleep_id is running
// (returns 1), ready_to_run_in_worker() will schedule the previous thread
// as well. If the sleep_id does not exist, the previous thread was already
// scheduled by the timer thread and we don't have to do it again.
g->ready_to_run(e.tid);
}
}
// To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
if (0 == timeout_us) {
yield(pg);
return 0;
}
TaskGroup* g = *pg;
// We have to schedule the timer after switching to the next bthread,
// otherwise the timer may wake up (jump to) the current, still-running
// context.
SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };
g->set_remained(_add_sleep_event, &e);
sched(pg);
g = *pg;
if (e.meta->sleep_failed) {
// Fail to schedule timer, return error.
e.meta->sleep_failed = false;
errno = ESTOP;
return -1;
}
e.meta->current_sleep = 0;
if (e.meta->interrupted) {
// Races with the setter and may consume multiple interruptions, which is OK.
e.meta->interrupted = false;
// NOTE: setting errno to ESTOP is not necessary from bthread's
// perspective; however much RPC code expects bthread_usleep to set
// errno to ESTOP when the thread is stopping, and prints FATAL
// otherwise. To make the transition smooth, ESTOP is still set instead
// of EINTR when the thread is stopping.
errno = (e.meta->stop ? ESTOP : EINTR);
return -1;
}
return 0;
}
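// Illustrative error handling for the public wrapper bthread_usleep():
//   if (bthread_usleep(1000000) < 0) {
//       if (errno == ESTOP) {
//           // the bthread is being stopped, e.g. by bthread_stop()
//       } else if (errno == EINTR) {
//           // interrupted, e.g. by bthread_interrupt()
//       }
//   }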
// Defined in butex.cpp
bool erase_from_butex_because_of_interruption(ButexWaiter* bw);
static int interrupt_and_consume_waiters(
bthread_t tid, ButexWaiter** pw, uint64_t* sleep_id) {
TaskMeta* const m = TaskGroup::address_meta(tid);
if (m == NULL) {
return EINVAL;
}
const uint32_t given_ver = get_version(tid);
BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) {
*pw = m->current_waiter.exchange(NULL, butil::memory_order_acquire);
*sleep_id = m->current_sleep;
m->current_sleep = 0; // only one stopper gets the sleep_id
m->interrupted = true;
return 0;
}
return EINVAL;
}
static int set_butex_waiter(bthread_t tid, ButexWaiter* w) {
TaskMeta* const m = TaskGroup::address_meta(tid);
if (m != NULL) {
const uint32_t given_ver = get_version(tid);
BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) {
// Release fence makes m->interrupted visible to butex_wait
m->current_waiter.store(w, butil::memory_order_release);
return 0;
}
}
return EINVAL;
}
// The interruption is "persistent" compared to the ones caused by signals:
// if a bthread is interrupted while it's not blocked, the interruption is
// still remembered and will be checked at the next blocking point. This
// design choice simplifies the implementation and reduces notification
// loss caused by race conditions.
// TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep()
// can't be interrupted.
int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) {
// Consume current_waiter in the TaskMeta, wake it up then set it back.
ButexWaiter* w = NULL;
uint64_t sleep_id = 0;
int rc = interrupt_and_consume_waiters(tid, &w, &sleep_id);
if (rc) {
return rc;
}
// A bthread cannot wait on a butex and sleep on a timer at the same time.
CHECK(!sleep_id || !w);
if (w != NULL) {
erase_from_butex_because_of_interruption(w);
// If butex_wait() already woke up before we set current_waiter back,
// the function will spin until current_waiter becomes non-NULL.
rc = set_butex_waiter(tid, w);
if (rc) {
LOG(FATAL) << "butex_wait should spin until setting back waiter";
return rc;
}
} else if (sleep_id != 0) {
if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
g->ready_to_run(tid);
} else {
if (!c) {
return EINVAL;
}
c->choose_one_group(tag)->ready_to_run_remote(tid);
}
}
}
return 0;
}
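// Illustrative: the public wrapper bthread_interrupt() wakes the target from
// butex_wait()/bthread_usleep(); the blocking call returns -1 with errno set
// to EINTR (or ESTOP when the bthread is stopping):
//   bthread_interrupt(th);  // th is hypothetical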
void TaskGroup::yield(TaskGroup** pg) {
TaskGroup* g = *pg;
ReadyToRunArgs args = { g->current_tid(), false };
g->set_remained(ready_to_run_in_worker, &args);
sched(pg);
}
void print_task(std::ostream& os, bthread_t tid) {
TaskMeta* const m = TaskGroup::address_meta(tid);
if (m == NULL) {
os << "bthread=" << tid << " : never existed";
return;
}
const uint32_t given_ver = get_version(tid);
bool matched = false;
bool stop = false;
bool interrupted = false;
bool about_to_quit = false;
void* (*fn)(void*) = NULL;
void* arg = NULL;
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bool has_tls = false;
int64_t cpuwide_start_ns = 0;
TaskStatistics stat = {0, 0};
{
BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) {
matched = true;
stop = m->stop;
interrupted = m->interrupted;
about_to_quit = m->about_to_quit;
fn = m->fn;
arg = m->arg;
attr = m->attr;
has_tls = m->local_storage.keytable;
cpuwide_start_ns = m->cpuwide_start_ns;
stat = m->stat;
}
}
if (!matched) {
os << "bthread=" << tid << " : not exist now";
} else {
os << "bthread=" << tid << " :\nstop=" << stop
<< "\ninterrupted=" << interrupted
<< "\nabout_to_quit=" << about_to_quit
<< "\nfn=" << (void*)fn
<< "\narg=" << (void*)arg
<< "\nattr={stack_type=" << attr.stack_type
<< " flags=" << attr.flags
<< " keytable_pool=" << attr.keytable_pool
<< "}\nhas_tls=" << has_tls
<< "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
<< "\ncputime_ns=" << stat.cputime_ns
<< "\nnswitch=" << stat.nswitch;
}
}
} // namespace bthread