// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "bvar/collector.h"
#include "bthread/rwlock.h"
#include "bthread/butex.h"
namespace bthread {
// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock,
// which is a bthread implementation of golang's RWMutex.
// The lock can be held by an arbitrary number of readers or a single writer.
// For details, see https://github.com/golang/go/blob/master/src/sync/rwmutex.go
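//
// A minimal usage sketch (these C APIs are defined at the bottom of this file):
//   bthread_rwlock_t rwlock;
//   bthread_rwlock_init(&rwlock, NULL);
//   bthread_rwlock_rdlock(&rwlock);  // Shared access, may block on a pending writer.
//   bthread_rwlock_unlock(&rwlock);
//   bthread_rwlock_wrlock(&rwlock);  // Exclusive access, may block on readers/writers.
//   bthread_rwlock_unlock(&rwlock);
//   bthread_rwlock_destroy(&rwlock);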
// Defined in bthread/mutex.cpp
class ContentionProfiler;
extern ContentionProfiler* g_cp;
extern bvar::CollectorSpeedLimit g_cp_sl;
extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
extern void make_contention_site_invalid(bthread_contention_site_t* cs);
extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns);
// Large enough for any realistic number of concurrent readers. If the reader
// count could exceed this value, `int64_t' would be needed instead of `int'.
const int RWLockMaxReaders = 1 << 30;
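// Writers announce themselves by subtracting RWLockMaxReaders from
// `reader_count', so a negative `reader_count' means a writer is pending and
// `reader_count + RWLockMaxReaders' is the number of active readers.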
// For reading.
static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime) {
int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
->fetch_add(1, butil::memory_order_acquire) + 1;
// Fast path: no writer is pending.
if (reader_count >= 0) {
CHECK_LT(reader_count, RWLockMaxReaders);
return 0;
}
// Slow path.
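// A writer is pending or active: block on `reader_sema', which the writer
// posts when it releases the write lock.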
// Don't sample when contention profiler is off.
if (NULL == bthread::g_cp) {
return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
}
// Sample.
const int64_t start_ns = butil::cpuwide_time_ns();
int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime);
const int64_t end_ns = butil::cpuwide_time_ns();
const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
// Submit `csite' for each reader immediately after
// acquiring the rdlock, to avoid contention on `csite' itself.
bthread::submit_contention(csite, end_ns);
return rc;
}
static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) {
return rwlock_rdlock_impl(rwlock, NULL);
}
static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime) {
return rwlock_rdlock_impl(rwlock, abstime);
}
// Returns 0 if the lock was acquired, otherwise errno.
static inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
while (true) {
int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
->load(butil::memory_order_relaxed);
if (reader_count < 0) {
// Failed to acquire the read lock because there is a writer.
return EBUSY;
}
if (((butil::atomic<int>*)&rwlock->reader_count)
->compare_exchange_weak(reader_count, reader_count + 1,
butil::memory_order_acquire,
butil::memory_order_relaxed)) {
return 0;
}
}
}
static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
->fetch_add(-1, butil::memory_order_relaxed) - 1;
// Fast path: no writer is pending.
if (reader_count >= 0) {
return 0;
}
// Slow path.
if (BAIDU_UNLIKELY(reader_count + 1 == 0 || reader_count + 1 == -RWLockMaxReaders)) {
CHECK(false) << "rwlock_unrdlock of unlocked rwlock";
return EINVAL;
}
// A writer is pending.
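// `reader_wait' is the number of active readers the writer still waits for;
// the reader that decrements it to zero must wake the writer up.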
int reader_wait = ((butil::atomic<int>*)&rwlock->reader_wait)
->fetch_add(-1, butil::memory_order_relaxed) - 1;
if (reader_wait != 0) {
return 0;
}
// The last reader unblocks the writer.
if (NULL == bthread::g_cp) {
bthread_sem_post(&rwlock->writer_sema);
return 0;
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
bthread_sem_post(&rwlock->writer_sema);
return 0;
}
// Sampling.
const int64_t start_ns = butil::cpuwide_time_ns();
bthread_sem_post(&rwlock->writer_sema);
const int64_t end_ns = butil::cpuwide_time_ns();
const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
// Submit `csite' for each reader immediately after
// releasing the rdlock, to avoid contention on `csite' itself.
bthread::submit_contention(csite, end_ns);
return 0;
}
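// Helper macros for the writer path below. DO_CSITE_IF_NEED asks the
// collector whether this contended locking should be sampled and records the
// start time; SUBMIT_CSITE_IF_NEED submits the elapsed time as a contention
// site when the acquisition timed out.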
#define DO_CSITE_IF_NEED \
do { \
/* Don't sample when contention profiler is off. */ \
if (NULL != bthread::g_cp) { \
/* Ask Collector if this (contended) locking should be sampled. */ \
sampling_range = bvar::is_collectable(&bthread::g_cp_sl); \
start_ns = bvar::is_sampling_range_valid(sampling_range) ? \
butil::cpuwide_time_ns() : -1; \
} else { \
start_ns = -1; \
} \
} while (0)
#define SUBMIT_CSITE_IF_NEED \
do { \
if (ETIMEDOUT == rc && start_ns > 0) { \
/* Failed to lock due to ETIMEDOUT, submit the elapse directly. */ \
const int64_t end_ns = butil::cpuwide_time_ns(); \
const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; \
bthread::submit_contention(csite, end_ns); \
} \
} while (0)
// For writing.
static inline int rwlock_wrlock_impl(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime) {
// First, resolve competition with other writers.
int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
// Meaning of `start_ns':
// -1: don't sample.
// 0: sampling not decided yet.
// > 0: start time of the sampling in ns.
int64_t start_ns = 0;
if (0 != rc) {
DO_CSITE_IF_NEED;
rc = bthread_mutex_timedlock(&rwlock->write_queue_mutex, abstime);
if (0 != rc) {
SUBMIT_CSITE_IF_NEED;
return rc;
}
}
// Announce to readers there is a pending writer.
int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
->fetch_add(-RWLockMaxReaders, butil::memory_order_release);
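// `reader_count' is the number of readers holding the lock at the moment of
// the announcement.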
// Wait for active readers.
if (reader_count != 0 &&
((butil::atomic<int>*)&rwlock->reader_wait)
->fetch_add(reader_count) + reader_count != 0) {
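// Some readers are still active: wait on `writer_sema', which the last
// departing reader posts in rwlock_unrdlock().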
rc = bthread_sem_trywait(&rwlock->writer_sema);
if (0 != rc) {
if (0 == start_ns) {
DO_CSITE_IF_NEED;
}
rc = bthread_sem_timedwait(&rwlock->writer_sema, abstime);
if (0 != rc) {
SUBMIT_CSITE_IF_NEED;
bthread_mutex_unlock(&rwlock->write_queue_mutex);
return rc;
}
}
}
if (start_ns > 0) {
rwlock->writer_csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
rwlock->writer_csite.sampling_range = sampling_range;
}
rwlock->wlock_flag = true;
return 0;
}
#undef DO_CSITE_IF_NEED
#undef SUBMIT_CSITE_IF_NEED
static inline int rwlock_wrlock(bthread_rwlock_t* rwlock) {
return rwlock_wrlock_impl(rwlock, NULL);
}
static inline int rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime) {
return rwlock_wrlock_impl(rwlock, abstime);
}
static inline int rwlock_trywrlock(bthread_rwlock_t* rwlock) {
int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
if (0 != rc) {
return rc;
}
int expected = 0;
if (!((butil::atomic<int>*)&rwlock->reader_count)
->compare_exchange_strong(expected, -RWLockMaxReaders,
butil::memory_order_acquire,
butil::memory_order_relaxed)) {
// Failed to acquire the write lock because there are active readers.
bthread_mutex_unlock(&rwlock->write_queue_mutex);
return EBUSY;
}
rwlock->wlock_flag = true;
return 0;
}
static inline void rwlock_unwrlock_slow(bthread_rwlock_t* rwlock, int reader_count) {
bthread_sem_post_n(&rwlock->reader_sema, reader_count);
// Allow other writers to proceed.
bthread_mutex_unlock(&rwlock->write_queue_mutex);
}
static inline int rwlock_unwrlock(bthread_rwlock_t* rwlock) {
rwlock->wlock_flag = false;
// Announce to readers there is no active writer.
int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)->fetch_add(
RWLockMaxReaders, butil::memory_order_release) + RWLockMaxReaders;
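// `reader_count' is the number of readers that arrived while the write lock
// was held and went to the slow path waiting on `reader_sema'.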
if (BAIDU_UNLIKELY(reader_count >= RWLockMaxReaders)) {
CHECK(false) << "rwlock_unwlock of unlocked rwlock";
return EINVAL;
}
bool is_valid = bthread::is_contention_site_valid(rwlock->writer_csite);
if (BAIDU_UNLIKELY(is_valid)) {
bthread_contention_site_t saved_csite = rwlock->writer_csite;
bthread::make_contention_site_invalid(&rwlock->writer_csite);
const int64_t unlock_start_ns = butil::cpuwide_time_ns();
rwlock_unwrlock_slow(rwlock, reader_count);
const int64_t unlock_end_ns = butil::cpuwide_time_ns();
saved_csite.duration_ns += unlock_end_ns - unlock_start_ns;
bthread::submit_contention(saved_csite, unlock_end_ns);
} else {
rwlock_unwrlock_slow(rwlock, reader_count);
}
return 0;
}
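// Unlock in either mode: `wlock_flag' records whether the holder took the
// write lock.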
static inline int rwlock_unlock(bthread_rwlock_t* rwlock) {
if (rwlock->wlock_flag) {
return rwlock_unwrlock(rwlock);
} else {
return rwlock_unrdlock(rwlock);
}
}
} // namespace bthread
__BEGIN_DECLS
int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
const bthread_rwlockattr_t* __restrict) {
int rc = bthread_sem_init(&rwlock->reader_sema, 0);
if (BAIDU_UNLIKELY(0 != rc)) {
return rc;
}
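// Contention sampling of the internal semaphores and mutex is disabled;
// the rwlock submits its own contention sites instead.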
bthread_sem_disable_csite(&rwlock->reader_sema);
rc = bthread_sem_init(&rwlock->writer_sema, 0);
if (BAIDU_UNLIKELY(0 != rc)) {
bthread_sem_destroy(&rwlock->reader_sema);
return rc;
}
bthread_sem_disable_csite(&rwlock->writer_sema);
rwlock->reader_count = 0;
rwlock->reader_wait = 0;
rwlock->wlock_flag = false;
bthread_mutexattr_t attr;
bthread_mutexattr_init(&attr);
bthread_mutexattr_disable_csite(&attr);
rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr);
if (BAIDU_UNLIKELY(0 != rc)) {
bthread_mutexattr_destroy(&attr);
bthread_sem_destroy(&rwlock->reader_sema);
bthread_sem_destroy(&rwlock->writer_sema);
return rc;
}
bthread_mutexattr_destroy(&attr);
bthread::make_contention_site_invalid(&rwlock->writer_csite);
return 0;
}
int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
bthread_sem_destroy(&rwlock->reader_sema);
bthread_sem_destroy(&rwlock->writer_sema);
bthread_mutex_destroy(&rwlock->write_queue_mutex);
return 0;
}
int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) {
return bthread::rwlock_rdlock(rwlock);
}
int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
return bthread::rwlock_tryrdlock(rwlock);
}
int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime) {
return bthread::rwlock_timedrdlock(rwlock, abstime);
}
int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) {
return bthread::rwlock_wrlock(rwlock);
}
int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock) {
return bthread::rwlock_trywrlock(rwlock);
}
int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime) {
return bthread::rwlock_timedwrlock(rwlock, abstime);
}
int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) {
return bthread::rwlock_unlock(rwlock);
}
__END_DECLS