// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// bthread - An M:N threading library to make applications more concurrent.
// Date: Tue Jul 10 17:40:58 CST 2012
#include <pthread.h>
#include <set>
#include <regex>
#include <sys/syscall.h> // SYS_gettid
#include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
#include "butil/errno.h" // berror
#include "butil/logging.h"
#include "butil/threading/platform_thread.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"
#include "bthread/sys_futex.h" // futex_wake_private
#include "bthread/interrupt_pthread.h"
#include "bthread/processor.h" // cpu_relax
#include "bthread/task_group.h" // TaskGroup
#include "bthread/task_control.h"
#include "bthread/timer_thread.h" // global_timer_thread
#include <gflags/gflags.h>
#include "bthread/log.h"
#if defined(OS_MACOSX)
#include <mach/mach.h>
#endif
DEFINE_int32(task_group_delete_delay, 1,
"delay deletion of TaskGroup for so many seconds");
DEFINE_int32(task_group_runqueue_capacity, 4096,
"capacity of runqueue in each TaskGroup");
DEFINE_int32(task_group_ntags, 1, "Number of tags; TaskGroups are partitioned into this many tag groups");
DEFINE_bool(task_group_set_worker_name, true,
"Whether to set the name of the worker thread");
DEFINE_string(cpu_set, "",
"Set of CPUs to which worker threads are bound, "
"for example: 0-3,5,7; empty (default) disables binding");
namespace bthread {
DEFINE_bool(parking_lot_no_signal_when_no_waiter, false,
"ParkingLot does not signal when there is no waiter, "
"which reduces signaling overhead when workers are busy.");
DEFINE_bool(enable_bthread_priority_queue, false, "Whether to enable priority queue");
DECLARE_int32(bthread_concurrency);
DECLARE_int32(bthread_min_concurrency);
DECLARE_int32(bthread_parking_lot_of_each_tag);
extern pthread_mutex_t g_task_control_mutex;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
void (*g_worker_startfn)() = NULL;
void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL;
// May be called in other modules to run startfn in non-worker pthreads.
void run_worker_startfn() {
if (g_worker_startfn) {
g_worker_startfn();
}
}
void run_tagged_worker_startfn(bthread_tag_t tag) {
if (g_tagged_worker_startfn) {
g_tagged_worker_startfn(tag);
}
}
struct WorkerThreadArgs {
WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {}
TaskControl* c;
bthread_tag_t tag;
};
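// Entry of each worker pthread: run the global and tagged start functions,
// create a TaskGroup for the given tag, optionally bind the thread to a CPU
// and name it, then loop in run_main_task() until the TaskControl stops.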
void* TaskControl::worker_thread(void* arg) {
run_worker_startfn();
#ifdef BAIDU_INTERNAL
logging::ComlogInitializer comlog_initializer;
#endif
auto dummy = static_cast<WorkerThreadArgs*>(arg);
auto c = dummy->c;
auto tag = dummy->tag;
delete dummy;
run_tagged_worker_startfn(tag);
TaskGroup* g = c->create_group(tag);
TaskStatistics stat;
if (NULL == g) {
LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
return NULL;
}
g->_tid = pthread_self();
int worker_id = c->_next_worker_id.fetch_add(
1, butil::memory_order_relaxed);
if (!c->_cpus.empty()) {
bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]);
}
if (FLAGS_task_group_set_worker_name) {
std::string worker_thread_name = butil::string_printf(
"brpc_wkr:%d-%d", g->tag(), worker_id);
butil::PlatformThread::SetNameSimple(worker_thread_name.c_str());
}
BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
<< " bthread=" << g->main_tid() << " tag=" << g->tag();
tls_task_group = g;
c->_nworkers << 1;
c->tag_nworkers(g->tag()) << 1;
g->run_main_task();
stat = g->main_stat();
BT_VLOG << "Destroying worker=" << pthread_self() << " bthread="
<< g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
<< "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
tls_task_group = NULL;
g->destroy_self();
c->_nworkers << -1;
c->tag_nworkers(g->tag()) << -1;
return NULL;
}
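// Create and initialize a TaskGroup and register it under `tag`.
// Returns NULL if allocation, initialization or registration fails.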
TaskGroup* TaskControl::create_group(bthread_tag_t tag) {
TaskGroup* g = new (std::nothrow) TaskGroup(this);
if (NULL == g) {
LOG(FATAL) << "Fail to new TaskGroup";
return NULL;
}
if (g->init(FLAGS_task_group_runqueue_capacity) != 0) {
LOG(ERROR) << "Fail to init TaskGroup";
delete g;
return NULL;
}
if (_add_group(g, tag) != 0) {
delete g;
return NULL;
}
return g;
}
static void print_rq_sizes_in_the_tc(std::ostream &os, void *arg) {
TaskControl *tc = (TaskControl *)arg;
tc->print_rq_sizes(os);
}
static double get_cumulated_worker_time_from_this(void *arg) {
return static_cast<TaskControl*>(arg)->get_cumulated_worker_time();
}
struct CumulatedWithTagArgs {
CumulatedWithTagArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), t(_t) {}
TaskControl* c;
bthread_tag_t t;
};
static double get_cumulated_worker_time_from_this_with_tag(void* arg) {
auto a = static_cast<CumulatedWithTagArgs*>(arg);
auto c = a->c;
auto t = a->t;
return c->get_cumulated_worker_time(t);
}
static int64_t get_cumulated_switch_count_from_this(void *arg) {
return static_cast<TaskControl*>(arg)->get_cumulated_switch_count();
}
static int64_t get_cumulated_signal_count_from_this(void *arg) {
return static_cast<TaskControl*>(arg)->get_cumulated_signal_count();
}
TaskControl::TaskControl()
// NOTE: all fields must be initialized before the vars.
: _tagged_ngroup(FLAGS_task_group_ntags)
, _tagged_groups(FLAGS_task_group_ntags)
, _init(false)
, _stop(false)
, _concurrency(0)
, _next_worker_id(0)
, _nworkers("bthread_worker_count")
, _pending_time(NULL)
// Delay exposure of following two vars because they rely on TC which
// is not initialized yet.
, _cumulated_worker_time(get_cumulated_worker_time_from_this, this)
, _worker_usage_second(&_cumulated_worker_time, 1)
, _cumulated_switch_count(get_cumulated_switch_count_from_this, this)
, _switch_per_second(&_cumulated_switch_count)
, _cumulated_signal_count(get_cumulated_signal_count_from_this, this)
, _signal_per_second(&_cumulated_signal_count)
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
, _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
, _priority_queues(FLAGS_task_group_ntags)
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
, _tagged_pl(FLAGS_task_group_ntags)
{}
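// One-time initialization: parse the optional cpu set, create per-tag bvars
// and priority queues, ensure the global TimerThread is running, spawn
// `concurrency` worker pthreads (tags assigned round-robin), and wait until
// every tag has at least one TaskGroup.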
int TaskControl::init(int concurrency) {
if (_concurrency != 0) {
LOG(ERROR) << "Already initialized";
return -1;
}
if (concurrency <= 0) {
LOG(ERROR) << "Invalid concurrency=" << concurrency;
return -1;
}
_concurrency = concurrency;
if (!FLAGS_cpu_set.empty()) {
if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) {
LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set;
return -1;
}
}
// Group TaskGroups by tag.
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
_tagged_ngroup[i].store(0, std::memory_order_relaxed);
auto tag_str = std::to_string(i);
_tagged_nworkers.push_back(new bvar::Adder<int64_t>("bthread_worker_count", tag_str));
_tagged_cumulated_worker_time.push_back(new bvar::PassiveStatus<double>(
get_cumulated_worker_time_from_this_with_tag, new CumulatedWithTagArgs{this, i}));
_tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
LOG(ERROR) << "Fail to init _priority_q";
return -1;
}
}
// Make sure TimerThread is ready.
if (get_or_create_global_timer_thread() == NULL) {
LOG(ERROR) << "Fail to get global_timer_thread";
return -1;
}
#ifdef BRPC_BTHREAD_TRACER
if (!_task_tracer.Init()) {
LOG(ERROR) << "Fail to init TaskTracer";
return -1;
}
#endif // BRPC_BTHREAD_TRACER
_workers.resize(_concurrency);
for (int i = 0; i < _concurrency; ++i) {
auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags);
const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg);
if (rc) {
delete arg;
PLOG(ERROR) << "Fail to create _workers[" << i << "]";
return -1;
}
}
_worker_usage_second.expose("bthread_worker_usage");
_switch_per_second.expose("bthread_switch_second");
_signal_per_second.expose("bthread_signal_second");
_status.expose("bthread_group_status");
// Wait until at least one group has been added so that choose_one_group()
// never returns NULL.
// TODO: Handle the case that a worker quits before add_group
for (int i = 0; i < FLAGS_task_group_ntags;) {
if (_tagged_ngroup[i].load(std::memory_order_acquire) == 0) {
usleep(100); // TODO: Elaborate
continue;
}
++i;
}
_init.store(true, butil::memory_order_release);
return 0;
}
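// Create `num` more worker pthreads for `tag`. Returns the number of workers
// actually added, which may be smaller if pthread_create fails.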
int TaskControl::add_workers(int num, bthread_tag_t tag) {
if (num <= 0) {
return 0;
}
try {
_workers.resize(_concurrency + num);
} catch (...) {
return 0;
}
const int old_concurrency = _concurrency.load(butil::memory_order_relaxed);
for (int i = 0; i < num; ++i) {
// Worker will add itself to _idle_workers, so we have to add
// _concurrency before creating a worker.
_concurrency.fetch_add(1);
auto arg = new WorkerThreadArgs(this, tag);
const int rc = pthread_create(
&_workers[i + old_concurrency], NULL, worker_thread, arg);
if (rc) {
delete arg;
PLOG(WARNING) << "Fail to create _workers[" << i + old_concurrency << "]";
_concurrency.fetch_sub(1, butil::memory_order_release);
break;
}
}
// Cannot fail
_workers.resize(_concurrency.load(butil::memory_order_relaxed));
return _concurrency.load(butil::memory_order_relaxed) - old_concurrency;
}
TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags);
auto& groups = tag_group(tag);
const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire);
if (ngroup != 0) {
return groups[butil::fast_rand_less_than(ngroup)];
}
CHECK(false) << "Impossible: ngroup is 0";
return NULL;
}
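// Parse a cpu list such as "0-3,5,7" into a sorted, de-duplicated vector of
// cpu ids, e.g. "0-3,5" -> {0,1,2,3,5}. Returns 0 on success, -1 if `value`
// is empty or malformed.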
int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
std::smatch match;
std::set<unsigned> cpuset;
if (value.empty()) {
return -1;
}
if (std::regex_match(value, match, r)) {
for (butil::StringSplitter split(value.data(), ','); split; ++split) {
butil::StringPiece cpu_ids(split.field(), split.length());
cpu_ids.trim_spaces();
butil::StringPiece begin = cpu_ids;
butil::StringPiece end = cpu_ids;
auto dash = cpu_ids.find('-');
if (dash != cpu_ids.npos) {
begin = cpu_ids.substr(0, dash);
end = cpu_ids.substr(dash + 1);
}
unsigned first = UINT_MAX;
unsigned last = 0;
int ret = butil::StringSplitter(begin, '\t').to_uint(&first);
ret |= butil::StringSplitter(end, '\t').to_uint(&last);
if (ret != 0 || first > last) {
return -1;
}
for (auto i = first; i <= last; ++i) {
cpuset.insert(i);
}
}
cpus.assign(cpuset.begin(), cpuset.end());
return 0;
}
return -1;
}
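// Pin `pthread` to `cpu_id`, via pthread_setaffinity_np on Linux or a mach
// thread affinity policy on macOS. Failures are only logged.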
void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) {
#if defined(OS_LINUX)
cpu_set_t cs;
CPU_ZERO(&cs);
CPU_SET(cpu_id, &cs);
auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs);
if (r != 0) {
LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
}
(void)r;
#elif defined(OS_MACOSX)
thread_port_t mach_thread = pthread_mach_thread_np(pthread);
if (mach_thread == MACH_PORT_NULL) {
LOG(WARNING) << "mach_thread is null, failed to bind thread to cpu: " << cpu_id;
return;
}
thread_affinity_policy_data_t policy;
policy.affinity_tag = cpu_id;
if (thread_policy_set(mach_thread,
THREAD_AFFINITY_POLICY,
(thread_policy_t)&policy,
THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) {
LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
}
#endif
}
#ifdef BRPC_BTHREAD_TRACER
void TaskControl::stack_trace(std::ostream& os, bthread_t tid) {
_task_tracer.Trace(os, tid);
}
std::string TaskControl::stack_trace(bthread_t tid) {
return _task_tracer.Trace(tid);
}
#endif // BRPC_BTHREAD_TRACER
extern int stop_and_join_epoll_threads();
void TaskControl::stop_and_join() {
// Close epoll threads so that worker threads are not waiting on epoll
// (which cannot be woken up by signal_task below)
CHECK_EQ(0, stop_and_join_epoll_threads());
// Stop workers
{
BAIDU_SCOPED_LOCK(_modify_group_mutex);
_stop = true;
std::for_each(
_tagged_ngroup.begin(), _tagged_ngroup.end(),
[](butil::atomic<size_t>& index) { index.store(0, butil::memory_order_relaxed); });
}
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
for (auto& pl : _tagged_pl[i]) {
pl.stop();
}
}
for (auto worker: _workers) {
// Interrupt blocking operations.
#ifdef BRPC_BTHREAD_TRACER
pthread_kill(worker, _task_tracer.get_trace_signal());
#else
interrupt_pthread(worker);
#endif // BRPC_BTHREAD_TRACER
}
// Join workers
for (auto worker : _workers) {
pthread_join(worker, NULL);
}
}
TaskControl::~TaskControl() {
// NOTE: g_task_control is not destructed now because the situation
// is extremely racy.
delete _pending_time.exchange(NULL, butil::memory_order_relaxed);
_worker_usage_second.hide();
_switch_per_second.hide();
_signal_per_second.hide();
_status.hide();
stop_and_join();
}
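// Register `g` under `tag`: assign one of the tag's ParkingLots to it and
// append it to the tag's group array. The releasing store of the group count
// pairs with the acquiring load in steal_task.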
int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) {
if (__builtin_expect(NULL == g, 0)) {
return -1;
}
std::unique_lock<butil::Mutex> mu(_modify_group_mutex);
if (_stop) {
return -1;
}
g->set_tag(tag);
g->set_pl(&_tagged_pl[tag][butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag]);
size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed);
if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) {
_tagged_groups[tag][ngroup] = g;
_tagged_ngroup[tag].store(ngroup + 1, butil::memory_order_release);
}
mu.unlock();
// See the comments in _destroy_group
// TODO: Not needed anymore since non-worker pthread cannot have TaskGroup
// signal_task(65536, tag);
return 0;
}
void TaskControl::delete_task_group(void* arg) {
delete(TaskGroup*)arg;
}
int TaskControl::_destroy_group(TaskGroup* g) {
if (NULL == g) {
LOG(ERROR) << "Param[g] is NULL";
return -1;
}
if (g->_control != this) {
LOG(ERROR) << "TaskGroup=" << g
<< " does not belong to this TaskControl=" << this;
return -1;
}
bool erased = false;
{
BAIDU_SCOPED_LOCK(_modify_group_mutex);
auto tag = g->tag();
auto& groups = tag_group(tag);
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed);
for (size_t i = 0; i < ngroup; ++i) {
if (groups[i] == g) {
// No need for atomic_thread_fence because lock did it.
groups[i] = groups[ngroup - 1];
// Change _ngroup and keep _groups unchanged at last so that:
// - If steal_task sees the newest _ngroup, it would not touch
// _groups[ngroup -1]
// - If steal_task sees old _ngroup and is still iterating on
// _groups, it would not miss _groups[ngroup - 1] which was
// swapped to _groups[i]. Although adding new group would
// overwrite it, since we do signal_task in _add_group(),
// we think the pending tasks of _groups[ngroup - 1] would
// not miss.
tag_ngroup(tag).store(ngroup - 1, butil::memory_order_release);
//_groups[ngroup - 1] = NULL;
erased = true;
break;
}
}
}
// Can't delete g immediately because, for performance reasons, steal_task
// does not lock _modify_group_mutex and may access the removed group
// concurrently. We use a simple strategy here: schedule a function which
// deletes the TaskGroup after FLAGS_task_group_delete_delay seconds.
if (erased) {
get_global_timer_thread()->schedule(
delete_task_group, g,
butil::microseconds_from_now(FLAGS_task_group_delete_delay * 1000000L));
}
return 0;
}
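// Steal one bthread for the calling worker's tag: try the tag's priority
// queue first, then iterate the tag's groups starting at a position derived
// from *seed, popping from each group's local runqueue and then its remote
// queue. *seed is advanced so that consecutive calls start at different groups.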
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
auto tag = tls_task_group->tag();
if (_priority_queues[tag].steal(tid)) {
return true;
}
// 1: Acquiring fence is paired with releasing fence in _add_group to
// avoid accessing uninitialized slot of _groups.
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/);
if (0 == ngroup) {
return false;
}
// NOTE: Don't return inside `for' iteration since we need to update |seed|
bool stolen = false;
size_t s = *seed;
auto& groups = tag_group(tag);
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->_rq.steal(tid)) {
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
}
}
}
*seed = s;
return stolen;
}
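// Wake up to `num_task` (capped at 2) workers of `tag` through the tag's
// ParkingLots, starting at a slot derived from the caller's pthread id. If
// signals remain unconsumed and dynamic concurrency is enabled
// (bthread_min_concurrency > 0), add one worker while still below
// FLAGS_bthread_concurrency.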
void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
if (num_task <= 0) {
return;
}
// TODO(gejun): The current algorithm does not guarantee that enough threads
// will be created to match callers' requests. On the other hand, the current
// implementation also produces many useless signals. Capping the number of
// signals is a good balance between performance and timeliness of scheduling.
if (num_task > 2) {
num_task = 2;
}
auto& pl = tag_pl(tag);
size_t start_index = butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag;
for (size_t i = 0; i < _pl_num_of_each_tag && num_task > 0; ++i) {
num_task -= pl[start_index].signal(1);
if (++start_index >= _pl_num_of_each_tag) {
start_index = 0;
}
}
if (num_task > 0 &&
FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance
_concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
// TODO: Reduce this lock
BAIDU_SCOPED_LOCK(g_task_control_mutex);
if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
add_workers(1, tag);
}
}
}
void TaskControl::print_rq_sizes(std::ostream& os) {
size_t ngroup = 0;
std::for_each(_tagged_ngroup.begin(), _tagged_ngroup.end(), [&](butil::atomic<size_t>& index) {
ngroup += index.load(butil::memory_order_relaxed);
});
DEFINE_SMALL_ARRAY(int, nums, ngroup, 128);
{
BAIDU_SCOPED_LOCK(_modify_group_mutex);
// ngroup > _ngroup: nums[_ngroup ... ngroup-1] = 0
// ngroup < _ngroup: just ignore _groups[_ngroup ... ngroup-1]
int i = 0;
for_each_task_group([&](TaskGroup* g) {
nums[i] = (g ? g->_rq.volatile_size() : 0);
++i;
});
}
for (size_t i = 0; i < ngroup; ++i) {
os << nums[i] << ' ';
}
}
double TaskControl::get_cumulated_worker_time() {
int64_t cputime_ns = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
for_each_task_group([&](TaskGroup* g) {
cputime_ns += g->cumulated_cputime_ns();
});
return cputime_ns / 1000000000.0;
}
double TaskControl::get_cumulated_worker_time(bthread_tag_t tag) {
int64_t cputime_ns = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed);
auto& groups = tag_group(tag);
for (size_t i = 0; i < ngroup; ++i) {
cputime_ns += groups[i]->cumulated_cputime_ns();
}
return cputime_ns / 1000000000.0;
}
int64_t TaskControl::get_cumulated_switch_count() {
int64_t c = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
for_each_task_group([&](TaskGroup* g) {
if (g) {
c += g->_nswitch;
}
});
return c;
}
int64_t TaskControl::get_cumulated_signal_count() {
int64_t c = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
for_each_task_group([&](TaskGroup* g) {
if (g) {
c += g->_nsignaled + g->_remote_nsignaled;
}
});
return c;
}
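// Lazily create the LatencyRecorder exposed as "bthread_creation"; later
// calls return the same instance.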
bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() {
bool is_creator = false;
_pending_time_mutex.lock();
bvar::LatencyRecorder* pt = _pending_time.load(butil::memory_order_consume);
if (!pt) {
pt = new bvar::LatencyRecorder;
_pending_time.store(pt, butil::memory_order_release);
is_creator = true;
}
_pending_time_mutex.unlock();
if (is_creator) {
pt->expose("bthread_creation");
}
return pt;
}
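// Collect the ids of living user-created bthreads by scanning the TaskMeta
// resource pool.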
std::vector<bthread_t> TaskControl::get_living_bthreads() {
std::vector<bthread_t> living_bthread_ids;
living_bthread_ids.reserve(1024);
butil::for_each_resource<TaskMeta>([&living_bthread_ids](TaskMeta* m) {
// only keep bthreads created by the bthread_start* functions, i.e. skip
// those created internally to run a worker's main task, as they are
// opaque to users.
if (m && m->fn) {
// determine whether the bthread is living by checking version
const uint32_t given_ver = get_version(m->tid);
BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) {
living_bthread_ids.push_back(m->tid);
}
}
});
return living_bthread_ids;
}
} // namespace bthread