// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// bthread - An M:N threading library to make applications more concurrent.
// Date: Tue Jul 10 17:40:58 CST 2012
#ifndef BTHREAD_TASK_GROUP_H
#define BTHREAD_TASK_GROUP_H
#include "butil/time.h" // cpuwide_time_ns
#include "bthread/task_control.h"
#include "bthread/task_meta.h" // bthread_t, TaskMeta
#include "bthread/work_stealing_queue.h" // WorkStealingQueue
#include "bthread/remote_task_queue.h" // RemoteTaskQueue
#include "butil/resource_pool.h" // ResourceId
#include "bthread/parking_lot.h"
#include "bthread/prime_offset.h"
namespace bthread {
// For exiting a bthread.
class ExitException : public std::exception {
public:
explicit ExitException(void* value) : _value(value) {}
~ExitException() throw() {}
const char* what() const throw() override {
return "ExitException";
}
void* value() const {
return _value;
}
private:
void* _value;
};
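// A sketch of how this exception is intended to be used (illustrative only;
// the real throw/catch sites live in the .cpp): bthread_exit() throws
// ExitException carrying the exit value, and the task runner catches it to
// recover that value as the bthread's return value:
//
//   void* thread_return;
//   try {
//       thread_return = fn(arg);
//   } catch (ExitException& e) {
//       thread_return = e.value();
//   }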
// Refer to https://rigtorp.se/isatomic/: on modern CPU microarchitectures
// (Skylake and Zen 2), AVX/AVX2 128-bit/256-bit aligned loads and stores are
// atomic, even though Intel and AMD do not officially guarantee this.
// On x86-64, SSE instructions can provide atomic 128-bit loads and stores.
// Starting from Armv8.4-A, NEON can provide atomic loads and stores.
// Otherwise, a mutex is used to guarantee atomicity.
class AtomicInteger128 {
public:
struct BAIDU_CACHELINE_ALIGNMENT Value {
int64_t v1;
int64_t v2;
};
AtomicInteger128() = default;
explicit AtomicInteger128(Value value) : _value(value) {}
Value load() const;
Value load_unsafe() const {
return _value;
}
void store(Value value);
private:
Value _value{};
// Used to protect `_value' when none of __x86_64__, __ARM_NEON and __riscv is defined.
FastPthreadMutex _mutex;
};
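// Usage sketch (illustrative only): load()/store() give an atomic (or
// mutex-guarded) view of the 128-bit value; load_unsafe() is for contexts
// where the caller already knows no concurrent store can happen.
//
//   AtomicInteger128 a;
//   a.store({/*v1=*/butil::cpuwide_time_ns(), /*v2=*/0});
//   AtomicInteger128::Value v = a.load();
//   // v.v1 and v.v2 form one consistent 128-bit snapshot.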
// Thread-local group of tasks.
// Notice that most methods involving context switching are static; otherwise
// the pointer `this' may change after wakeup. The TaskGroup** `pg' parameters
// in the following functions are updated before returning.
class TaskGroup {
public:
// Create task `fn(arg)' with attributes `attr' in TaskGroup *pg and put
// the identifier into `tid'. Switch to the new task and schedule old task
// to run.
// Return 0 on success, errno otherwise.
static int start_foreground(TaskGroup** pg,
bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg);
// Create task `fn(arg)' with attributes `attr' in this TaskGroup, put the
// identifier into `tid'. Schedule the new thread to run.
// Called from worker: start_background<false>
// Called from non-worker: start_background<true>
// Return 0 on success, errno otherwise.
template <bool REMOTE>
int start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg);
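// Usage sketch (illustrative, simplified from bthread_start_urgent()/
// bthread_start_background() in the .cpp; `g' is the caller's TaskGroup):
//
//   bthread_t tid;
//   // From a worker: switch to the new bthread immediately.
//   TaskGroup::start_foreground(&g, &tid, &BTHREAD_ATTR_NORMAL, fn, arg);
//   // From a non-worker: push to the remote runqueue instead.
//   g->start_background<true>(&tid, &BTHREAD_ATTR_NORMAL, fn, arg);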
// Suspend caller and run next bthread in TaskGroup *pg.
static void sched(TaskGroup** pg);
// Variant of sched() used when the caller bthread is ending.
static void ending_sched(TaskGroup** pg);
// Suspend caller and run bthread `next_tid' in TaskGroup *pg.
// The purpose of this function is to avoid pushing `next_tid' into _rq and
// then popping it in sched(pg), which is unnecessary.
static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
static void sched_to(TaskGroup** pg, bthread_t next_tid);
static void exchange(TaskGroup** pg, TaskMeta* next_meta);
// The callback will be run at the beginning of the next-run bthread.
// It can't be run by the current bthread directly because it often requires
// the target bthread to be suspended already.
typedef void (*RemainedFn)(void*);
void set_remained(RemainedFn cb, void* arg) {
_last_context_remained = cb;
_last_context_remained_arg = arg;
}
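// Usage sketch of the remained-callback pattern (illustrative, mirroring
// yield() in the .cpp): the current bthread arranges to be re-queued by
// whichever bthread runs next, after its own stack is no longer in use.
//
//   TaskGroup::ReadyToRunArgs args = { g->tag(), g->current_task(), false };
//   g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
//   TaskGroup::sched(&g);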
// Suspend caller for at least |timeout_us| microseconds.
// If |timeout_us| is 0, this function does nothing.
// If `pg' is NULL or the caller is not running in a bthread, call usleep(3)
// instead. This function does not create a thread-local TaskGroup.
// Returns: 0 on success, -1 otherwise and errno is set.
static int usleep(TaskGroup** pg, uint64_t timeout_us);
// Suspend caller and run another bthread. When the caller will resume
// is undefined.
static void yield(TaskGroup** pg);
// Suspend caller until bthread `tid' terminates.
static int join(bthread_t tid, void** return_value);
// Returns true iff the bthread `tid' still exists. Notice that it is
// just the result at this very moment which may change soon.
// Don't use this function unless you have to. Never write code like this:
// if (exists(tid)) {
// Wait for events of the thread. // Racy, may block indefinitely.
// }
static bool exists(bthread_t tid);
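// Illustrative alternative: to wait for termination reliably, join the
// bthread instead of polling exists():
//
//   if (TaskGroup::join(tid, NULL) == 0) {
//       // `tid' has terminated.
//   }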
// Put attribute associated with `tid' into `*attr'.
// Returns 0 on success, -1 otherwise and errno is set.
static int get_attr(bthread_t tid, bthread_attr_t* attr);
// Get/set TaskMeta.stop of the tid.
static void set_stopped(bthread_t tid);
static bool is_stopped(bthread_t tid);
// The bthread running run_main_task();
bthread_t main_tid() const { return _main_tid; }
TaskStatistics main_stat() const;
// Routine of the main task which should be called from a dedicated pthread.
void run_main_task();
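// Sketch of the scheduling loop inside run_main_task() (simplified; the
// real loop in the .cpp also handles statistics and the remained callback):
//
//   TaskGroup* dummy = this;
//   bthread_t tid;
//   while (wait_task(&tid)) {
//       TaskGroup::sched_to(&dummy, tid);
//   }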
// current_task is a function in macOS 10.0+
#ifdef current_task
#undef current_task
#endif
// Meta/Identifier of current task in this group.
TaskMeta* current_task() const { return _cur_meta; }
bthread_t current_tid() const { return _cur_meta->tid; }
// Uptime of current task in nanoseconds.
int64_t current_uptime_ns() const
{ return butil::cpuwide_time_ns() - _cur_meta->cpuwide_start_ns; }
// True iff current task is the one running run_main_task()
bool is_current_main_task() const { return current_tid() == _main_tid; }
// True iff current task is in pthread-mode.
bool is_current_pthread_task() const
{ return _cur_meta->stack == _main_stack; }
// Active time in nanoseconds spent by this TaskGroup.
int64_t cumulated_cputime_ns() const;
// Push a bthread into the runqueue
void ready_to_run(TaskMeta* meta, bool nosignal = false);
// Flush tasks pushed into rq with nosignal=true but not yet signalled.
void flush_nosignal_tasks();
// Push a bthread into the runqueue from another non-worker thread.
void ready_to_run_remote(TaskMeta* meta, bool nosignal = false);
void flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex);
void flush_nosignal_tasks_remote();
// Automatically decide whether the caller is remote or local, and call
// the corresponding function.
void ready_to_run_general(TaskMeta* meta, bool nosignal = false);
void flush_nosignal_tasks_general();
// The TaskControl that this TaskGroup belongs to.
TaskControl* control() const { return _control; }
// Call this instead of delete.
void destroy_self();
// Wake up blocking ops in the thread.
// Returns 0 on success, errno otherwise.
static int interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag);
// Get the meta associated with the task.
static TaskMeta* address_meta(bthread_t tid);
// Push a task into _rq. If _rq is full, retry after some time. This
// process may go on indefinitely.
void push_rq(bthread_t tid);
// Returns size of local run queue.
size_t rq_size() const {
return _rq.volatile_size();
}
bthread_tag_t tag() const { return _tag; }
pthread_t tid() const { return _tid; }
// CPU clock time in nanoseconds consumed by the current task so far.
// Returns 0 when CPU clock accounting is inactive (_last_cpu_clock_ns is 0).
int64_t current_task_cpu_clock_ns() {
if (_last_cpu_clock_ns == 0) {
return 0;
}
int64_t total_ns = _cur_meta->stat.cpu_usage_ns;
total_ns += butil::cputhread_time_ns() - _last_cpu_clock_ns;
return total_ns;
}
private:
friend class TaskControl;
// Last scheduling time, task type and cumulated CPU time.
class CPUTimeStat {
static constexpr int64_t LAST_SCHEDULING_TIME_MASK = 0x7FFFFFFFFFFFFFFFLL;
static constexpr int64_t TASK_TYPE_MASK = 0x8000000000000000LL;
public:
CPUTimeStat() : _last_run_ns_and_type(0), _cumulated_cputime_ns(0) {}
CPUTimeStat(AtomicInteger128::Value value)
: _last_run_ns_and_type(value.v1), _cumulated_cputime_ns(value.v2) {}
// Convert to AtomicInteger128::Value for atomic operations.
explicit operator AtomicInteger128::Value() const {
return {_last_run_ns_and_type, _cumulated_cputime_ns};
}
void set_last_run_ns(int64_t last_run_ns, bool main_task) {
_last_run_ns_and_type = (last_run_ns & LAST_SCHEDULING_TIME_MASK) |
(static_cast<int64_t>(main_task) << 63);
}
int64_t last_run_ns() const {
return _last_run_ns_and_type & LAST_SCHEDULING_TIME_MASK;
}
int64_t last_run_ns_and_type() const {
return _last_run_ns_and_type;
}
bool is_main_task() const {
return _last_run_ns_and_type & TASK_TYPE_MASK;
}
void add_cumulated_cputime_ns(int64_t cputime_ns, bool main_task) {
if (main_task) {
return;
}
_cumulated_cputime_ns += cputime_ns;
}
int64_t cumulated_cputime_ns() const {
return _cumulated_cputime_ns;
}
private:
// The highest bit stores the task type: 1 for the main task, 0 otherwise.
// The lowest 63 bits store the last scheduling time.
int64_t _last_run_ns_and_type;
// Cumulated CPU time in nanoseconds.
int64_t _cumulated_cputime_ns;
};
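// Worked example of the packing above (follows directly from the masks):
// set_last_run_ns(100, true) stores (100 & LAST_SCHEDULING_TIME_MASK) |
// (1LL << 63), so last_run_ns() recovers 100 and is_main_task() reads the
// sign bit.
//
//   CPUTimeStat s;
//   s.set_last_run_ns(100, /*main_task=*/true);
//   // s.last_run_ns() == 100, s.is_main_task() == true
//   s.add_cumulated_cputime_ns(42, /*main_task=*/true); // ignored: main task
//   // s.cumulated_cputime_ns() == 0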
class AtomicCPUTimeStat {
public:
CPUTimeStat load() const {
return _cpu_time_stat.load();
}
CPUTimeStat load_unsafe() const {
return _cpu_time_stat.load_unsafe();
}
void store(CPUTimeStat cpu_time_stat) {
_cpu_time_stat.store(AtomicInteger128::Value(cpu_time_stat));
}
private:
AtomicInteger128 _cpu_time_stat;
};
// You shall use TaskControl::create_group to create a new instance.
explicit TaskGroup(TaskControl* c);
int init(size_t runqueue_capacity);
// You shall call destroy_self() instead of the destructor because deletion
// of groups is postponed to avoid races.
~TaskGroup();
#ifdef BUTIL_USE_ASAN
static void asan_task_runner(intptr_t);
#endif // BUTIL_USE_ASAN
static void task_runner(intptr_t skip_remained);
// Callbacks for set_remained()
static void _release_last_context(void*);
static void _add_sleep_event(void*);
struct ReadyToRunArgs {
bthread_tag_t tag;
TaskMeta* meta;
bool nosignal;
};
static void ready_to_run_in_worker(void*);
static void ready_to_run_in_worker_ignoresignal(void*);
static void priority_to_run(void*);
// Wait for a task to run.
// Returns true on success; false is treated as a permanent error and the
// loop calling this function should end.
bool wait_task(bthread_t* tid);
bool steal_task(bthread_t* tid) {
if (_remote_rq.pop(tid)) {
return true;
}
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
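// Snapshot the ParkingLot state before stealing, so that a later
// wait_task() can tell whether new signals arrived in between and
// avoid sleeping through a wakeup.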
_last_pl_state = _pl->get_state();
#endif
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
void set_tag(bthread_tag_t tag) { _tag = tag; }
void set_pl(ParkingLot* pl) { _pl = pl; }
static bool is_main_task(TaskGroup* g, bthread_t tid) {
return g->_main_tid == tid;
}
TaskMeta* _cur_meta{NULL};
// the control that this group belongs to
TaskControl* _control{NULL};
int _num_nosignal{0};
int _nsignaled{0};
AtomicCPUTimeStat _cpu_time_stat;
// Last thread CPU clock in nanoseconds.
int64_t _last_cpu_clock_ns{0};
size_t _nswitch{0};
RemainedFn _last_context_remained{NULL};
void* _last_context_remained_arg{NULL};
ParkingLot* _pl{NULL};
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
ParkingLot::State _last_pl_state;
#endif
size_t _steal_seed{butil::fast_rand()};
size_t _steal_offset{prime_offset(_steal_seed)};
ContextualStack* _main_stack{NULL};
bthread_t _main_tid{INVALID_BTHREAD};
WorkStealingQueue<bthread_t> _rq;
RemoteTaskQueue _remote_rq;
int _remote_num_nosignal{0};
int _remote_nsignaled{0};
int _sched_recursive_guard{0};
// Tag of this TaskGroup.
bthread_tag_t _tag{BTHREAD_TAG_DEFAULT};
// Worker thread id.
pthread_t _tid{};
};
} // namespace bthread
#include "task_group_inl.h"
#endif // BTHREAD_TASK_GROUP_H