| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| // bthread - An M:N threading library to make applications more concurrent. |
| |
| // Date: Tue Jul 22 17:30:12 CST 2014 |
| |
| #include "butil/atomicops.h" // butil::atomic |
| #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK |
| #include "butil/macros.h" |
| #include "butil/containers/flat_map.h" |
| #include "butil/containers/linked_list.h" // LinkNode |
| #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS |
| #include "butil/memory/singleton_on_pthread_once.h" |
| #endif |
| #include "butil/logging.h" |
| #include "butil/object_pool.h" |
| #include "bthread/errno.h" // EWOULDBLOCK |
| #include "bthread/sys_futex.h" // futex_* |
| #include "bthread/processor.h" // cpu_relax |
| #include "bthread/task_control.h" // TaskControl |
| #include "bthread/task_group.h" // TaskGroup |
| #include "bthread/timer_thread.h" |
| #include "bthread/butex.h" |
| #include "bthread/mutex.h" |
| |
// This file implements butex.h.
// Provides futex-like semantics, i.e. sequenced wait and wake operations
// with guaranteed visibility.
| // |
| // If wait is sequenced before wake: |
| // [thread1] [thread2] |
| // wait() value = new_value |
| // wake() |
// wait() sees the unmatched value (fails to wait), or wake() sees the waiter.
| // |
| // If wait is sequenced after wake: |
| // [thread1] [thread2] |
| // value = new_value |
| // wake() |
| // wait() |
// wake() must provide some sort of memory fence to prevent the assignment
// of value from being reordered after it. Thus the value is visible to
// wait() as well.
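//
// An illustrative round-trip through the API implemented below (a minimal
// sketch; the `stop' flag and the loop around it are hypothetical, not code
// from this file):
//
//   void* btx = butex_create();  // returns &Butex::value
//   butil::atomic<int>* v = static_cast<butil::atomic<int>*>(btx);
//   // waiter:
//   int expected = v->load(butil::memory_order_relaxed);
//   while (!stop) {
//       // Fails with EWOULDBLOCK when *v != expected, i.e. a wake happened
//       // in between; otherwise blocks until woken, timed out or interrupted.
//       butex_wait(btx, expected, NULL/*no timeout*/, false/*append*/);
//       expected = v->load(butil::memory_order_relaxed);
//   }
//   // waker:
//   v->fetch_add(1, butil::memory_order_release);
//   butex_wake(btx, false/*signal*/);
//   ...
//   butex_destroy(btx);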
| |
| namespace bthread { |
| |
| #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS |
| struct ButexWaiterCount : public bvar::Adder<int64_t> { |
| ButexWaiterCount() : bvar::Adder<int64_t>("bthread_butex_waiter_count") {} |
| }; |
| inline bvar::Adder<int64_t>& butex_waiter_count() { |
| return *butil::get_leaky_singleton<ButexWaiterCount>(); |
| } |
| #endif |
| |
| enum WaiterState { |
| WAITER_STATE_NONE, |
| WAITER_STATE_READY, |
| WAITER_STATE_TIMEDOUT, |
| WAITER_STATE_UNMATCHEDVALUE, |
| WAITER_STATE_INTERRUPTED, |
| }; |
| |
| struct Butex; |
| |
| struct ButexWaiter : public butil::LinkNode<ButexWaiter> { |
| // tids of pthreads are 0 |
| bthread_t tid; |
| |
    // Erasing a node from the middle of a LinkedList is thread-unsafe; we
    // need to hold its container's lock.
| butil::atomic<Butex*> container; |
| }; |
| |
// non_pthread_task allocates this structure on the stack and queues it in
// Butex::waiters.
| struct ButexBthreadWaiter : public ButexWaiter { |
| TaskMeta* task_meta; |
| TimerThread::TaskId sleep_id; |
| WaiterState waiter_state; |
| int expected_value; |
| Butex* initial_butex; |
| TaskControl* control; |
| const timespec* abstime; |
| bthread_tag_t tag; |
| }; |
| |
// pthread_task or main_task allocates this structure on the stack and queues
// it in Butex::waiters.
| struct ButexPthreadWaiter : public ButexWaiter { |
| butil::atomic<int> sig; |
| }; |
| |
| typedef butil::LinkedList<ButexWaiter> ButexWaiterList; |
| |
| enum ButexPthreadSignal { PTHREAD_NOT_SIGNALLED, PTHREAD_SIGNALLED }; |
| |
| struct BAIDU_CACHELINE_ALIGNMENT Butex { |
| Butex() {} |
| ~Butex() {} |
| |
| butil::atomic<int> value; |
| ButexWaiterList waiters; |
| FastPthreadMutex waiter_lock; |
| }; |
| |
| BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0); |
| BAIDU_CASSERT(sizeof(Butex) == BAIDU_CACHELINE_SIZE, butex_fits_in_one_cacheline); |
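
// The offsetof assertion is what makes the round-trip used throughout this
// file valid: butex_create() hands out &b->value and every wait/wake entry
// recovers the owning Butex via
//   container_of(static_cast<butil::atomic<int>*>(arg), Butex, value)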
| |
| } // namespace bthread |
| |
| namespace butil { |
// A Butex object returned to ObjectPool<Butex> may still be accessed,
// so ObjectPool<Butex> cannot poison the memory region of Butex.
| template <> |
| struct ObjectPoolWithASanPoison<bthread::Butex> : false_type {}; |
| } // namespace butil |
| |
| namespace bthread { |
| |
| static void wakeup_pthread(ButexPthreadWaiter* pw) { |
    // The release fence makes wait_pthread() see changes made before wakeup.
| pw->sig.store(PTHREAD_SIGNALLED, butil::memory_order_release); |
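    // (The store above synchronizes with the acquire loads of `sig' in
    // wait_pthread(), which is how that visibility is established.)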
    // At this point, wait_pthread() may have woken up and destroyed `pw',
    // in which case futex_wake_private() should return EFAULT.
    // If crashes happen here in the future, `pw' can be made TLS and never
    // destroyed to solve the issue.
| futex_wake_private(&pw->sig, 1); |
| } |
| |
| bool erase_from_butex(ButexWaiter*, bool, WaiterState); |
| |
| int wait_pthread(ButexPthreadWaiter& pw, const timespec* abstime) { |
| timespec* ptimeout = NULL; |
| timespec timeout; |
| int64_t timeout_us = 0; |
| int rc; |
| |
| while (true) { |
| if (abstime != NULL) { |
| timeout_us = butil::timespec_to_microseconds(*abstime) - butil::gettimeofday_us(); |
| timeout = butil::microseconds_to_timespec(timeout_us); |
| ptimeout = &timeout; |
| } |
| if (timeout_us > MIN_SLEEP_US || abstime == NULL) { |
| rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout); |
| if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) { |
                // If `sig' is changed, wakeup_pthread() must have been called
                // and `pw' is already removed from the butex.
                // The acquire fence makes this thread see changes made before
                // the wakeup.
| return rc; |
| } |
| } else { |
| errno = ETIMEDOUT; |
| rc = -1; |
| } |
        // Handle ETIMEDOUT when abstime is valid.
        // If futex_wait_private() returns EINTR, just continue the loop.
| if (rc != 0 && errno == ETIMEDOUT) { |
            // The futex wait timed out and `pw' is still in the queue; remove it.
| if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) { |
                // Another thread is erasing `pw' as well; wait for its signal.
                // The acquire fence makes this thread see changes made before
                // the wakeup.
| if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) { |
                    // Already timed out; abstime and ptimeout have expired.
| abstime = NULL; |
| ptimeout = NULL; |
| continue; |
| } |
| } |
| return rc; |
| } |
| } |
| } |
| |
| extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; |
| |
// Returns 0 when there is no need to unschedule or the unschedule succeeded,
// -1 otherwise.
| inline int unsleep_if_necessary(ButexBthreadWaiter* w, |
| TimerThread* timer_thread) { |
| if (!w->sleep_id) { |
| return 0; |
| } |
| if (timer_thread->unschedule(w->sleep_id) > 0) { |
| // the callback is running. |
| return -1; |
| } |
| w->sleep_id = 0; |
| return 0; |
| } |
| |
// Use ObjectPool (which never frees memory) to solve the race between
// butex_wake() and butex_destroy(). The race is as follows:
| // |
| // class Event { |
| // public: |
| // void wait() { |
| // _mutex.lock(); |
| // if (!_done) { |
| // _cond.wait(&_mutex); |
| // } |
| // _mutex.unlock(); |
| // } |
| // void signal() { |
| // _mutex.lock(); |
| // if (!_done) { |
| // _done = true; |
| // _cond.signal(); |
| // } |
| // _mutex.unlock(); /*1*/ |
| // } |
| // private: |
| // bool _done = false; |
| // Mutex _mutex; |
| // Condition _cond; |
| // }; |
| // |
| // [Thread1] [Thread2] |
| // foo() { |
| // Event event; |
| // pass_to_thread2(&event); ---> event.signal(); |
| // event.wait(); |
| // } <-- event destroyed |
| // |
// Summary: Thread1 passes a stateful condition to Thread2 and waits until
// the condition is signalled, which basically means the associated job is
// done and Thread1 can release related resources including the mutex and
// condition. The scenario is fine and the code is correct.
// The race needs a closer look. The unlock at /*1*/ may have different
// implementations, but the last step is probably an atomic store plus
// butex_wake(), like this:
| // |
| // locked->store(0); |
| // butex_wake(locked); |
| // |
// `locked' represents the locking status of the mutex. The issue is that
// just after the store(), the mutex is already unlocked and the code in
// Event.wait() may successfully grab the lock, go through everything left,
// and leave foo(), destroying the mutex and butex and making the
// butex_wake(locked) crash.
// One way to solve this is to add a reference before the store and release
// it after butex_wake(). However, reference counting would have to be added
// to nearly every usage scenario of butex_wake(), which is very error-prone.
// Another way is to never free butexes, with the side effect that
// butex_wake() may wake up an unrelated butex (the one reusing the memory)
// and cause spurious wakeups. According to our observations the race is
// infrequent, even rare, so the extra spurious wakeups are acceptable.
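//
// Consequence for callers (an illustrative sketch; `btx' is a butex from
// butex_create() and `condition_met' a hypothetical predicate): since a
// wakeup may be spurious, waiters must re-check their condition in a loop
// rather than wait exactly once:
//
//   butil::atomic<int>* v = static_cast<butil::atomic<int>*>(btx);
//   int expected = v->load(butil::memory_order_relaxed);
//   while (!condition_met()) {
//       butex_wait(btx, expected, NULL, false);
//       expected = v->load(butil::memory_order_relaxed);
//   }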
| |
| void* butex_create() { |
| Butex* b = butil::get_object<Butex>(); |
| if (b) { |
| return &b->value; |
| } |
| return NULL; |
| } |
| |
| void butex_destroy(void* butex) { |
| if (!butex) { |
| return; |
| } |
| Butex* b = static_cast<Butex*>( |
| container_of(static_cast<butil::atomic<int>*>(butex), Butex, value)); |
| butil::return_object(b); |
| } |
| |
// Returns true if the TaskGroup tls_task_group belongs to `tag'.
| inline bool is_same_tag(bthread_tag_t tag) { |
| return tls_task_group && tls_task_group->tag() == tag; |
| } |
| |
// Returns true only when nosignal is true and `tag' matches the local group's.
| inline bool check_nosignal(bool nosignal, bthread_tag_t tag) { |
| return nosignal && is_same_tag(tag); |
| } |
| |
// Returns tls_task_group if `tag' matches it, otherwise chooses one group
// with `tag'.
| inline TaskGroup* get_task_group(TaskControl* c, bthread_tag_t tag) { |
| return is_same_tag(tag) ? tls_task_group : c->choose_one_group(tag); |
| } |
| |
| inline void run_in_local_task_group(TaskGroup* g, TaskMeta* next_meta, bool nosignal) { |
| if (!nosignal) { |
| TaskGroup::exchange(&g, next_meta); |
| } else { |
| g->ready_to_run(next_meta, nosignal); |
| } |
| } |
| |
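// Dispatch rule shared by the wake paths below: a woken bthread is pushed
// into the local group when the waker runs in a worker of the same tag, and
// into a (remote) group of the waiter's tag otherwise:
//
//   TaskGroup* g = get_task_group(bbw->control, bbw->tag);
//   if (g == tls_task_group) {
//       run_in_local_task_group(g, bbw->task_meta, nosignal);
//   } else {
//       g->ready_to_run_remote(bbw->task_meta, check_nosignal(nosignal, g->tag()));
//   }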
| int butex_wake(void* arg, bool nosignal) { |
| Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); |
| ButexWaiter* front = NULL; |
| { |
| BAIDU_SCOPED_LOCK(b->waiter_lock); |
| if (b->waiters.empty()) { |
| return 0; |
| } |
| front = b->waiters.head()->value(); |
| front->RemoveFromList(); |
| front->container.store(NULL, butil::memory_order_relaxed); |
| } |
| if (front->tid == 0) { |
| wakeup_pthread(static_cast<ButexPthreadWaiter*>(front)); |
| return 1; |
| } |
| ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front); |
| unsleep_if_necessary(bbw, get_global_timer_thread()); |
| TaskGroup* g = get_task_group(bbw->control, bbw->tag); |
| if (g == tls_task_group) { |
| run_in_local_task_group(g, bbw->task_meta, nosignal); |
| } else { |
| g->ready_to_run_remote(bbw->task_meta, check_nosignal(nosignal, g->tag())); |
| } |
| return 1; |
| } |
| |
| int butex_wake_n(void* arg, size_t n, bool nosignal) { |
| Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); |
| |
| ButexWaiterList bthread_waiters; |
| ButexWaiterList pthread_waiters; |
| { |
| BAIDU_SCOPED_LOCK(b->waiter_lock); |
| for (size_t i = 0; (n == 0 || i < n) && !b->waiters.empty(); ++i) { |
| ButexWaiter* bw = b->waiters.head()->value(); |
| bw->RemoveFromList(); |
| bw->container.store(NULL, butil::memory_order_relaxed); |
| if (bw->tid) { |
| bthread_waiters.Append(bw); |
| } else { |
| pthread_waiters.Append(bw); |
| } |
| } |
| } |
| |
| int nwakeup = 0; |
| while (!pthread_waiters.empty()) { |
| ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>( |
| pthread_waiters.head()->value()); |
| bw->RemoveFromList(); |
| wakeup_pthread(bw); |
| ++nwakeup; |
| } |
| if (bthread_waiters.empty()) { |
| return nwakeup; |
| } |
| butil::FlatMap<bthread_tag_t, TaskGroup*> nwakeups; |
| nwakeups.init(FLAGS_task_group_ntags); |
    // We will exchange with the first waiter at the end.
| ButexBthreadWaiter* next = static_cast<ButexBthreadWaiter*>( |
| bthread_waiters.head()->value()); |
| next->RemoveFromList(); |
| unsleep_if_necessary(next, get_global_timer_thread()); |
| ++nwakeup; |
| while (!bthread_waiters.empty()) { |
        // Pop in reverse order.
| ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>( |
| bthread_waiters.tail()->value()); |
| w->RemoveFromList(); |
| unsleep_if_necessary(w, get_global_timer_thread()); |
| auto g = get_task_group(w->control, w->tag); |
| g->ready_to_run_general(w->task_meta, true); |
| nwakeups[g->tag()] = g; |
| ++nwakeup; |
| } |
| for (auto it = nwakeups.begin(); it != nwakeups.end(); ++it) { |
| auto g = it->second; |
| if (!check_nosignal(nosignal, g->tag())) { |
| g->flush_nosignal_tasks_general(); |
| } |
| } |
| auto g = get_task_group(next->control, next->tag); |
| if (g == tls_task_group) { |
| run_in_local_task_group(g, next->task_meta, nosignal); |
| } else { |
| g->ready_to_run_remote(next->task_meta, check_nosignal(nosignal, g->tag())); |
| } |
| return nwakeup; |
| } |
| |
| int butex_wake_all(void* arg, bool nosignal) { |
| return butex_wake_n(arg, 0, nosignal); |
| } |
| |
| int butex_wake_except(void* arg, bthread_t excluded_bthread) { |
| Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); |
| |
| ButexWaiterList bthread_waiters; |
| ButexWaiterList pthread_waiters; |
| { |
| ButexWaiter* excluded_waiter = NULL; |
| BAIDU_SCOPED_LOCK(b->waiter_lock); |
| while (!b->waiters.empty()) { |
| ButexWaiter* bw = b->waiters.head()->value(); |
| bw->RemoveFromList(); |
| |
| if (bw->tid) { |
| if (bw->tid != excluded_bthread) { |
| bthread_waiters.Append(bw); |
| bw->container.store(NULL, butil::memory_order_relaxed); |
| } else { |
| excluded_waiter = bw; |
| } |
| } else { |
| bw->container.store(NULL, butil::memory_order_relaxed); |
| pthread_waiters.Append(bw); |
| } |
| } |
| |
| if (excluded_waiter) { |
| b->waiters.Append(excluded_waiter); |
| } |
| } |
| |
| int nwakeup = 0; |
| while (!pthread_waiters.empty()) { |
| ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>( |
| pthread_waiters.head()->value()); |
| bw->RemoveFromList(); |
| wakeup_pthread(bw); |
| ++nwakeup; |
| } |
| |
| if (bthread_waiters.empty()) { |
| return nwakeup; |
| } |
| butil::FlatMap<bthread_tag_t, TaskGroup*> nwakeups; |
| nwakeups.init(FLAGS_task_group_ntags); |
| do { |
        // Pop in reverse order.
| ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(bthread_waiters.tail()->value()); |
| w->RemoveFromList(); |
| unsleep_if_necessary(w, get_global_timer_thread()); |
| auto g = get_task_group(w->control, w->tag); |
| g->ready_to_run_general(w->task_meta, true); |
| nwakeups[g->tag()] = g; |
| ++nwakeup; |
| } while (!bthread_waiters.empty()); |
| for (auto it = nwakeups.begin(); it != nwakeups.end(); ++it) { |
| auto g = it->second; |
| g->flush_nosignal_tasks_general(); |
| } |
| return nwakeup; |
| } |
| |
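// Mirrors futex requeue semantics: wakes the first waiter of `arg' and
// transfers all remaining waiters to `arg2''s queue without waking them.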
| int butex_requeue(void* arg, void* arg2) { |
| Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); |
| Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value); |
| |
| ButexWaiter* front = NULL; |
| { |
| std::unique_lock<FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock); |
| std::unique_lock<FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock); |
| butil::double_lock(lck1, lck2); |
| if (b->waiters.empty()) { |
| return 0; |
| } |
| |
| front = b->waiters.head()->value(); |
| front->RemoveFromList(); |
| front->container.store(NULL, butil::memory_order_relaxed); |
| |
| while (!b->waiters.empty()) { |
| ButexWaiter* bw = b->waiters.head()->value(); |
| bw->RemoveFromList(); |
| m->waiters.Append(bw); |
| bw->container.store(m, butil::memory_order_relaxed); |
| } |
| } |
| |
| if (front->tid == 0) { // which is a pthread |
| wakeup_pthread(static_cast<ButexPthreadWaiter*>(front)); |
| return 1; |
| } |
| ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front); |
| unsleep_if_necessary(bbw, get_global_timer_thread()); |
| auto g = is_same_tag(bbw->tag) ? tls_task_group : NULL; |
| if (g) { |
| TaskGroup::exchange(&g, bbw->task_meta); |
| } else { |
| bbw->control->choose_one_group(bbw->tag)->ready_to_run_remote(bbw->task_meta); |
| } |
| return 1; |
| } |
| |
// Callable from multiple threads; at most one thread may wake up the waiter.
| static void erase_from_butex_and_wakeup(void* arg) { |
| erase_from_butex(static_cast<ButexWaiter*>(arg), true, WAITER_STATE_TIMEDOUT); |
| } |
| |
| // Used in task_group.cpp |
| bool erase_from_butex_because_of_interruption(ButexWaiter* bw) { |
| return erase_from_butex(bw, true, WAITER_STATE_INTERRUPTED); |
| } |
| |
| inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) { |
    // `bw' is guaranteed to be valid inside this function because the waiter
    // waits until this function is cancelled or finished.
    // NOTE: This function must be a no-op when bw->container is NULL.
| bool erased = false; |
| Butex* b; |
| int saved_errno = errno; |
| while ((b = bw->container.load(butil::memory_order_acquire))) { |
        // b can be NULL when the waiter is scheduled but not yet queued.
| BAIDU_SCOPED_LOCK(b->waiter_lock); |
| if (b == bw->container.load(butil::memory_order_relaxed)) { |
| bw->RemoveFromList(); |
| bw->container.store(NULL, butil::memory_order_relaxed); |
| if (bw->tid) { |
| static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state; |
| } |
| erased = true; |
| break; |
| } |
| } |
| if (erased && wakeup) { |
| if (bw->tid) { |
| ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw); |
| get_task_group(bbw->control, bbw->tag)->ready_to_run_general(bbw->task_meta); |
| } else { |
| ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw); |
| wakeup_pthread(pw); |
| } |
| } |
| errno = saved_errno; |
| return erased; |
| } |
| |
| struct WaitForButexArgs { |
| ButexBthreadWaiter* bw; |
| bool prepend; |
| }; |
| |
| void wait_for_butex(void* arg) { |
| auto args = static_cast<WaitForButexArgs*>(arg); |
| ButexBthreadWaiter* const bw = args->bw; |
| Butex* const b = bw->initial_butex; |
    // 1: A waiter with a timeout should have waiter_state == WAITER_STATE_READY
    //    before it is queued, otherwise the waiter has already timed out and
    //    been removed by the TimerThread, in which case we should stop queueing.
| // |
| // Visibility of waiter_state: |
| // [bthread] [TimerThread] |
| // waiter_state = TIMED |
| // tt_lock { add task } |
| // tt_lock { get task } |
| // waiter_lock { waiter_state=TIMEDOUT } |
| // waiter_lock { use waiter_state } |
    // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
    // sequenced by the two locks, so both threads are guaranteed to see the
    // correct value.
| { |
| BAIDU_SCOPED_LOCK(b->waiter_lock); |
| if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) { |
| bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE; |
| } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ && |
| !bw->task_meta->interrupted) { |
| if (args->prepend) { |
| b->waiters.Prepend(bw); |
| } else { |
| b->waiters.Append(bw); |
| } |
| bw->container.store(b, butil::memory_order_relaxed); |
| #ifdef BRPC_BTHREAD_TRACER |
| bw->control->_task_tracer.set_status(TASK_STATUS_SUSPENDED, bw->task_meta); |
| #endif // BRPC_BTHREAD_TRACER |
| if (bw->abstime != NULL) { |
| bw->sleep_id = get_global_timer_thread()->schedule( |
| erase_from_butex_and_wakeup, bw, *bw->abstime); |
| if (!bw->sleep_id) { // TimerThread stopped. |
| errno = ESTOP; |
| erase_from_butex_and_wakeup(bw); |
| } |
| } |
| return; |
| } |
| } |
| |
    // bw->container is NULL, which makes erase_from_butex_and_wakeup() and
    // TaskGroup::interrupt() no-ops, so there is no race between the following
    // code and those two functions. The on-stack ButexBthreadWaiter is safe to
    // use and bw->waiter_state will not change again.
| // unsleep_if_necessary(bw, get_global_timer_thread()); |
| tls_task_group->ready_to_run(bw->task_meta); |
    // FIXME: jumping back to the original thread is buggy.
| |
| // // Value unmatched or waiter is already woken up by TimerThread, jump |
| // // back to original bthread. |
| // TaskGroup* g = tls_task_group; |
| // ReadyToRunArgs args = { g->current_tid(), false }; |
| // g->set_remained(TaskGroup::ready_to_run_in_worker, &args); |
    // // 2: Don't run remained because we're already in a remained function,
    // //    otherwise the stack may overflow.
| // TaskGroup::sched_to(&g, bw->tid, false/*2*/); |
| } |
| |
| static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value, |
| const timespec* abstime, bool prepend) { |
| TaskMeta* task = NULL; |
| ButexPthreadWaiter pw; |
| pw.tid = 0; |
| pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed); |
| int rc = 0; |
| |
| if (g) { |
| task = g->current_task(); |
| task->current_waiter.store(&pw, butil::memory_order_release); |
| } |
| b->waiter_lock.lock(); |
| if (b->value.load(butil::memory_order_relaxed) != expected_value) { |
| b->waiter_lock.unlock(); |
| errno = EWOULDBLOCK; |
| rc = -1; |
| } else if (task != NULL && task->interrupted) { |
| b->waiter_lock.unlock(); |
        // Races with the setter and may consume multiple interruptions,
        // which is OK.
| task->interrupted = false; |
| errno = EINTR; |
| rc = -1; |
| } else { |
| if (prepend) { |
| b->waiters.Prepend(&pw); |
| } else { |
| b->waiters.Append(&pw); |
| } |
| pw.container.store(b, butil::memory_order_relaxed); |
| b->waiter_lock.unlock(); |
| |
| #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS |
| bvar::Adder<int64_t>& num_waiters = butex_waiter_count(); |
| num_waiters << 1; |
| #endif |
| rc = wait_pthread(pw, abstime); |
| #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS |
| num_waiters << -1; |
| #endif |
| } |
| if (task) { |
        // If current_waiter is NULL, TaskGroup::interrupt() is running and
        // using pw; spin until current_waiter != NULL.
| BT_LOOP_WHEN(task->current_waiter.exchange( |
| NULL, butil::memory_order_acquire) == NULL, |
| 30/*nops before sched_yield*/); |
| if (task->interrupted) { |
| task->interrupted = false; |
| if (rc == 0) { |
| errno = EINTR; |
| return -1; |
| } |
| } |
| } |
| return rc; |
| } |
| |
| int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) { |
| Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); |
| if (b->value.load(butil::memory_order_relaxed) != expected_value) { |
| errno = EWOULDBLOCK; |
        // Sometimes we may take actions immediately after an unmatched butex;
        // this fence makes sure that we see changes made before the butex was
        // changed.
| butil::atomic_thread_fence(butil::memory_order_acquire); |
| return -1; |
| } |
| TaskGroup* g = tls_task_group; |
| if (NULL == g || g->is_current_pthread_task()) { |
| return butex_wait_from_pthread(g, b, expected_value, abstime, prepend); |
| } |
| ButexBthreadWaiter bbw; |
| // tid is 0 iff the thread is non-bthread |
| bbw.tid = g->current_tid(); |
| bbw.container.store(NULL, butil::memory_order_relaxed); |
| bbw.task_meta = g->current_task(); |
| bbw.sleep_id = 0; |
| bbw.waiter_state = WAITER_STATE_READY; |
| bbw.expected_value = expected_value; |
| bbw.initial_butex = b; |
| bbw.control = g->control(); |
| bbw.abstime = abstime; |
| bbw.tag = g->tag(); |
| |
| if (abstime != NULL) { |
| // Schedule timer before queueing. If the timer is triggered before |
| // queueing, cancel queueing. This is a kind of optimistic locking. |
| if (butil::timespec_to_microseconds(*abstime) < |
| (butil::gettimeofday_us() + MIN_SLEEP_US)) { |
| // Already timed out. |
| errno = ETIMEDOUT; |
| return -1; |
| } |
| } |
| #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS |
| bvar::Adder<int64_t>& num_waiters = butex_waiter_count(); |
| num_waiters << 1; |
| #endif |
| |
    // The release fence matches the acquire fence in
    // interrupt_and_consume_waiters() in task_group.cpp to guarantee
    // visibility of `interrupted'.
| bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release); |
| WaitForButexArgs args{ &bbw, prepend }; |
| g->set_remained(wait_for_butex, &args); |
| TaskGroup::sched(&g); |
| |
    // erase_from_butex_and_wakeup() (called by TimerThread) may still be
    // running and using bbw. The chance is small; just spin until it's done.
| BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0, |
| 30/*nops before sched_yield*/); |
| |
| // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw. |
| // Spin until current_waiter != NULL. |
| BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange( |
| NULL, butil::memory_order_acquire) == NULL, |
| 30/*nops before sched_yield*/); |
| #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS |
| num_waiters << -1; |
| #endif |
| |
| bool is_interrupted = false; |
| if (bbw.task_meta->interrupted) { |
        // Races with the setter and may consume multiple interruptions,
        // which is OK.
| bbw.task_meta->interrupted = false; |
| is_interrupted = true; |
| } |
    // If we timed out and also saw an unmatched value, return ETIMEDOUT.
| if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) { |
| errno = ETIMEDOUT; |
| return -1; |
| } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) { |
| errno = EWOULDBLOCK; |
| return -1; |
| } else if (is_interrupted) { |
| errno = EINTR; |
| return -1; |
| } |
| return 0; |
| } |
| |
| } // namespace bthread |
| |
| namespace butil { |
| template <> struct ObjectPoolBlockMaxItem<bthread::Butex> { |
| static const size_t value = 128; |
| }; |
| } |