// blob: 7aabed68376200feea5fd224db1989e3e4f1f329 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// bthread - An M:N threading library to make applications more concurrent.
// Date: Sun Aug 3 12:46:15 CST 2014
#include <deque>
#include "butil/logging.h"
#include "bthread/butex.h" // butex_*
#include "bthread/mutex.h"
#include "bthread/list_of_abafree_id.h"
#include "butil/resource_pool.h"
#include "bthread/bthread.h"
namespace bthread {
// A FIFO queue that stores up to N elements in an inline ring buffer and only
// allocates a std::deque once more than N elements are queued at the same
// time. This reduces the chance to allocate memory for the common case of a
// short queue.
//
// T must be default-constructible and assignable. Slots of the inline buffer
// are reset to T() as soon as their element leaves the queue, so that
// resources owned by the element (e.g. strings) are released immediately
// instead of lingering until the slot is overwritten.
template <typename T, int N>
class SmallQueue {
public:
    SmallQueue() : _begin(0), _size(0), _full(NULL) {}

    ~SmallQueue() {
        delete _full;
        _full = NULL;
    }

    // Append a copy of `val' to the back of the queue.
    void push(const T& val) {
        if (_full != NULL && !_full->empty()) {
            // Elements are already overflowing into the deque. New elements
            // must follow them to keep FIFO order, even if the ring buffer
            // has free slots again.
            _full->push_back(val);
        } else if (_size < N) {
            int tail = _begin + _size;
            if (tail >= N) {
                tail -= N;
            }
            _c[tail] = val;
            ++_size;
        } else {
            // The ring buffer is full: spill into the lazily-created deque.
            if (_full == NULL) {
                _full = new std::deque<T>;
            }
            _full->push_back(val);
        }
    }

    // Move the oldest element into `*val'.
    // Returns true on success, false when the queue is empty (in which case
    // `*val' is untouched).
    bool pop(T* val) {
        if (_size > 0) {
            // Elements in the ring buffer are always older than those in the
            // overflow deque, so they are consumed first.
            *val = _c[_begin];
            // Drop the stale copy so that it does not keep resources alive.
            _c[_begin] = T();
            ++_begin;
            if (_begin >= N) {
                _begin -= N;
            }
            --_size;
            return true;
        } else if (_full && !_full->empty()) {
            *val = _full->front();
            _full->pop_front();
            return true;
        }
        return false;
    }

    // True when neither the ring buffer nor the overflow deque has elements.
    bool empty() const {
        return _size == 0 && (_full == NULL || _full->empty());
    }

    // Total number of queued elements.
    size_t size() const {
        return _size + (_full ? _full->size() : 0);
    }

    // Remove all elements. The overflow deque (if any) is kept for reuse.
    void clear() {
        // Only live slots can hold data because pop() resets what it takes.
        while (_size > 0) {
            _c[_begin] = T();
            ++_begin;
            if (_begin >= N) {
                _begin -= N;
            }
            --_size;
        }
        _begin = 0;
        if (_full) {
            _full->clear();
        }
    }

private:
    DISALLOW_COPY_AND_ASSIGN(SmallQueue);

    int _begin;             // index of the oldest element in _c
    int _size;              // number of live elements in _c
    T _c[N];                // inline ring buffer
    std::deque<T>* _full;   // overflow storage, NULL until first needed
};
struct PendingError {
bthread_id_t id;
int error_code;
std::string error_text;
const char *location;
PendingError() : id(INVALID_BTHREAD_ID), error_code(0), location(NULL) {}
};
struct BAIDU_CACHELINE_ALIGNMENT Id {
// first_ver ~ locked_ver - 1: unlocked versions
// locked_ver: locked
// unlockable_ver: locked and about to be destroyed
// contended_ver: locked and contended
uint32_t first_ver;
uint32_t locked_ver;
FastPthreadMutex mutex;
void* data;
int (*on_error)(bthread_id_t, void*, int);
int (*on_error2)(bthread_id_t, void*, int, const std::string&);
const char *lock_location;
uint32_t* butex;
uint32_t* join_butex;
SmallQueue<PendingError, 2> pending_q;
Id() {
// Although value of the butex(as version part of bthread_id_t)
// does not matter, we set it to 0 to make program more deterministic.
butex = bthread::butex_create_checked<uint32_t>();
join_butex = bthread::butex_create_checked<uint32_t>();
*butex = 0;
*join_butex = 0;
}
~Id() {
bthread::butex_destroy(butex);
bthread::butex_destroy(join_butex);
}
inline bool has_version(uint32_t id_ver) const {
return id_ver >= first_ver && id_ver < locked_ver;
}
inline uint32_t contended_ver() const { return locked_ver + 1; }
inline uint32_t unlockable_ver() const { return locked_ver + 2; }
inline uint32_t last_ver() const { return unlockable_ver(); }
// also the next "first_ver"
inline uint32_t end_ver() const { return last_ver() + 1; }
};
// Id is declared with BAIDU_CACHELINE_ALIGNMENT; this compile-time check keeps
// sizeof(Id) a multiple of 64 bytes (presumably the cacheline size).
BAIDU_CASSERT(sizeof(Id) % 64 == 0, sizeof_Id_must_align);
// Identifier of an Id slot in the resource pool; used in this file with
// get_resource(), address_resource() and return_resource().
typedef butil::ResourceId<Id> IdResourceId;
// Pack a pool slot and a version into a bthread_id_t: the slot value goes to
// the upper 32 bits and the version to the lower 32 bits.
inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
    const uint64_t slot_part = ((uint64_t)slot.value) << 32;
    const uint64_t version_part = (uint64_t)version;
    const bthread_id_t packed = { slot_part | version_part };
    return packed;
}
// Extract the pool slot stored in the upper 32 bits of `id'
// (the inverse of the slot half of make_id).
inline IdResourceId get_slot(bthread_id_t id) {
    const IdResourceId slot = { (id.value >> 32) };
    return slot;
}
// Extract the version stored in the lower 32 bits of `id'.
inline uint32_t get_version(bthread_id_t id) {
    const uint32_t version = (uint32_t)(id.value & 0xFFFFFFFFul);
    return version;
}
// Check whether the version of `id' lies in [first_ver, last_ver()] of its
// slot. Returns false when the slot cannot be addressed. The fields are read
// without taking the Id's mutex.
inline bool id_exists_with_true_negatives(bthread_id_t id) {
    Id* const meta = address_resource(get_slot(id));
    if (meta != NULL) {
        const uint32_t ver = bthread::get_version(id);
        return !(ver < meta->first_ver || ver > meta->last_ver());
    }
    return false;
}
// Required by unittest.
// Returns the current value of the butex of the slot that `id' refers to, or
// 0 when the slot cannot be addressed (a valid version is never zero).
uint32_t id_value(bthread_id_t id) {
    Id* const meta = address_resource(get_slot(id));
    return (meta != NULL) ? *meta->butex : 0;
}
// on_error callback installed when the creator passes none: it ignores the
// data and the error code and simply unlocks and destroys the id.
static int default_bthread_id_on_error(bthread_id_t id, void* /*data*/,
                                       int /*error_code*/) {
    const int rc = bthread_id_unlock_and_destroy(id);
    return rc;
}
// on_error2 callback installed when the creator passes none: it ignores the
// data, the error code and the error text and simply unlocks and destroys
// the id.
static int default_bthread_id_on_error2(bthread_id_t id, void* /*data*/,
                                        int /*error_code*/,
                                        const std::string& /*error_text*/) {
    const int rc = bthread_id_unlock_and_destroy(id);
    return rc;
}
// Returns `location' itself, or a placeholder when it is NULL. Streaming a
// NULL `const char*' into an ostream is undefined behavior, and locations
// are caller-supplied pointers that are never validated.
static const char* printable_location(const char* location) {
    return location != NULL ? location : "<unknown>";
}

// Write a human-readable description of `id' to `os': its first id, version
// range, lock status (with the recorded lock location), queued errors, error
// callback and user data. Prints "Invalid id=<value>" when the slot cannot be
// addressed or the version is not valid for the slot.
void id_status(bthread_id_t id, std::ostream &os) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (!meta) {
        os << "Invalid id=" << id.value << '\n';
        return;
    }
    const uint32_t id_ver = bthread::get_version(id);
    uint32_t* butex = meta->butex;
    bool valid = true;
    void* data = NULL;
    int (*on_error)(bthread_id_t, void*, int) = NULL;
    int (*on_error2)(bthread_id_t, void*, int, const std::string&) = NULL;
    uint32_t first_ver = 0;
    uint32_t locked_ver = 0;
    uint32_t unlockable_ver = 0;
    uint32_t contended_ver = 0;
    const char *lock_location = NULL;
    SmallQueue<PendingError, 2> pending_q;
    uint32_t butex_value = 0;

    // Take a snapshot under the mutex; all printing happens after unlocking.
    meta->mutex.lock();
    if (meta->has_version(id_ver)) {
        data = meta->data;
        on_error = meta->on_error;
        on_error2 = meta->on_error2;
        first_ver = meta->first_ver;
        locked_ver = meta->locked_ver;
        unlockable_ver = meta->unlockable_ver();
        contended_ver = meta->contended_ver();
        lock_location = meta->lock_location;
        // SmallQueue has no iteration interface: rotate the queue once,
        // copying every element, which leaves the original order intact.
        const size_t size = meta->pending_q.size();
        for (size_t i = 0; i < size; ++i) {
            PendingError front;
            meta->pending_q.pop(&front);
            meta->pending_q.push(front);
            pending_q.push(front);
        }
        butex_value = *butex;
    } else {
        valid = false;
    }
    meta->mutex.unlock();

    if (!valid) {
        os << "Invalid id=" << id.value << '\n';
        return;
    }

    os << "First id: "
       << bthread::make_id(first_ver, bthread::get_slot(id)).value << '\n'
       << "Range: " << locked_ver - first_ver << '\n'
       << "Status: ";
    if (butex_value != first_ver) {
        os << "LOCKED at " << printable_location(lock_location);
        if (butex_value == contended_ver) {
            os << " (CONTENDED)";
        } else if (butex_value == unlockable_ver) {
            os << " (ABOUT TO DESTROY)";
        } else {
            os << " (UNCONTENDED)";
        }
    } else {
        os << "UNLOCKED";
    }

    os << "\nPendingQ:";
    if (pending_q.empty()) {
        os << " EMPTY";
    } else {
        const size_t size = pending_q.size();
        for (size_t i = 0; i < size; ++i) {
            PendingError front;
            pending_q.pop(&front);
            os << " (" << printable_location(front.location)
               << "/E" << front.error_code
               << '/' << front.error_text << ')';
        }
    }

    // Exactly one of on_error/on_error2 is set by the bthread_id_create*
    // functions.
    if (on_error) {
        if (on_error == default_bthread_id_on_error) {
            os << "\nOnError: unlock_and_destroy";
        } else {
            os << "\nOnError: " << (void*)on_error;
        }
    } else {
        if (on_error2 == default_bthread_id_on_error2) {
            os << "\nOnError2: unlock_and_destroy";
        } else {
            os << "\nOnError2: " << (void*)on_error2;
        }
    }
    os << "\nData: " << data;
    os << '\n';
}
// Write butil's description of the resource pool of Id objects to `os',
// followed by a newline.
void id_pool_status(std::ostream &os) {
    os << butil::describe_resources<Id>();
    os << '\n';
}
// Traits handed to ListOfABAFreeId for lists of bthread_id_t (see IdList).
// The meaning of each constant is defined by ListOfABAFreeId itself.
struct IdTraits {
    static const size_t BLOCK_SIZE = 63;
    static const size_t MAX_ENTRIES = 100000;
    static const size_t INIT_GC_SIZE = 4096;
    static const bthread_id_t ID_INIT;

    // Existence test used by the list; forwards to
    // id_exists_with_true_negatives().
    static bool exists(bthread_id_t id) {
        return bthread::id_exists_with_true_negatives(id);
    }
};
// Out-of-class definition of the static member declared in IdTraits.
const bthread_id_t IdTraits::ID_INIT = INVALID_BTHREAD_ID;
// Concrete list type stored behind bthread_id_list_t::impl
// (created on demand by bthread_id_list_add).
typedef ListOfABAFreeId<bthread_id_t, IdTraits> IdList;
struct IdResetter {
explicit IdResetter(int ec, const std::string& et)
: _error_code(ec), _error_text(et) {}
void operator()(bthread_id_t & id) const {
bthread_id_error2_verbose(
id, _error_code, _error_text, __FILE__ ":" BAIDU_SYMBOLSTR(__LINE__));
id = INVALID_BTHREAD_ID;
}
private:
int _error_code;
const std::string& _error_text;
};
size_t get_sizes(const bthread_id_list_t* list, size_t* cnt, size_t n) {
if (list->impl == NULL) {
return 0;
}
return static_cast<bthread::IdList*>(list->impl)->get_sizes(cnt, n);
}
// Largest `range' accepted by id_create_ranged_impl() and
// bthread_id_lock_and_reset_range_verbose(); also used by the id_create_*
// functions to restart versions at 1 before they could overflow.
const int ID_MAX_RANGE = 1024;
// Create an id with a single valid version.
// Takes an Id from the resource pool, stores `data' and the two callbacks in
// it and writes the new id to `*id'.
// Returns 0 on success, ENOMEM when no Id could be obtained.
static int id_create_impl(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&)) {
    IdResourceId slot;
    Id* const meta = get_resource(&slot);
    if (meta == NULL) {
        return ENOMEM;
    }
    meta->data = data;
    meta->on_error = on_error;
    meta->on_error2 = on_error2;
    CHECK(meta->pending_q.empty());

    uint32_t* const version = meta->butex;
    if (0 == *version || *version + ID_MAX_RANGE + 2 < *version) {
        // Skip 0 so that bthread_id_t is never 0
        // avoid overflow to make comparisons simpler.
        *version = 1;
    }
    const uint32_t first = *version;
    *meta->join_butex = first;
    meta->first_ver = first;
    meta->locked_ver = first + 1;
    *id = make_id(first, slot);
    return 0;
}
// Create an id with `range' consecutive valid versions.
// Takes an Id from the resource pool, stores `data' and the two callbacks in
// it and writes the id carrying the first version to `*id'.
// Returns 0 on success, EINVAL (after logging) when `range' is not within
// [1, ID_MAX_RANGE], ENOMEM when no Id could be obtained.
static int id_create_ranged_impl(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&),
    int range) {
    if (range < 1) {
        LOG(FATAL) << "range must be positive, actually " << range;
        return EINVAL;
    }
    if (range > ID_MAX_RANGE) {
        LOG(FATAL) << "max of range is "
                   << ID_MAX_RANGE << ", actually " << range;
        return EINVAL;
    }

    IdResourceId slot;
    Id* const meta = get_resource(&slot);
    if (meta == NULL) {
        return ENOMEM;
    }
    meta->data = data;
    meta->on_error = on_error;
    meta->on_error2 = on_error2;
    CHECK(meta->pending_q.empty());

    uint32_t* const version = meta->butex;
    if (0 == *version || *version + ID_MAX_RANGE + 2 < *version) {
        // Skip 0 so that bthread_id_t is never 0
        // avoid overflow to make comparisons simpler.
        *version = 1;
    }
    const uint32_t first = *version;
    *meta->join_butex = first;
    meta->first_ver = first;
    meta->locked_ver = first + range;
    *id = make_id(first, slot);
    return 0;
}
} // namespace bthread
extern "C" {
// Create an id whose errors are delivered to `on_error' (code only).
// A NULL `on_error' selects the default callback, which unlocks and destroys
// the id. Returns what id_create_impl() returns.
int bthread_id_create(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int)) {
    if (on_error == NULL) {
        on_error = bthread::default_bthread_id_on_error;
    }
    return bthread::id_create_impl(id, data, on_error, NULL);
}
// Same as bthread_id_create() but the id gets `range' valid versions.
// A NULL `on_error' selects the default callback, which unlocks and destroys
// the id. Returns what id_create_ranged_impl() returns.
int bthread_id_create_ranged(bthread_id_t* id, void* data,
                             int (*on_error)(bthread_id_t, void*, int),
                             int range) {
    if (on_error == NULL) {
        on_error = bthread::default_bthread_id_on_error;
    }
    return bthread::id_create_ranged_impl(id, data, on_error, NULL, range);
}
// Lock `id', waiting while it is held by someone else, and optionally extend
// the number of valid versions to `range'.
//   pdata:    if not NULL, receives the data stored in the id on success.
//   range:    0 keeps the current range. A negative value or one above
//             ID_MAX_RANGE is logged and ignored; a value that would not
//             extend the current range is silently ignored. In all these
//             cases the lock is still acquired.
//   location: recorded as the lock location (shown by id_status).
// Returns 0 on success, EINVAL if the id is invalid (or becomes invalid while
// waiting), EPERM if bthread_id_about_to_destroy() was called on the id, or
// the errno of a failed butex_wait other than EWOULDBLOCK/EINTR.
int bthread_id_lock_and_reset_range_verbose(
bthread_id_t id, void **pdata, int range, const char *location) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (!meta) {
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
uint32_t* butex = meta->butex;
// Set once this caller had to wait for the lock.
bool ever_contended = false;
meta->mutex.lock();
// Re-validate the version on every round: the id may be destroyed while
// this caller is waiting.
while (meta->has_version(id_ver)) {
if (*butex == meta->first_ver) {
// The id is unlocked: take it.
// contended locker always wakes up the butex at unlock.
meta->lock_location = location;
if (range == 0) {
// fast path
} else if (range < 0 ||
range > bthread::ID_MAX_RANGE ||
range + meta->first_ver <= meta->locked_ver) {
LOG_IF(FATAL, range < 0) << "range must be positive, actually "
<< range;
LOG_IF(FATAL, range > bthread::ID_MAX_RANGE)
<< "max range is " << bthread::ID_MAX_RANGE
<< ", actually " << range;
} else {
meta->locked_ver = meta->first_ver + range;
}
// A caller that waited keeps the butex at contended_ver so that its own
// unlock wakes up the next waiter.
*butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
meta->mutex.unlock();
if (pdata) {
*pdata = meta->data;
}
return 0;
} else if (*butex != meta->unlockable_ver()) {
// Locked by someone else: mark the lock as contended and wait until the
// butex value changes.
*butex = meta->contended_ver();
uint32_t expected_ver = *butex;
meta->mutex.unlock();
ever_contended = true;
if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return errno;
}
meta->mutex.lock();
} else { // bthread_id_about_to_destroy was called.
meta->mutex.unlock();
return EPERM;
}
}
meta->mutex.unlock();
return EINVAL;
}
// Report `error_code' on `id' without an error text; equivalent to
// bthread_id_error2_verbose() with an empty string.
int bthread_id_error_verbose(bthread_id_t id, int error_code,
                             const char *location) {
    const std::string no_text;
    return bthread_id_error2_verbose(id, error_code, no_text, location);
}
// Mark a locked id as about to be destroyed: its butex is set to
// unlockable_ver so that bthread_id_lock_and_reset_range_verbose() returns
// EPERM instead of waiting. The id stays locked.
// Returns 0 on success, EINVAL if the id is invalid, EPERM (after logging)
// if the id is not locked.
int bthread_id_about_to_destroy(bthread_id_t id) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (!meta) {
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
uint32_t* butex = meta->butex;
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
return EINVAL;
}
if (*butex == meta->first_ver) {
meta->mutex.unlock();
LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
return EPERM;
}
// Remember whether anyone is waiting before the butex value is overwritten.
const bool contended = (*butex == meta->contended_ver());
*butex = meta->unlockable_ver();
meta->mutex.unlock();
if (contended) {
// wake up all waiting lockers.
bthread::butex_wake_except(butex, 0);
}
return 0;
}
// Destroy an id that is currently unlocked, without running any callback.
// All versions of the id become invalid and the Id is returned to the
// resource pool. Threads blocked in bthread_id_join() on this id are woken
// up, like bthread_id_unlock_and_destroy() does; otherwise they would keep
// waiting on join_butex, which nothing else updates for this id.
// Returns 0 on success, EINVAL if the id is invalid, EPERM if the id is
// locked.
int bthread_id_cancel(bthread_id_t id) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (!meta) {
        return EINVAL;
    }
    uint32_t* butex = meta->butex;
    uint32_t* join_butex = meta->join_butex;
    const uint32_t id_ver = bthread::get_version(id);
    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
        meta->mutex.unlock();
        return EPERM;
    }
    // Move past every version of this id; with first_ver == locked_ver no
    // version is valid anymore.
    const uint32_t next_ver = meta->end_ver();
    *butex = next_ver;
    *join_butex = next_ver;
    meta->first_ver = next_ver;
    meta->locked_ver = next_ver;
    meta->mutex.unlock();
    // The id was unlocked, so nobody can be waiting on `butex'; only joiners
    // need to be notified.
    bthread::butex_wake_all(join_butex);
    return_resource(bthread::get_slot(id));
    return 0;
}
// Block until the version of `id' is no longer valid for its slot.
// Returns 0 once the id is invalid (including when it already was on entry),
// EINVAL if the slot cannot be addressed, or the errno of a failed
// butex_wait other than EWOULDBLOCK/EINTR.
int bthread_id_join(bthread_id_t id) {
const bthread::IdResourceId slot = bthread::get_slot(id);
bthread::Id* const meta = address_resource(slot);
if (!meta) {
// The id is not created yet, this join is definitely wrong.
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
uint32_t* join_butex = meta->join_butex;
while (1) {
// Sample validity and the butex value together under the mutex, so that the
// wait below returns immediately if join_butex changed in between.
meta->mutex.lock();
const bool has_ver = meta->has_version(id_ver);
const uint32_t expected_ver = *join_butex;
meta->mutex.unlock();
if (!has_ver) {
break;
}
if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return errno;
}
}
return 0;
}
// Lock `id' only if it is currently unlocked; never waits.
//   pdata: if not NULL, receives the data stored in the id on success.
// Returns 0 on success, EINVAL if the id is invalid, EBUSY if the id is
// already locked.
int bthread_id_trylock(bthread_id_t id, void** pdata) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (!meta) {
        return EINVAL;
    }
    uint32_t* butex = meta->butex;
    const uint32_t id_ver = bthread::get_version(id);
    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
        meta->mutex.unlock();
        return EBUSY;
    }
    *butex = meta->locked_ver;
    // This function receives no caller location. Record this spot so that
    // id_status() does not report the location of an earlier lock (or an
    // unset pointer when the Id was never locked before).
    meta->lock_location = __FILE__ ":" BAIDU_SYMBOLSTR(__LINE__);
    meta->mutex.unlock();
    if (pdata != NULL) {
        *pdata = meta->data;
    }
    return 0;
}
// Lock `id' without changing its version range; see
// bthread_id_lock_and_reset_range_verbose() for parameters and return values.
int bthread_id_lock_verbose(bthread_id_t id, void** pdata,
                            const char *location) {
    const int keep_current_range = 0;
    return bthread_id_lock_and_reset_range_verbose(
        id, pdata, keep_current_range, location);
}
// Unlock `id'. If an error was queued while the id was locked, the id stays
// locked and the error callback is run for the oldest queued error instead;
// the callback's return value is then returned.
// Otherwise the id becomes unlocked and one waiter is woken up if the lock
// was contended.
// Returns 0 (or the callback's result) on success, EINVAL (after logging) if
// the id is invalid, EPERM (after logging) if the id is not locked.
int bthread_id_unlock(bthread_id_t id) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (!meta) {
return EINVAL;
}
uint32_t* butex = meta->butex;
// Release fence makes sure all changes made before signal visible to
// woken-up waiters.
// NOTE(review): no explicit fence is issued in this function; the state is
// guarded by meta->mutex. This comment looks like a leftover - confirm.
const uint32_t id_ver = bthread::get_version(id);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
LOG(FATAL) << "Invalid bthread_id=" << id.value;
return EINVAL;
}
if (*butex == meta->first_ver) {
meta->mutex.unlock();
LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
return EPERM;
}
bthread::PendingError front;
if (meta->pending_q.pop(&front)) {
// Keep the lock and hand it over to the callback of the queued error; the
// callback runs outside the mutex.
meta->lock_location = front.location;
meta->mutex.unlock();
if (meta->on_error) {
return meta->on_error(front.id, meta->data, front.error_code);
} else {
return meta->on_error2(front.id, meta->data, front.error_code,
front.error_text);
}
} else {
// Remember whether anyone is waiting before the butex value is overwritten.
const bool contended = (*butex == meta->contended_ver());
*butex = meta->first_ver;
meta->mutex.unlock();
if (contended) {
// We may wake up already-reused id, but that's OK.
bthread::butex_wake(butex);
}
return 0;
}
}
// Destroy a locked id: all of its versions become invalid, queued errors are
// dropped, waiters on both butexes are woken up and the Id is returned to
// the resource pool.
// Returns 0 on success, EINVAL (after logging) if the id is invalid, EPERM
// (after logging) if the id is not locked.
int bthread_id_unlock_and_destroy(bthread_id_t id) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (!meta) {
return EINVAL;
}
uint32_t* butex = meta->butex;
uint32_t* join_butex = meta->join_butex;
const uint32_t id_ver = bthread::get_version(id);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
LOG(FATAL) << "Invalid bthread_id=" << id.value;
return EINVAL;
}
if (*butex == meta->first_ver) {
meta->mutex.unlock();
LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
return EPERM;
}
// Move past every version of this id; with first_ver == locked_ver,
// has_version() is false for any version.
const uint32_t next_ver = meta->end_ver();
*butex = next_ver;
*join_butex = next_ver;
meta->first_ver = next_ver;
meta->locked_ver = next_ver;
meta->pending_q.clear();
meta->mutex.unlock();
// Notice that butex_wake* returns # of woken-up, not successful or not.
bthread::butex_wake_except(butex, 0);
bthread::butex_wake_all(join_butex);
return_resource(bthread::get_slot(id));
return 0;
}
// Initialize `list' to the empty state. Both size arguments are ignored.
// Always returns 0.
int bthread_id_list_init(bthread_id_list_t* list,
                         unsigned /*size*/,
                         unsigned /*conflict_size*/) {
    // Zero the fields that this implementation does not use.
    list->head = 0;
    list->size = 0;
    list->conflict_head = 0;
    list->conflict_size = 0;
    // The actual list is created by the first bthread_id_list_add().
    list->impl = NULL;
    return 0;
}
// Free the list behind `list' (if any) and leave `list' empty.
void bthread_id_list_destroy(bthread_id_list_t* list) {
    bthread::IdList* const impl = static_cast<bthread::IdList*>(list->impl);
    delete impl;
    list->impl = NULL;
}
// Add `id' to `list', creating the underlying IdList on first use.
// Returns ENOMEM if the IdList cannot be allocated, otherwise the result of
// IdList::add().
int bthread_id_list_add(bthread_id_list_t* list, bthread_id_t id) {
    bthread::IdList* impl = static_cast<bthread::IdList*>(list->impl);
    if (impl == NULL) {
        impl = new (std::nothrow) bthread::IdList;
        if (impl == NULL) {
            return ENOMEM;
        }
        list->impl = impl;
    }
    return impl->add(id);
}
// Report `error_code' with an empty error text on every id in `list';
// see bthread_id_list_reset2().
int bthread_id_list_reset(bthread_id_list_t* list, int error_code) {
    const std::string no_text;
    return bthread_id_list_reset2(list, error_code, no_text);
}
// Exchange the contents of two lists by swapping their `impl' pointers.
void bthread_id_list_swap(bthread_id_list_t* list1,
                          bthread_id_list_t* list2) {
    using std::swap;
    swap(list1->impl, list2->impl);
}
// bthread_id_list_reset2_pthreadsafe() with an empty error text.
int bthread_id_list_reset_pthreadsafe(bthread_id_list_t* list, int error_code,
                                      pthread_mutex_t* mutex) {
    const std::string no_text;
    return bthread_id_list_reset2_pthreadsafe(list, error_code, no_text, mutex);
}
// bthread_id_list_reset2_bthreadsafe() with an empty error text.
int bthread_id_list_reset_bthreadsafe(bthread_id_list_t* list, int error_code,
                                      bthread_mutex_t* mutex) {
    const std::string no_text;
    return bthread_id_list_reset2_bthreadsafe(list, error_code, no_text, mutex);
}
} // extern "C"
// Create an id whose errors are delivered to `on_error' together with an
// error text. A NULL `on_error' selects the default callback, which unlocks
// and destroys the id. Returns what id_create_impl() returns.
int bthread_id_create2(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int, const std::string&)) {
    if (on_error == NULL) {
        on_error = bthread::default_bthread_id_on_error2;
    }
    return bthread::id_create_impl(id, data, NULL, on_error);
}
// Same as bthread_id_create2() but the id gets `range' valid versions.
// A NULL `on_error' selects the default callback, which unlocks and destroys
// the id. Returns what id_create_ranged_impl() returns.
int bthread_id_create2_ranged(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int, const std::string&),
    int range) {
    if (on_error == NULL) {
        on_error = bthread::default_bthread_id_on_error2;
    }
    return bthread::id_create_ranged_impl(id, data, NULL, on_error, range);
}
// Report an error on `id'.
// If the id is unlocked, it is locked on behalf of the caller and the error
// callback stored in the id is run right away (outside the mutex); its
// return value is returned. If the id is locked, the error is queued and
// delivered by a later bthread_id_unlock(); 0 is returned in this case.
//   location: recorded as the lock location, or stored with the queued error.
// Returns EINVAL if the id is invalid.
int bthread_id_error2_verbose(bthread_id_t id, int error_code,
const std::string& error_text,
const char *location) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (!meta) {
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
uint32_t* butex = meta->butex;
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
return EINVAL;
}
if (*butex == meta->first_ver) {
// Unlocked: take the lock; the callback is expected to release it.
*butex = meta->locked_ver;
meta->lock_location = location;
meta->mutex.unlock();
// Exactly one of on_error/on_error2 is set by the bthread_id_create*
// functions.
if (meta->on_error) {
return meta->on_error(id, meta->data, error_code);
} else {
return meta->on_error2(id, meta->data, error_code, error_text);
}
} else {
// Locked: queue a copy of the error (including its text).
bthread::PendingError e;
e.id = id;
e.error_code = error_code;
e.error_text = error_text;
e.location = location;
meta->pending_q.push(e);
meta->mutex.unlock();
return 0;
}
}
// Report `error_code'/`error_text' on every id in `list' through
// IdResetter, which also resets the list entries to INVALID_BTHREAD_ID.
// Does nothing for a list that was never added to. Always returns 0.
int bthread_id_list_reset2(bthread_id_list_t* list,
                           int error_code,
                           const std::string& error_text) {
    bthread::IdList* const impl = static_cast<bthread::IdList*>(list->impl);
    if (impl == NULL) {
        return 0;
    }
    const bthread::IdResetter resetter(error_code, error_text);
    impl->apply(resetter);
    return 0;
}
// Variant of bthread_id_list_reset2() for a list guarded by a pthread mutex.
// The content of `list' is detached while holding `mutex' and the errors are
// reported afterwards, outside the critical section.
// Returns EINVAL if `mutex' is NULL, 0 if the list was never added to,
// otherwise the result of bthread_id_list_reset2().
int bthread_id_list_reset2_pthreadsafe(bthread_id_list_t* list,
                                       int error_code,
                                       const std::string& error_text,
                                       pthread_mutex_t* mutex) {
    if (NULL == mutex) {
        return EINVAL;
    }
    if (NULL == list->impl) {
        return 0;
    }
    bthread_id_list_t detached;
    const int init_rc = bthread_id_list_init(&detached, 0, 0);
    if (init_rc != 0) {
        return init_rc;
    }
    // Swap out the list then reset. The critical section is very small.
    pthread_mutex_lock(mutex);
    std::swap(list->impl, detached.impl);
    pthread_mutex_unlock(mutex);

    const int reset_rc =
        bthread_id_list_reset2(&detached, error_code, error_text);
    bthread_id_list_destroy(&detached);
    return reset_rc;
}
// Variant of bthread_id_list_reset2() for a list guarded by a bthread mutex.
// The content of `list' is detached while holding `mutex' and the errors are
// reported afterwards, outside the critical section.
// Returns EINVAL if `mutex' is NULL, 0 if the list was never added to,
// otherwise the result of bthread_id_list_reset2().
int bthread_id_list_reset2_bthreadsafe(bthread_id_list_t* list,
                                       int error_code,
                                       const std::string& error_text,
                                       bthread_mutex_t* mutex) {
    if (NULL == mutex) {
        return EINVAL;
    }
    if (NULL == list->impl) {
        return 0;
    }
    bthread_id_list_t detached;
    const int init_rc = bthread_id_list_init(&detached, 0, 0);
    if (init_rc != 0) {
        return init_rc;
    }
    // Swap out the list then reset. The critical section is very small.
    bthread_mutex_lock(mutex);
    std::swap(list->impl, detached.impl);
    bthread_mutex_unlock(mutex);

    const int reset_rc =
        bthread_id_list_reset2(&detached, error_code, error_text);
    bthread_id_list_destroy(&detached);
    return reset_rc;
}