// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// bthread - An M:N threading library to make applications more concurrent.
// Date: 2016/04/16 18:43:24
#include "bthread/execution_queue.h"
#include "butil/memory/singleton_on_pthread_once.h"
#include "butil/resource_pool.h" // butil::get_resource
#include "butil/threading/platform_thread.h"
namespace bthread {
// May be false on different platforms.
//BAIDU_CASSERT(sizeof(TaskNode) == 128, sizeof_TaskNode_must_be_128);
//BAIDU_CASSERT(offsetof(TaskNode, static_task_mem) + sizeof(TaskNode().static_task_mem) == 128, sizeof_TaskNode_must_be_128);
BAIDU_CASSERT(sizeof(ExecutionQueue<int>) == sizeof(ExecutionQueueBase),
sizeof_ExecutionQueue_must_be_the_same_with_ExecutionQueueBase);
BAIDU_CASSERT(sizeof(TaskIterator<int>) == sizeof(TaskIteratorBase),
sizeof_TaskIterator_must_be_the_same_with_TaskIteratorBase);
namespace /*anonymous*/ {
typedef butil::ResourceId<ExecutionQueueBase> slot_id_t;
inline slot_id_t WARN_UNUSED_RESULT slot_of_id(uint64_t id) {
slot_id_t slot = { (id & 0xFFFFFFFFul) };
return slot;
}
inline uint64_t make_id(uint32_t version, slot_id_t slot) {
return (((uint64_t)version) << 32) | slot.value;
}
} // namespace anonymous
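// For illustration (a hypothetical round-trip, not part of the library):
// with version=3 and slot.value=7, make_id() yields 0x0000000300000007;
// slot_of_id() recovers slot 7 from the low 32 bits, and _version_of_id()
// (a helper used below) recovers version 3 from the high 32 bits.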
struct ExecutionQueueVars {
bvar::Adder<int64_t> running_task_count;
bvar::Adder<int64_t> execq_count;
bvar::Adder<int64_t> execq_active_count;
ExecutionQueueVars();
};
ExecutionQueueVars::ExecutionQueueVars()
: running_task_count("bthread_execq_running_task_count")
, execq_count("bthread_execq_count")
, execq_active_count("bthread_execq_active_count") {
}
inline ExecutionQueueVars* get_execq_vars() {
return butil::get_leaky_singleton<ExecutionQueueVars>();
}
void ExecutionQueueBase::start_execute(TaskNode* node) {
node->next = TaskNode::UNCONNECTED;
node->status = TaskNode::UNEXECUTED;
node->iterated = false;
if (node->high_priority) {
// Add to _high_priority_tasks before pushing this task into the queue to
// make sure that _execute_tasks sees the up-to-date count once this task
// is visible in the queue. Although _execute_tasks may spin through a few
// useless loop iterations if this thread is scheduled out at this point,
// we consider that acceptable.
_high_priority_tasks.fetch_add(1, butil::memory_order_relaxed);
}
TaskNode* const prev_head = _head.exchange(node, butil::memory_order_release);
if (prev_head != NULL) {
node->next = prev_head;
return;
}
// We got the right to execute the task; start a bthread to avoid deadlock
// or stack overflow.
node->next = NULL;
node->q = this;
ExecutionQueueVars* const vars = get_execq_vars();
vars->execq_active_count << 1;
if (node->in_place) {
int niterated = 0;
_execute(node, node->high_priority, &niterated);
TaskNode* tmp = node;
// Return below if no more tasks remain.
if (node->high_priority) {
_high_priority_tasks.fetch_sub(niterated, butil::memory_order_relaxed);
}
if (!_more_tasks(tmp, &tmp, !node->iterated)) {
vars->execq_active_count << -1;
return_task_node(node);
return;
}
}
if (nullptr == _options.executor) {
if (_options.use_pthread) {
if (_pthread_started) {
BAIDU_SCOPED_LOCK(_mutex);
_current_head = node;
_cond.Signal();
} else {
// Start the execution pthread in the background once.
if (pthread_create(&_pid, NULL,
_execute_tasks_pthread,
node) != 0) {
PLOG(FATAL) << "Fail to create pthread";
_execute_tasks(node);
}
_pthread_started = true;
}
} else {
bthread_t tid;
// We start the execution bthread in the background instead of in the
// foreground because we can't tell whether the code after execute() is
// urgent (e.g. unlocking a pthread_mutex_t), in which case an implicit
// context switch may cause undefined behavior (e.g. deadlock).
if (bthread_start_background(&tid, &_options.bthread_attr,
_execute_tasks, node) != 0) {
PLOG(FATAL) << "Fail to start bthread";
_execute_tasks(node);
}
}
} else {
if (_options.executor->submit(_execute_tasks, node) != 0) {
PLOG(FATAL) << "Fail to submit task";
_execute_tasks(node);
}
}
}
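// A minimal usage sketch of the public wrappers declared in
// execution_queue.h, kept as a comment for illustration only (demo_execute
// and the int payload are hypothetical):
//
//   int demo_execute(void* meta, bthread::TaskIterator<int>& iter) {
//       if (iter.is_queue_stopped()) {
//           return 0;  // Last chance to release resources bound to meta.
//       }
//       for (; iter; ++iter) {
//           // Consume *iter; tasks are executed serially, never in parallel.
//       }
//       return 0;
//   }
//
//   bthread::ExecutionQueueId<int> queue_id;
//   bthread::execution_queue_start(&queue_id, NULL, demo_execute, NULL);
//   bthread::execution_queue_execute(queue_id, 42);
//   bthread::execution_queue_stop(queue_id);
//   bthread::execution_queue_join(queue_id);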
void* ExecutionQueueBase::_execute_tasks(void* arg) {
ExecutionQueueVars* vars = get_execq_vars();
TaskNode* head = (TaskNode*)arg;
ExecutionQueueBase* m = (ExecutionQueueBase*)head->q;
TaskNode* cur_tail = NULL;
bool destroy_queue = false;
for (;;) {
if (head->iterated) {
CHECK(head->next != NULL);
TaskNode* saved_head = head;
head = head->next;
m->return_task_node(saved_head);
}
int rc = 0;
if (m->_high_priority_tasks.load(butil::memory_order_relaxed) > 0) {
int nexecuted = 0;
// The return value only matters when it's ESTOP, checked below.
rc = m->_execute(head, true, &nexecuted);
m->_high_priority_tasks.fetch_sub(
nexecuted, butil::memory_order_relaxed);
if (nexecuted == 0) {
// Some high_priority tasks are not in the queue yet; yield and retry.
sched_yield();
}
} else {
rc = m->_execute(head, false, NULL);
}
if (rc == ESTOP) {
destroy_queue = true;
}
// Release iterated TaskNodes until reaching an un-iterated task or the last task.
while (head->next != NULL && head->iterated) {
TaskNode* saved_head = head;
head = head->next;
m->return_task_node(saved_head);
}
if (cur_tail == NULL) {
for (cur_tail = head; cur_tail->next != NULL;
cur_tail = cur_tail->next) {}
}
// Break when there are no more tasks and head has been iterated.
if (!m->_more_tasks(cur_tail, &cur_tail, !head->iterated)) {
CHECK_EQ(cur_tail, head);
CHECK(head->iterated);
m->return_task_node(head);
break;
}
}
if (destroy_queue) {
CHECK(m->_head.load(butil::memory_order_relaxed) == NULL);
CHECK(m->_stopped);
// Add 2 to _join_butex to make it equal to the version of the next
// ExecutionQueue created from the same slot, so that join() with the
// old id returns immediately.
//
// 1: release fence so that join() sees the newest changes when it sees
// the newest _join_butex.
m->_join_butex->fetch_add(2, butil::memory_order_release/*1*/);
butex_wake_all(m->_join_butex);
vars->execq_count << -1;
butil::return_resource(slot_of_id(m->_this_id));
}
vars->execq_active_count << -1;
return NULL;
}
void* ExecutionQueueBase::_execute_tasks_pthread(void* arg) {
butil::PlatformThread::SetNameSimple("ExecutionQueue");
auto head = (TaskNode*)arg;
auto m = (ExecutionQueueBase*)head->q;
m->_current_head = head;
while (true) {
BAIDU_SCOPED_LOCK(m->_mutex);
while (!m->_current_head) {
m->_cond.Wait();
}
_execute_tasks(m->_current_head);
m->_current_head = NULL;
int expected = _version_of_id(m->_this_id);
if (expected != m->_join_butex->load(butil::memory_order_relaxed)) {
// The execution queue has been stopped and the stop task has been executed; quit.
break;
}
}
return NULL;
}
void ExecutionQueueBase::return_task_node(TaskNode* node) {
node->clear_before_return(_clear_func);
butil::return_object<TaskNode>(node);
get_execq_vars()->running_task_count << -1;
}
void ExecutionQueueBase::_on_recycle() {
// Push a stop task.
while (true) {
TaskNode* node = butil::get_object<TaskNode>();
if (BAIDU_LIKELY(node != NULL)) {
get_execq_vars()->running_task_count << 1;
node->stop_task = true;
node->high_priority = false;
node->in_place = false;
start_execute(node);
break;
}
CHECK(false) << "Fail to create task_node_t, " << berror();
::bthread_usleep(1000);
}
}
int ExecutionQueueBase::join(uint64_t id) {
const slot_id_t slot = slot_of_id(id);
ExecutionQueueBase* const m = butil::address_resource(slot);
if (m == NULL) {
// The queue is not created yet, this join is definitely wrong.
return EINVAL;
}
int expected = _version_of_id(id);
// The acquire fence makes this thread see all changes made before _join_butex
// was last changed.
while (expected == m->_join_butex->load(butil::memory_order_acquire)) {
if (butex_wait(m->_join_butex, expected, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return errno;
}
}
// Join pthread if it's started.
if (m->_options.use_pthread && m->_pthread_started) {
pthread_join(m->_pid, NULL);
}
return 0;
}
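// Condensed view of the handshake between join() above and _execute_tasks(),
// for illustration only:
//
//   joiner:                                 destroyer:
//     while (expected == *_join_butex)        _join_butex->fetch_add(2, release);
//         butex_wait(_join_butex, ...);       butex_wake_all(_join_butex);
//
// The release/acquire pair guarantees that a woken joiner also sees every
// write the queue made before it was destroyed.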
int ExecutionQueueBase::stop() {
const uint32_t id_ver = _version_of_id(_this_id);
uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
for (;;) {
if (_version_of_vref(vref) != id_ver) {
return EINVAL;
}
// Try to set version to id_ver + 1 (to make later address() return NULL);
// retry on failure.
if (_versioned_ref.compare_exchange_strong(
vref, _make_vref(id_ver + 1, _ref_of_vref(vref)),
butil::memory_order_release,
butil::memory_order_relaxed)) {
// Set _stopped to make later execute() calls fail immediately.
_stopped.store(true, butil::memory_order_release);
// Remove the additional reference added at creation so that this
// queue's reference count hits 0 (recycle) when no one addresses it.
_release_additional_reference();
// NOTE: This queue may be recycled at this point, don't
// touch anything.
return 0;
}
}
}
int ExecutionQueueBase::_execute(TaskNode* head, bool high_priority, int* niterated) {
if (head != NULL && head->stop_task) {
CHECK(head->next == NULL);
head->iterated = true;
head->status = TaskNode::EXECUTED;
TaskIteratorBase iter(NULL, this, true, false);
_execute_func(_meta, _type_specific_function, iter);
if (niterated) {
*niterated = 1;
}
return ESTOP;
}
TaskIteratorBase iter(head, this, false, high_priority);
if (iter) {
_execute_func(_meta, _type_specific_function, iter);
}
// We must assign |niterated| with num_iterated even if we couldn't peek
// any task to execute at the beginning, in which case all the iterated
// tasks have been cancelled at this point. And we must return the
// correct num_iterated() to the caller to update the counter correctly.
if (niterated) {
*niterated = iter.num_iterated();
}
return 0;
}
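// For illustration: when the stop task reaches _execute() above, the user
// callback runs once more with an iterator built as
// TaskIteratorBase(NULL, this, true, false), i.e. is_queue_stopped() is
// true and no tasks are yielded, giving the callback a final chance to
// release resources before the queue is recycled.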
TaskNode* ExecutionQueueBase::allocate_node() {
get_execq_vars()->running_task_count << 1;
return butil::get_object<TaskNode>();
}
TaskNode* const TaskNode::UNCONNECTED = (TaskNode*)-1L;
ExecutionQueueBase::scoped_ptr_t ExecutionQueueBase::address(uint64_t id) {
scoped_ptr_t ret;
const slot_id_t slot = slot_of_id(id);
ExecutionQueueBase* const m = butil::address_resource(slot);
if (BAIDU_LIKELY(m != NULL)) {
// The acquire fence makes sure this thread sees the latest changes before
// _dereference().
const uint64_t vref1 = m->_versioned_ref.fetch_add(
1, butil::memory_order_acquire);
const uint32_t ver1 = _version_of_vref(vref1);
if (ver1 == _version_of_id(id)) {
ret.reset(m);
return ret.Pass();
}
const uint64_t vref2 = m->_versioned_ref.fetch_sub(
1, butil::memory_order_release);
const int32_t nref = _ref_of_vref(vref2);
if (nref > 1) {
return ret.Pass();
} else if (__builtin_expect(nref == 1, 1)) {
const uint32_t ver2 = _version_of_vref(vref2);
if ((ver2 & 1)) {
if (ver1 == ver2 || ver1 + 1 == ver2) {
uint64_t expected_vref = vref2 - 1;
if (m->_versioned_ref.compare_exchange_strong(
expected_vref, _make_vref(ver2 + 1, 0),
butil::memory_order_acquire,
butil::memory_order_relaxed)) {
m->_on_recycle();
// We don't return m immediatly when the reference count
// reaches 0 as there might be in processing tasks. Instead
// _on_recycle would push a `stop_task', after which
// is executed m would be finally reset and returned
}
} else {
CHECK(false) << "ref-version=" << ver1
<< " unref-version=" << ver2;
}
} else {
CHECK_EQ(ver1, ver2);
// Addressed a free slot.
}
} else {
CHECK(false) << "Over dereferenced id=" << id;
}
}
return ret.Pass();
}
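// Version lifecycle of one slot, sketched for illustration (assuming a
// fresh slot whose version starts at 0):
//
//   create()               -> id carries the even version V (e.g. 0)
//   stop()                 -> CAS version to the odd V + 1; address() fails
//   last dereference       -> CAS version to V + 2 and run _on_recycle()
//   next create() on slot  -> id carries version V + 2
//
// The odd-version window is why the (ver2 & 1) test above distinguishes a
// stopping queue from an already-free slot.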
int ExecutionQueueBase::create(uint64_t* id, const ExecutionQueueOptions* options,
execute_func_t execute_func,
clear_task_mem clear_func,
void* meta, void* type_specific_function) {
if (execute_func == NULL || clear_func == NULL) {
return EINVAL;
}
slot_id_t slot;
ExecutionQueueBase* const m = butil::get_resource(&slot, Forbidden());
if (BAIDU_LIKELY(m != NULL)) {
m->_execute_func = execute_func;
m->_clear_func = clear_func;
m->_meta = meta;
m->_type_specific_function = type_specific_function;
CHECK(m->_head.load(butil::memory_order_relaxed) == NULL);
CHECK_EQ(0, m->_high_priority_tasks.load(butil::memory_order_relaxed));
ExecutionQueueOptions opt;
if (options != NULL) {
opt = *options;
}
m->_options = opt;
m->_stopped.store(false, butil::memory_order_relaxed);
m->_this_id = make_id(
_version_of_vref(m->_versioned_ref.fetch_add(
1, butil::memory_order_release)), slot);
*id = m->_this_id;
m->_pthread_started = false;
m->_current_head = NULL;
get_execq_vars()->execq_count << 1;
return 0;
}
return ENOMEM;
}
inline bool TaskIteratorBase::should_break_for_high_priority_tasks() {
if (!_high_priority &&
_q->_high_priority_tasks.load(butil::memory_order_relaxed) > 0) {
_should_break = true;
return true;
}
return false;
}
void TaskIteratorBase::operator++() {
if (!(*this)) {
return;
}
if (_cur_node->iterated) {
_cur_node = _cur_node->next;
}
if (should_break_for_high_priority_tasks()) {
return;
} // else the next high_priority_task would be delayed for at most one task
while (_cur_node && !_cur_node->stop_task) {
if (_high_priority == _cur_node->high_priority) {
if (!_cur_node->iterated && _cur_node->peek_to_execute()) {
++_num_iterated;
_cur_node->iterated = true;
return;
}
_num_iterated += !_cur_node->iterated;
_cur_node->iterated = true;
}
_cur_node = _cur_node->next;
}
return;
}
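// For illustration: given pending nodes [A(low), B(high), C(low)], a
// high-priority iterator yields only B, leaving A and C un-iterated for a
// later low-priority pass; conversely, a low-priority iterator breaks early
// (via should_break_for_high_priority_tasks) as soon as high-priority
// tasks arrive.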
TaskIteratorBase::~TaskIteratorBase() {
// Mark the iterated tasks as EXECUTED here instead of waiting for them to
// be returned in start_execute, as a high_priority task might be in the
// middle of the linked list and is not going to be returned soon.
if (_is_stopped) {
return;
}
while (_head != _cur_node) {
if (_head->iterated && _head->high_priority == _high_priority) {
_head->set_executed();
}
_head = _head->next;
}
if (_should_break && _cur_node != NULL
&& _cur_node->high_priority == _high_priority && _cur_node->iterated) {
_cur_node->set_executed();
}
}
} // namespace bthread