| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| // bthread - An M:N threading library to make applications more concurrent. |
| |
| // Date: Sun Aug 3 12:46:15 CST 2014 |
| |
| #include <deque> |
| #include "butil/logging.h" |
| #include "bthread/butex.h" // butex_* |
| #include "bthread/mutex.h" |
| #include "bthread/list_of_abafree_id.h" |
| #include "butil/resource_pool.h" |
| #include "bthread/bthread.h" |
| |
| namespace bthread { |
| |
| // This queue reduces the chance to allocate memory for deque |
| template <typename T, int N> |
| class SmallQueue { |
| public: |
| SmallQueue() : _begin(0), _size(0), _full(NULL) {} |
| |
| void push(const T& val) { |
| if (_full != NULL && !_full->empty()) { |
| _full->push_back(val); |
| } else if (_size < N) { |
| int tail = _begin + _size; |
| if (tail >= N) { |
| tail -= N; |
| } |
| _c[tail] = val; |
| ++_size; |
| } else { |
| if (_full == NULL) { |
| _full = new std::deque<T>; |
| } |
| _full->push_back(val); |
| } |
| } |
| bool pop(T* val) { |
| if (_size > 0) { |
| *val = _c[_begin]; |
| ++_begin; |
| if (_begin >= N) { |
| _begin -= N; |
| } |
| --_size; |
| return true; |
| } else if (_full && !_full->empty()) { |
| *val = _full->front(); |
| _full->pop_front(); |
| return true; |
| } |
| return false; |
| } |
| bool empty() const { |
| return _size == 0 && (_full == NULL || _full->empty()); |
| } |
| |
| size_t size() const { |
| return _size + (_full ? _full->size() : 0); |
| } |
| |
| void clear() { |
| _size = 0; |
| _begin = 0; |
| if (_full) { |
| _full->clear(); |
| } |
| } |
| |
| ~SmallQueue() { |
| delete _full; |
| _full = NULL; |
| } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(SmallQueue); |
| |
| int _begin; |
| int _size; |
| T _c[N]; |
| std::deque<T>* _full; |
| }; |
| |
| struct PendingError { |
| bthread_id_t id; |
| int error_code; |
| std::string error_text; |
| const char *location; |
| |
| PendingError() : id(INVALID_BTHREAD_ID), error_code(0), location(NULL) {} |
| }; |
| |
| struct BAIDU_CACHELINE_ALIGNMENT Id { |
| // first_ver ~ locked_ver - 1: unlocked versions |
| // locked_ver: locked |
| // unlockable_ver: locked and about to be destroyed |
| // contended_ver: locked and contended |
| uint32_t first_ver; |
| uint32_t locked_ver; |
| FastPthreadMutex mutex; |
| void* data; |
| int (*on_error)(bthread_id_t, void*, int); |
| int (*on_error2)(bthread_id_t, void*, int, const std::string&); |
| const char *lock_location; |
| uint32_t* butex; |
| uint32_t* join_butex; |
| SmallQueue<PendingError, 2> pending_q; |
| |
| Id() { |
| // Although value of the butex(as version part of bthread_id_t) |
| // does not matter, we set it to 0 to make program more deterministic. |
| butex = bthread::butex_create_checked<uint32_t>(); |
| join_butex = bthread::butex_create_checked<uint32_t>(); |
| *butex = 0; |
| *join_butex = 0; |
| } |
| |
| ~Id() { |
| bthread::butex_destroy(butex); |
| bthread::butex_destroy(join_butex); |
| } |
| |
| inline bool has_version(uint32_t id_ver) const { |
| return id_ver >= first_ver && id_ver < locked_ver; |
| } |
| inline uint32_t contended_ver() const { return locked_ver + 1; } |
| inline uint32_t unlockable_ver() const { return locked_ver + 2; } |
| inline uint32_t last_ver() const { return unlockable_ver(); } |
| |
| // also the next "first_ver" |
| inline uint32_t end_ver() const { return last_ver() + 1; } |
| }; |
| |
| BAIDU_CASSERT(sizeof(Id) % 64 == 0, sizeof_Id_must_align); |
| |
| typedef butil::ResourceId<Id> IdResourceId; |
| |
| inline bthread_id_t make_id(uint32_t version, IdResourceId slot) { |
| const bthread_id_t tmp = |
| { (((uint64_t)slot.value) << 32) | (uint64_t)version }; |
| return tmp; |
| } |
| |
| inline IdResourceId get_slot(bthread_id_t id) { |
| const IdResourceId tmp = { (id.value >> 32) }; |
| return tmp; |
| } |
| |
| inline uint32_t get_version(bthread_id_t id) { |
| return (uint32_t)(id.value & 0xFFFFFFFFul); |
| } |
| |
| inline bool id_exists_with_true_negatives(bthread_id_t id) { |
| Id* const meta = address_resource(get_slot(id)); |
| if (meta == NULL) { |
| return false; |
| } |
| const uint32_t id_ver = bthread::get_version(id); |
| return id_ver >= meta->first_ver && id_ver <= meta->last_ver(); |
| } |
| // required by unittest |
| uint32_t id_value(bthread_id_t id) { |
| Id* const meta = address_resource(get_slot(id)); |
| if (meta != NULL) { |
| return *meta->butex; |
| } |
| return 0; // valid version never be zero |
| } |
| |
| static int default_bthread_id_on_error(bthread_id_t id, void*, int) { |
| return bthread_id_unlock_and_destroy(id); |
| } |
| static int default_bthread_id_on_error2( |
| bthread_id_t id, void*, int, const std::string&) { |
| return bthread_id_unlock_and_destroy(id); |
| } |
| |
| void id_status(bthread_id_t id, std::ostream &os) { |
| bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
| if (!meta) { |
| os << "Invalid id=" << id.value << '\n'; |
| return; |
| } |
| const uint32_t id_ver = bthread::get_version(id); |
| uint32_t* butex = meta->butex; |
| bool valid = true; |
| void* data = NULL; |
| int (*on_error)(bthread_id_t, void*, int) = NULL; |
| int (*on_error2)(bthread_id_t, void*, int, const std::string&) = NULL; |
| uint32_t first_ver = 0; |
| uint32_t locked_ver = 0; |
| uint32_t unlockable_ver = 0; |
| uint32_t contended_ver = 0; |
| const char *lock_location = NULL; |
| SmallQueue<PendingError, 2> pending_q; |
| uint32_t butex_value = 0; |
| |
| meta->mutex.lock(); |
| if (meta->has_version(id_ver)) { |
| data = meta->data; |
| on_error = meta->on_error; |
| on_error2 = meta->on_error2; |
| first_ver = meta->first_ver; |
| locked_ver = meta->locked_ver; |
| unlockable_ver = meta->unlockable_ver(); |
| contended_ver = meta->contended_ver(); |
| lock_location = meta->lock_location; |
| const size_t size = meta->pending_q.size(); |
| for (size_t i = 0; i < size; ++i) { |
| PendingError front; |
| meta->pending_q.pop(&front); |
| meta->pending_q.push(front); |
| pending_q.push(front); |
| } |
| butex_value = *butex; |
| } else { |
| valid = false; |
| } |
| meta->mutex.unlock(); |
| |
| if (valid) { |
| os << "First id: " |
| << bthread::make_id(first_ver, bthread::get_slot(id)).value << '\n' |
| << "Range: " << locked_ver - first_ver << '\n' |
| << "Status: "; |
| if (butex_value != first_ver) { |
| os << "LOCKED at " << lock_location; |
| if (butex_value == contended_ver) { |
| os << " (CONTENDED)"; |
| } else if (butex_value == unlockable_ver) { |
| os << " (ABOUT TO DESTROY)"; |
| } else { |
| os << " (UNCONTENDED)"; |
| } |
| } else { |
| os << "UNLOCKED"; |
| } |
| os << "\nPendingQ:"; |
| if (pending_q.empty()) { |
| os << " EMPTY"; |
| } else { |
| const size_t size = pending_q.size(); |
| for (size_t i = 0; i < size; ++i) { |
| PendingError front; |
| pending_q.pop(&front); |
| os << " (" << front.location << "/E" << front.error_code |
| << '/' << front.error_text << ')'; |
| } |
| } |
| if (on_error) { |
| if (on_error == default_bthread_id_on_error) { |
| os << "\nOnError: unlock_and_destroy"; |
| } else { |
| os << "\nOnError: " << (void*)on_error; |
| } |
| } else { |
| if (on_error2 == default_bthread_id_on_error2) { |
| os << "\nOnError2: unlock_and_destroy"; |
| } else { |
| os << "\nOnError2: " << (void*)on_error2; |
| } |
| } |
| os << "\nData: " << data; |
| } else { |
| os << "Invalid id=" << id.value; |
| } |
| os << '\n'; |
| } |
| |
| void id_pool_status(std::ostream &os) { |
| os << butil::describe_resources<Id>() << '\n'; |
| } |
| |
| struct IdTraits { |
| static const size_t BLOCK_SIZE = 63; |
| static const size_t MAX_ENTRIES = 100000; |
| static const size_t INIT_GC_SIZE = 4096; |
| static const bthread_id_t ID_INIT; |
| static bool exists(bthread_id_t id) |
| { return bthread::id_exists_with_true_negatives(id); } |
| }; |
| const bthread_id_t IdTraits::ID_INIT = INVALID_BTHREAD_ID; |
| |
| typedef ListOfABAFreeId<bthread_id_t, IdTraits> IdList; |
| |
| struct IdResetter { |
| explicit IdResetter(int ec, const std::string& et) |
| : _error_code(ec), _error_text(et) {} |
| void operator()(bthread_id_t & id) const { |
| bthread_id_error2_verbose( |
| id, _error_code, _error_text, __FILE__ ":" BAIDU_SYMBOLSTR(__LINE__)); |
| id = INVALID_BTHREAD_ID; |
| } |
| private: |
| int _error_code; |
| const std::string& _error_text; |
| }; |
| |
| size_t get_sizes(const bthread_id_list_t* list, size_t* cnt, size_t n) { |
| if (list->impl == NULL) { |
| return 0; |
| } |
| return static_cast<bthread::IdList*>(list->impl)->get_sizes(cnt, n); |
| } |
| |
| const int ID_MAX_RANGE = 1024; |
| |
| static int id_create_impl( |
| bthread_id_t* id, void* data, |
| int (*on_error)(bthread_id_t, void*, int), |
| int (*on_error2)(bthread_id_t, void*, int, const std::string&)) { |
| IdResourceId slot; |
| Id* const meta = get_resource(&slot); |
| if (meta) { |
| meta->data = data; |
| meta->on_error = on_error; |
| meta->on_error2 = on_error2; |
| CHECK(meta->pending_q.empty()); |
| uint32_t* butex = meta->butex; |
| if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) { |
| // Skip 0 so that bthread_id_t is never 0 |
| // avoid overflow to make comparisons simpler. |
| *butex = 1; |
| } |
| *meta->join_butex = *butex; |
| meta->first_ver = *butex; |
| meta->locked_ver = *butex + 1; |
| *id = make_id(*butex, slot); |
| return 0; |
| } |
| return ENOMEM; |
| } |
| |
| static int id_create_ranged_impl( |
| bthread_id_t* id, void* data, |
| int (*on_error)(bthread_id_t, void*, int), |
| int (*on_error2)(bthread_id_t, void*, int, const std::string&), |
| int range) { |
| if (range < 1 || range > ID_MAX_RANGE) { |
| LOG_IF(FATAL, range < 1) << "range must be positive, actually " << range; |
| LOG_IF(FATAL, range > ID_MAX_RANGE ) << "max of range is " |
| << ID_MAX_RANGE << ", actually " << range; |
| return EINVAL; |
| } |
| IdResourceId slot; |
| Id* const meta = get_resource(&slot); |
| if (meta) { |
| meta->data = data; |
| meta->on_error = on_error; |
| meta->on_error2 = on_error2; |
| CHECK(meta->pending_q.empty()); |
| uint32_t* butex = meta->butex; |
| if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) { |
| // Skip 0 so that bthread_id_t is never 0 |
| // avoid overflow to make comparisons simpler. |
| *butex = 1; |
| } |
| *meta->join_butex = *butex; |
| meta->first_ver = *butex; |
| meta->locked_ver = *butex + range; |
| *id = make_id(*butex, slot); |
| return 0; |
| } |
| return ENOMEM; |
| } |
| |
| } // namespace bthread |
| |
| extern "C" { |
| |
| int bthread_id_create( |
| bthread_id_t* id, void* data, |
| int (*on_error)(bthread_id_t, void*, int)) { |
| return bthread::id_create_impl( |
| id, data, |
| (on_error ? on_error : bthread::default_bthread_id_on_error), NULL); |
| } |
| |
| int bthread_id_create_ranged(bthread_id_t* id, void* data, |
| int (*on_error)(bthread_id_t, void*, int), |
| int range) { |
| return bthread::id_create_ranged_impl( |
| id, data, |
| (on_error ? on_error : bthread::default_bthread_id_on_error), |
| NULL, range); |
| } |
| |
| int bthread_id_lock_and_reset_range_verbose( |
| bthread_id_t id, void **pdata, int range, const char *location) { |
| bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
| if (!meta) { |
| return EINVAL; |
| } |
| const uint32_t id_ver = bthread::get_version(id); |
| uint32_t* butex = meta->butex; |
| bool ever_contended = false; |
| meta->mutex.lock(); |
| while (meta->has_version(id_ver)) { |
| if (*butex == meta->first_ver) { |
| // contended locker always wakes up the butex at unlock. |
| meta->lock_location = location; |
| if (range == 0) { |
| // fast path |
| } else if (range < 0 || |
| range > bthread::ID_MAX_RANGE || |
| range + meta->first_ver <= meta->locked_ver) { |
| LOG_IF(FATAL, range < 0) << "range must be positive, actually " |
| << range; |
| LOG_IF(FATAL, range > bthread::ID_MAX_RANGE) |
| << "max range is " << bthread::ID_MAX_RANGE |
| << ", actually " << range; |
| } else { |
| meta->locked_ver = meta->first_ver + range; |
| } |
| *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver); |
| meta->mutex.unlock(); |
| if (pdata) { |
| *pdata = meta->data; |
| } |
| return 0; |
| } else if (*butex != meta->unlockable_ver()) { |
| *butex = meta->contended_ver(); |
| uint32_t expected_ver = *butex; |
| meta->mutex.unlock(); |
| ever_contended = true; |
| if (bthread::butex_wait(butex, expected_ver, NULL) < 0 && |
| errno != EWOULDBLOCK && errno != EINTR) { |
| return errno; |
| } |
| meta->mutex.lock(); |
| } else { // bthread_id_about_to_destroy was called. |
| meta->mutex.unlock(); |
| return EPERM; |
| } |
| } |
| meta->mutex.unlock(); |
| return EINVAL; |
| } |
| |
| int bthread_id_error_verbose(bthread_id_t id, int error_code, |
| const char *location) { |
| return bthread_id_error2_verbose(id, error_code, std::string(), location); |
| } |
| |
| int bthread_id_about_to_destroy(bthread_id_t id) { |
| bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
| if (!meta) { |
| return EINVAL; |
| } |
| const uint32_t id_ver = bthread::get_version(id); |
| uint32_t* butex = meta->butex; |
| meta->mutex.lock(); |
| if (!meta->has_version(id_ver)) { |
| meta->mutex.unlock(); |
| return EINVAL; |
| } |
| if (*butex == meta->first_ver) { |
| meta->mutex.unlock(); |
| LOG(FATAL) << "bthread_id=" << id.value << " is not locked!"; |
| return EPERM; |
| } |
| const bool contended = (*butex == meta->contended_ver()); |
| *butex = meta->unlockable_ver(); |
| meta->mutex.unlock(); |
| if (contended) { |
| // wake up all waiting lockers. |
| bthread::butex_wake_except(butex, 0); |
| } |
| return 0; |
| } |
| |
| int bthread_id_cancel(bthread_id_t id) { |
| bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
| if (!meta) { |
| return EINVAL; |
| } |
| uint32_t* butex = meta->butex; |
| const uint32_t id_ver = bthread::get_version(id); |
| meta->mutex.lock(); |
| if (!meta->has_version(id_ver)) { |
| meta->mutex.unlock(); |
| return EINVAL; |
| } |
| if (*butex != meta->first_ver) { |
| meta->mutex.unlock(); |
| return EPERM; |
| } |
| *butex = meta->end_ver(); |
| meta->first_ver = *butex; |
| meta->locked_ver = *butex; |
| meta->mutex.unlock(); |
| return_resource(bthread::get_slot(id)); |
| return 0; |
| } |
| |
| int bthread_id_join(bthread_id_t id) { |
| const bthread::IdResourceId slot = bthread::get_slot(id); |
| bthread::Id* const meta = address_resource(slot); |
| if (!meta) { |
| // The id is not created yet, this join is definitely wrong. |
| return EINVAL; |
| } |
| const uint32_t id_ver = bthread::get_version(id); |
| uint32_t* join_butex = meta->join_butex; |
| while (1) { |
| meta->mutex.lock(); |
| const bool has_ver = meta->has_version(id_ver); |
| const uint32_t expected_ver = *join_butex; |
| meta->mutex.unlock(); |
| if (!has_ver) { |
| break; |
| } |
| if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 && |
| errno != EWOULDBLOCK && errno != EINTR) { |
| return errno; |
| } |
| } |
| return 0; |
| } |
| |
| int bthread_id_trylock(bthread_id_t id, void** pdata) { |
| bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
| if (!meta) { |
| return EINVAL; |
| } |
| uint32_t* butex = meta->butex; |
| const uint32_t id_ver = bthread::get_version(id); |
| meta->mutex.lock(); |
| if (!meta->has_version(id_ver)) { |
| meta->mutex.unlock(); |
| return EINVAL; |
| } |
| if (*butex != meta->first_ver) { |
| meta->mutex.unlock(); |
| return EBUSY; |
| } |
| *butex = meta->locked_ver; |
| meta->mutex.unlock(); |
| if (pdata != NULL) { |
| *pdata = meta->data; |
| } |
| return 0; |
| } |
| |
| int bthread_id_lock_verbose(bthread_id_t id, void** pdata, |
| const char *location) { |
| return bthread_id_lock_and_reset_range_verbose(id, pdata, 0, location); |
| } |
| |
| int bthread_id_unlock(bthread_id_t id) { |
| bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
| if (!meta) { |
| return EINVAL; |
| } |
| uint32_t* butex = meta->butex; |
| // Release fence makes sure all changes made before signal visible to |
| // woken-up waiters. |
| const uint32_t id_ver = bthread::get_version(id); |
| meta->mutex.lock(); |
| if (!meta->has_version(id_ver)) { |
| meta->mutex.unlock(); |
| LOG(FATAL) << "Invalid bthread_id=" << id.value; |
| return EINVAL; |
| } |
| if (*butex == meta->first_ver) { |
| meta->mutex.unlock(); |
| LOG(FATAL) << "bthread_id=" << id.value << " is not locked!"; |
| return EPERM; |
| } |
| bthread::PendingError front; |
| if (meta->pending_q.pop(&front)) { |
| meta->lock_location = front.location; |
| meta->mutex.unlock(); |
| if (meta->on_error) { |
| return meta->on_error(front.id, meta->data, front.error_code); |
| } else { |
| return meta->on_error2(front.id, meta->data, front.error_code, |
| front.error_text); |
| } |
| } else { |
| const bool contended = (*butex == meta->contended_ver()); |
| *butex = meta->first_ver; |
| meta->mutex.unlock(); |
| if (contended) { |
| // We may wake up already-reused id, but that's OK. |
| bthread::butex_wake(butex); |
| } |
| return 0; |
| } |
| } |
| |
| int bthread_id_unlock_and_destroy(bthread_id_t id) { |
| bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
| if (!meta) { |
| return EINVAL; |
| } |
| uint32_t* butex = meta->butex; |
| uint32_t* join_butex = meta->join_butex; |
| const uint32_t id_ver = bthread::get_version(id); |
| meta->mutex.lock(); |
| if (!meta->has_version(id_ver)) { |
| meta->mutex.unlock(); |
| LOG(FATAL) << "Invalid bthread_id=" << id.value; |
| return EINVAL; |
| } |
| if (*butex == meta->first_ver) { |
| meta->mutex.unlock(); |
| LOG(FATAL) << "bthread_id=" << id.value << " is not locked!"; |
| return EPERM; |
| } |
| const uint32_t next_ver = meta->end_ver(); |
| *butex = next_ver; |
| *join_butex = next_ver; |
| meta->first_ver = next_ver; |
| meta->locked_ver = next_ver; |
| meta->pending_q.clear(); |
| meta->mutex.unlock(); |
| // Notice that butex_wake* returns # of woken-up, not successful or not. |
| bthread::butex_wake_except(butex, 0); |
| bthread::butex_wake_all(join_butex); |
| return_resource(bthread::get_slot(id)); |
| return 0; |
| } |
| |
| int bthread_id_list_init(bthread_id_list_t* list, |
| unsigned /*size*/, |
| unsigned /*conflict_size*/) { |
| list->impl = NULL; // create on demand. |
| // Set unused fields to zero as well. |
| list->head = 0; |
| list->size = 0; |
| list->conflict_head = 0; |
| list->conflict_size = 0; |
| return 0; |
| } |
| |
| void bthread_id_list_destroy(bthread_id_list_t* list) { |
| delete static_cast<bthread::IdList*>(list->impl); |
| list->impl = NULL; |
| } |
| |
| int bthread_id_list_add(bthread_id_list_t* list, bthread_id_t id) { |
| if (list->impl == NULL) { |
| list->impl = new (std::nothrow) bthread::IdList; |
| if (NULL == list->impl) { |
| return ENOMEM; |
| } |
| } |
| return static_cast<bthread::IdList*>(list->impl)->add(id); |
| } |
| |
| int bthread_id_list_reset(bthread_id_list_t* list, int error_code) { |
| return bthread_id_list_reset2(list, error_code, std::string()); |
| } |
| |
| void bthread_id_list_swap(bthread_id_list_t* list1, |
| bthread_id_list_t* list2) { |
| std::swap(list1->impl, list2->impl); |
| } |
| |
| int bthread_id_list_reset_pthreadsafe(bthread_id_list_t* list, int error_code, |
| pthread_mutex_t* mutex) { |
| return bthread_id_list_reset2_pthreadsafe( |
| list, error_code, std::string(), mutex); |
| } |
| |
| int bthread_id_list_reset_bthreadsafe(bthread_id_list_t* list, int error_code, |
| bthread_mutex_t* mutex) { |
| return bthread_id_list_reset2_bthreadsafe( |
| list, error_code, std::string(), mutex); |
| } |
| |
| } // extern "C" |
| |
| int bthread_id_create2( |
| bthread_id_t* id, void* data, |
| int (*on_error)(bthread_id_t, void*, int, const std::string&)) { |
| return bthread::id_create_impl( |
| id, data, NULL, |
| (on_error ? on_error : bthread::default_bthread_id_on_error2)); |
| } |
| |
| int bthread_id_create2_ranged( |
| bthread_id_t* id, void* data, |
| int (*on_error)(bthread_id_t, void*, int, const std::string&), |
| int range) { |
| return bthread::id_create_ranged_impl( |
| id, data, NULL, |
| (on_error ? on_error : bthread::default_bthread_id_on_error2), range); |
| } |
| |
| int bthread_id_error2_verbose(bthread_id_t id, int error_code, |
| const std::string& error_text, |
| const char *location) { |
| bthread::Id* const meta = address_resource(bthread::get_slot(id)); |
| if (!meta) { |
| return EINVAL; |
| } |
| const uint32_t id_ver = bthread::get_version(id); |
| uint32_t* butex = meta->butex; |
| meta->mutex.lock(); |
| if (!meta->has_version(id_ver)) { |
| meta->mutex.unlock(); |
| return EINVAL; |
| } |
| if (*butex == meta->first_ver) { |
| *butex = meta->locked_ver; |
| meta->lock_location = location; |
| meta->mutex.unlock(); |
| if (meta->on_error) { |
| return meta->on_error(id, meta->data, error_code); |
| } else { |
| return meta->on_error2(id, meta->data, error_code, error_text); |
| } |
| } else { |
| bthread::PendingError e; |
| e.id = id; |
| e.error_code = error_code; |
| e.error_text = error_text; |
| e.location = location; |
| meta->pending_q.push(e); |
| meta->mutex.unlock(); |
| return 0; |
| } |
| } |
| |
| int bthread_id_list_reset2(bthread_id_list_t* list, |
| int error_code, |
| const std::string& error_text) { |
| if (list->impl != NULL) { |
| static_cast<bthread::IdList*>(list->impl)->apply( |
| bthread::IdResetter(error_code, error_text)); |
| } |
| return 0; |
| } |
| |
| int bthread_id_list_reset2_pthreadsafe(bthread_id_list_t* list, |
| int error_code, |
| const std::string& error_text, |
| pthread_mutex_t* mutex) { |
| if (mutex == NULL) { |
| return EINVAL; |
| } |
| if (list->impl == NULL) { |
| return 0; |
| } |
| bthread_id_list_t tmplist; |
| const int rc = bthread_id_list_init(&tmplist, 0, 0); |
| if (rc != 0) { |
| return rc; |
| } |
| // Swap out the list then reset. The critical section is very small. |
| pthread_mutex_lock(mutex); |
| std::swap(list->impl, tmplist.impl); |
| pthread_mutex_unlock(mutex); |
| const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text); |
| bthread_id_list_destroy(&tmplist); |
| return rc2; |
| } |
| |
| int bthread_id_list_reset2_bthreadsafe(bthread_id_list_t* list, |
| int error_code, |
| const std::string& error_text, |
| bthread_mutex_t* mutex) { |
| if (mutex == NULL) { |
| return EINVAL; |
| } |
| if (list->impl == NULL) { |
| return 0; |
| } |
| bthread_id_list_t tmplist; |
| const int rc = bthread_id_list_init(&tmplist, 0, 0); |
| if (rc != 0) { |
| return rc; |
| } |
| // Swap out the list then reset. The critical section is very small. |
| bthread_mutex_lock(mutex); |
| std::swap(list->impl, tmplist.impl); |
| bthread_mutex_unlock(mutex); |
| const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text); |
| bthread_id_list_destroy(&tmplist); |
| return rc2; |
| } |