| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #ifndef KUDU_UTIL_RW_SEMAPHORE_H |
| #define KUDU_UTIL_RW_SEMAPHORE_H |
| |
| // Uncomment for extra debugging information. See below for details. |
| // #define RW_SEMAPHORE_TRACK_HOLDER 1 |
| |
| #include <boost/smart_ptr/detail/yield_k.hpp> |
| #include <glog/logging.h> |
| |
| #include "kudu/gutil/atomicops.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/port.h" |
| #ifdef RW_SEMAPHORE_TRACK_HOLDER |
| #include "kudu/util/debug-util.h" |
| #endif |
| #include "kudu/util/thread.h" |
| |
| namespace kudu { |
| |
| // Read-Write semaphore. 32bit uint that contains the number of readers. |
| // When someone wants to write, tries to set the 32bit, and waits until |
| // the readers have finished. Readers are spinning while the write flag is set. |
| // |
| // This rw-semaphore makes no attempt at fairness, though it does avoid write |
| // starvation (no new readers may obtain the lock if a write is waiting). |
| // |
| // NOTE: this means that it is not safe to reentrantly acquire the read lock, |
| // due to the following deadlock: |
| // - T1: acquire read lock |
| // - T2: wait for write lock |
| // (blocks waiting for readers) |
| // - T1: try to acquire read-lock reentrantly |
| // (blocks to avoid starving writers) |
| // |
| // Given that this is currently based only on spinning (and not futex), |
| // it should only be used in cases where the lock is held for very short |
| // time intervals. |
| // |
| // If the semaphore is expected to always be released from the same thread |
| // that acquired it, use rw_spinlock instead. |
| // |
| // In order to support easier debugging of leaked locks, this class can track |
| // the stack trace of the last thread to lock it in write mode. To do so, |
| // uncomment the definition of RW_SEMAPHORE_TRACK_HOLDER at the top of this |
| // file. Then, in gdb, print the contents of the semaphore, and you should see |
| // the collected stack trace. |
| class rw_semaphore { |
| public: |
| rw_semaphore() : state_(0) { |
| } |
| ~rw_semaphore() {} |
| |
| void lock_shared() { |
| int loop_count = 0; |
| Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_); |
| while (true) { |
| Atomic32 expected = cur_state & kNumReadersMask; // I expect no write lock |
| Atomic32 try_new_state = expected + 1; // Add me as reader |
| cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state); |
| if (cur_state == expected) |
| break; |
| // Either was already locked by someone else, or CAS failed. |
| boost::detail::yield(loop_count++); |
| } |
| } |
| |
| void unlock_shared() { |
| int loop_count = 0; |
| Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_); |
| while (true) { |
| DCHECK_GT(cur_state & kNumReadersMask, 0) |
| << "unlock_shared() called when there are no shared locks held"; |
| Atomic32 expected = cur_state; // I expect a write lock and other readers |
| Atomic32 try_new_state = expected - 1; // Drop me as reader |
| cur_state = base::subtle::Release_CompareAndSwap(&state_, expected, try_new_state); |
| if (cur_state == expected) |
| break; |
| // Either was already locked by someone else, or CAS failed. |
| boost::detail::yield(loop_count++); |
| } |
| } |
| |
| // Tries to acquire a write lock, if no one else has it. |
| // This function retries on CAS failure and waits for readers to complete. |
| bool try_lock() { |
| int loop_count = 0; |
| Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_); |
| while (true) { |
| // someone else has already the write lock |
| if (cur_state & kWriteFlag) |
| return false; |
| |
| Atomic32 expected = cur_state & kNumReadersMask; // I expect some 0+ readers |
| Atomic32 try_new_state = kWriteFlag | expected; // I want to lock the other writers |
| cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state); |
| if (cur_state == expected) |
| break; |
| // Either was already locked by someone else, or CAS failed. |
| boost::detail::yield(loop_count++); |
| } |
| |
| WaitPendingReaders(); |
| RecordLockHolderStack(); |
| return true; |
| } |
| |
| void lock() { |
| int loop_count = 0; |
| Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_); |
| while (true) { |
| Atomic32 expected = cur_state & kNumReadersMask; // I expect some 0+ readers |
| Atomic32 try_new_state = kWriteFlag | expected; // I want to lock the other writers |
| // Note: we use NoBarrier here because we'll do the Acquire barrier down below |
| // in WaitPendingReaders |
| cur_state = base::subtle::NoBarrier_CompareAndSwap(&state_, expected, try_new_state); |
| if (cur_state == expected) |
| break; |
| // Either was already locked by someone else, or CAS failed. |
| boost::detail::yield(loop_count++); |
| } |
| |
| WaitPendingReaders(); |
| |
| #ifndef NDEBUG |
| writer_tid_ = Thread::CurrentThreadId(); |
| #endif // NDEBUG |
| RecordLockHolderStack(); |
| } |
| |
| void unlock() { |
| // I expect to be the only writer |
| DCHECK_EQ(base::subtle::NoBarrier_Load(&state_), kWriteFlag); |
| |
| #ifndef NDEBUG |
| writer_tid_ = -1; // Invalid tid. |
| #endif // NDEBUG |
| |
| ResetLockHolderStack(); |
| // Reset: no writers & no readers. |
| Release_Store(&state_, 0); |
| } |
| |
| // Return true if the lock is currently held for write by any thread. |
| // See simple_semaphore::is_locked() for details about where this is useful. |
| bool is_write_locked() const { |
| return base::subtle::NoBarrier_Load(&state_) & kWriteFlag; |
| } |
| |
| // Return true if the lock is currently held, either for read or write |
| // by any thread. |
| // See simple_semaphore::is_locked() for details about where this is useful. |
| bool is_locked() const { |
| return base::subtle::NoBarrier_Load(&state_); |
| } |
| |
| private: |
| static const uint32_t kNumReadersMask = 0x7fffffff; |
| static const uint32_t kWriteFlag = 1 << 31; |
| |
| #ifdef RW_SEMAPHORE_TRACK_HOLDER |
| StackTrace writer_stack_; |
| void RecordLockHolderStack() { |
| writer_stack_.Collect(); |
| } |
| void ResetLockHolderStack() { |
| writer_stack_.Reset(); |
| } |
| #else |
| void RecordLockHolderStack() { |
| } |
| void ResetLockHolderStack() { |
| } |
| #endif |
| |
| void WaitPendingReaders() { |
| int loop_count = 0; |
| while ((base::subtle::Acquire_Load(&state_) & kNumReadersMask) > 0) { |
| boost::detail::yield(loop_count++); |
| } |
| } |
| |
| private: |
| volatile Atomic32 state_; |
| #ifndef NDEBUG |
| int64_t writer_tid_; |
| #endif // NDEBUG |
| }; |
| |
| } // namespace kudu |
| #endif /* KUDU_UTIL_RW_SEMAPHORE_H */ |