blob: 8ba54ac051bdf85e205362d4b34e3d32fd160a73 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Date: Mon Sep 22 22:23:13 CST 2014
#ifndef BUTIL_DOUBLY_BUFFERED_DATA_H
#define BUTIL_DOUBLY_BUFFERED_DATA_H
#include <algorithm>
#include <deque>
#include <vector>
#include <memory>
#include <pthread.h>
#include "butil/scoped_lock.h"
#include "butil/thread_local.h"
#include "butil/logging.h"
#include "butil/macros.h"
#include "butil/type_traits.h"
#include "butil/errno.h"
#include "butil/atomicops.h"
#include "butil/unique_ptr.h"
#include "butil/type_traits.h"
namespace butil {
// This data structure makes Read() almost lock-free by making Modify()
// *much* slower. It's very suitable for implementing LoadBalancers which
// have a lot of concurrent read-only ops from many threads and occasional
// modifications of data. As a side effect, this data structure can store
// a thread-local data for user.
//
// --- `AllowBthreadSuspended=false' ---
// Read(): Begin with a thread-local mutex locked then read the foreground
// instance which will not be changed before the mutex is unlocked. Since the
// mutex is only locked by Modify() with an empty critical section, the
// function is almost lock-free.
//
// Modify(): Modify background instance which is not used by any Read(), flip
// foreground and background, lock thread-local mutexes one by one to make
// sure all existing Read() finish and later Read() see new foreground,
// then modify background(foreground before flip) again.
//
// Note that when `AllowBthreadSuspended=false', a bthread must not be
// suspended while reading; otherwise it may cause deadlock.
//
//
// --- `AllowBthreadSuspended=true' ---
// A bthread is allowed to be suspended while reading.
// A non-Void TLS is not allowed (this is checked at compile time).
// If the bthread is not suspended while reading, Read() is still almost
// lock-free at the cost of making Modify() *much* slower.
// If the bthread is suspended while reading, the bthreads sharing the same
// thread-local Wrapper contend with each other on it.
//
// Read(): Increment the thread-local reference count of the foreground
// instance, which is protected by a thread-local mutex, then read the
// foreground instance. It will not be changed until all of its thread-local
// reference counts drop to zero. After the read completes, the thread-local
// reference count of that instance is decremented by one and, if it becomes
// zero, a waiting Modify() is notified.
//
// Modify(): Modify the background instance, which is not used by any Read(),
// then flip foreground and background. Next, lock the thread-local mutexes one
// by one and wait until the thread-local reference counts of the old
// foreground (protected by those mutexes) become 0. This makes sure all
// existing Read() calls finish and later Read() calls see the new foreground.
// Finally, modify the background (the foreground before the flip) again.
// Placeholder for the TLS template argument of DoublyBufferedData meaning
// "no per-thread user data" (the Wrapper base specialized on Void has no
// user_tls()).
class Void { };
// IsVoid<T>::value is true only for T = Void. DoublyBufferedData uses it to
// reject a non-Void TLS together with AllowBthreadSuspended=true.
template <typename T> struct IsVoid : false_type { };
template <> struct IsVoid<Void> : true_type { };
template <typename T, typename TLS = Void, bool AllowBthreadSuspended = false>
class DoublyBufferedData {
    class Wrapper;
    class WrapperTLSGroup;
    typedef int WrapperTLSId;
    typedef std::shared_ptr<Wrapper> WrapperSharedPtr;
    typedef std::weak_ptr<Wrapper> WrapperWeakPtr;
public:
    // Handle filled by Read(ScopedPtr*). While it is alive, the foreground
    // instance it points to is not modified. Its destructor ends the read:
    // it unlocks the thread-local mutex, or, when AllowBthreadSuspended is
    // true, decrements the thread-local reference count of that instance.
    class ScopedPtr {
    friend class DoublyBufferedData;
    public:
        ScopedPtr() : _data(NULL), _index(0), _w(NULL) {}
        ~ScopedPtr() {
            if (_w) {
                if (AllowBthreadSuspended) {
                    _w->EndRead(_index);
                } else {
                    _w->EndRead();
                }
            }
        }
        const T* get() const { return _data; }
        const T& operator*() const { return *_data; }
        const T* operator->() const { return _data; }
        // Per-thread user data; only available when TLS is not Void.
        TLS& tls() { return _w->user_tls(); }
    private:
        DISALLOW_COPY_AND_ASSIGN(ScopedPtr);
        const T* _data;
        // Index of foreground instance used by ScopedPtr.
        int _index;
        WrapperSharedPtr _w;
    };

    DoublyBufferedData();
    ~DoublyBufferedData();

    // Put foreground instance into ptr. The instance will not be changed until
    // ptr is destructed.
    // Apart from registering the calling thread on its first call, Read()
    // only locks thread-local state, so it normally does not block; it may
    // briefly wait while a concurrent Modify() synchronizes with the calling
    // thread.
    // `ptr' must not hold an unfinished read: its previous wrapper would be
    // released without ending that read.
    // Returns 0 on success, -1 otherwise.
    int Read(ScopedPtr* ptr);

    // `fn(const T&)' will be called with foreground instance while the read
    // is held. Same blocking behavior as Read(ScopedPtr*).
    // Returns 0 on success, -1 on error.
    template<typename Fn>
    int Read(Fn&& fn);

    // Modify background and foreground instances. `fn(T&, args...)' is first
    // called on the background instance. If it returns 0, nothing is
    // published and Modify() returns 0. Otherwise foreground and background
    // are flipped, Modify() waits until no Read() uses the old foreground,
    // calls fn on it again and returns the value fn returned.
    // Modify() from different threads are exclusive from each other.
    // NOTE: Call same series of fn to different equivalent instances should
    // result in equivalent instances, otherwise foreground and background
    // instance will be inconsistent.
    // NOTE: `args' are forwarded to both calls of fn, so fn must not move
    // from rvalue arguments.
    // NOTE: A thread holding an unfinished read of this instance must not
    // call Modify(): it would wait for its own read and may deadlock.
    template <typename Fn, typename... Args>
    size_t Modify(Fn&& fn, Args&&... args);

    // fn(T& background, const T& foreground, args...) will be called to
    // background and foreground instances respectively. Same return value
    // and rules as Modify().
    template <typename Fn, typename... Args>
    size_t ModifyWithForeground(Fn&& fn, Args&&... args);

private:
    // Copying would duplicate _wrapper_key (both destructors would hand the
    // same key back to WrapperTLSGroup) as well as the pthread mutexes.
    DISALLOW_COPY_AND_ASSIGN(DoublyBufferedData);

    const T* UnsafeRead() const {
        return _data + _index.load(butil::memory_order_acquire);
    }
    // Like UnsafeRead(), and also stores the foreground index in `index'.
    const T* UnsafeRead(int& index) const {
        index = _index.load(butil::memory_order_acquire);
        return _data + index;
    }
    // Returns the calling thread's wrapper for this instance, or NULL.
    WrapperSharedPtr GetWrapper();

    // Foreground and background instances.
    T _data[2];

    // Index of foreground instance.
    butil::atomic<int> _index;

    // Key to access thread-local wrappers.
    WrapperTLSId _wrapper_key;

    // All thread-local instances.
    std::vector<WrapperWeakPtr> _wrappers;

    // Sequence access to _wrappers.
    pthread_mutex_t _wrappers_mutex{};

    // Sequence modifications.
    pthread_mutex_t _modify_mutex{};
};
// Base of the per-thread Wrapper that carries the user's thread-local data.
template <typename T, typename TLS>
class DoublyBufferedDataWrapperBase {
public:
    TLS& user_tls() { return _user_tls; }
protected:
    // Value-initialized: Wrapper's constructor does not initialize its base,
    // so without `{}' a scalar TLS (int, pointer, ...) would start out with
    // an indeterminate value.
    TLS _user_tls{};
};

// With TLS = Void there is no per-thread user data and no user_tls().
template <typename T>
class DoublyBufferedDataWrapperBase<T, Void> {
};
// The amount of data stored with pthread_key is limited by
// _SC_THREAD_KEYS_MAX, so WrapperTLSGroup keeps Wrappers in its own
// thread-local storage instead. WrapperTLSGroup destroys a thread's Wrapper
// data when that thread exits; at other times only the Wrapper's inner
// structure is reset.
template <typename T, typename TLS, bool AllowBthreadSuspended>
class DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup {
public:
    const static size_t RAW_BLOCK_SIZE = 4096;
    const static size_t ELEMENTS_PER_BLOCK =
        RAW_BLOCK_SIZE / sizeof(WrapperSharedPtr) > 0 ?
        RAW_BLOCK_SIZE / sizeof(WrapperSharedPtr) : 1;

    // Fixed-size chunk of one thread's wrappers, created lazily.
    struct BAIDU_CACHELINE_ALIGNMENT ThreadBlock {
        // Returns the wrapper at `offset', creating it on first access.
        WrapperSharedPtr at(size_t offset) {
            if (NULL == _data[offset]) {
                _data[offset] = std::make_shared<Wrapper>();
            }
            return _data[offset];
        }
    private:
        WrapperSharedPtr _data[ELEMENTS_PER_BLOCK];
    };

    // Allocates an id, reusing a previously deleted one when available.
    static WrapperTLSId key_create() {
        BAIDU_SCOPED_LOCK(_s_mutex);
        WrapperTLSId id = 0;
        if (!_get_free_ids().empty()) {
            id = _get_free_ids().back();
            _get_free_ids().pop_back();
        } else {
            id = _s_id++;
        }
        return id;
    }

    // Returns `id' to the free list. Returns 0 on success; returns -1 and
    // sets errno to EINVAL if `id' was never created or is already free.
    static int key_delete(WrapperTLSId id) {
        BAIDU_SCOPED_LOCK(_s_mutex);
        if (id < 0 || id >= _s_id) {
            errno = EINVAL;
            return -1;
        }
        std::deque<WrapperTLSId>& free_ids = _get_free_ids();
        // A double delete would put the id in the free list twice, so two
        // later key_create() calls would return the same id and the two
        // instances would fight over the same thread-local wrappers.
        if (std::find(free_ids.begin(), free_ids.end(), id) != free_ids.end()) {
            errno = EINVAL;
            return -1;
        }
        free_ids.push_back(id);
        return 0;
    }

    // Returns the calling thread's wrapper for `id', creating the thread's
    // block table, block and wrapper on demand. Returns NULL if `id' < 0.
    static WrapperSharedPtr get_or_create_tls_data(WrapperTLSId id) {
        if (BAIDU_UNLIKELY(id < 0)) {
            CHECK(false) << "Invalid id=" << id;
            return NULL;
        }
        if (_s_tls_blocks == NULL) {
            _s_tls_blocks = new std::vector<ThreadBlock*>;
            butil::thread_atexit(_destroy_tls_blocks);
        }
        const size_t block_id = static_cast<size_t>(id) / ELEMENTS_PER_BLOCK;
        if (block_id >= _s_tls_blocks->size()) {
            // The minimum of 32 blocks avoids pointless small resizes. Both
            // arguments are size_t so std::max deduces a single type on every
            // platform (size_t is not always unsigned long).
            _s_tls_blocks->resize(std::max(block_id + 1, static_cast<size_t>(32)));
        }
        ThreadBlock* tb = (*_s_tls_blocks)[block_id];
        if (tb == NULL) {
            tb = new ThreadBlock;
            (*_s_tls_blocks)[block_id] = tb;
        }
        return tb->at(id - block_id * ELEMENTS_PER_BLOCK);
    }

private:
    // Registered with butil::thread_atexit(): deletes the exiting thread's
    // blocks, dropping that thread's references to its wrappers.
    static void _destroy_tls_blocks() {
        if (!_s_tls_blocks) {
            return;
        }
        for (size_t i = 0; i < _s_tls_blocks->size(); ++i) {
            delete (*_s_tls_blocks)[i];
        }
        delete _s_tls_blocks;
        _s_tls_blocks = NULL;
    }

    // Lazily allocated free-id list. Callers hold _s_mutex.
    inline static std::deque<WrapperTLSId>& _get_free_ids() {
        if (BAIDU_UNLIKELY(!_s_free_ids)) {
            _s_free_ids = new (std::nothrow) std::deque<WrapperTLSId>();
            RELEASE_ASSERT(_s_free_ids);
        }
        return *_s_free_ids;
    }

private:
    static pthread_mutex_t _s_mutex;
    static WrapperTLSId _s_id;
    static std::deque<WrapperTLSId>* _s_free_ids;
    static __thread std::vector<ThreadBlock*>* _s_tls_blocks;
};
// Out-of-class definitions of WrapperTLSGroup's static data members.
template <typename T, typename TLS, bool AllowBthreadSuspended>
pthread_mutex_t
DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup::_s_mutex =
    PTHREAD_MUTEX_INITIALIZER;

template <typename T, typename TLS, bool AllowBthreadSuspended>
std::deque<typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSId>*
DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup::_s_free_ids =
    nullptr;

template <typename T, typename TLS, bool AllowBthreadSuspended>
typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSId
DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup::_s_id = 0;

// Per-thread block table; allocated on first use in each thread.
template <typename T, typename TLS, bool AllowBthreadSuspended>
__thread std::vector<typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup::ThreadBlock*>*
DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup::_s_tls_blocks =
    nullptr;
template <typename T, typename TLS, bool AllowBthreadSuspended>
class BAIDU_CACHELINE_ALIGNMENT DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Wrapper
    : public DoublyBufferedDataWrapperBase<T, TLS> {
friend class DoublyBufferedData;
public:
    explicit Wrapper()
        : _control(NULL)
        , _modify_wait(false) {
        pthread_mutex_init(&_mutex, NULL);
        if (AllowBthreadSuspended) {
            // One condition variable per element of _data.
            for (int i = 0; i < 2; ++i) {
                pthread_cond_init(&_cond[i], NULL);
            }
        }
    }

    ~Wrapper() {
        if (AllowBthreadSuspended) {
            // Let outstanding reads of both instances finish before the
            // condition variables are torn down.
            for (int i = 0; i < 2; ++i) {
                WaitReadDone(i);
            }
            for (int i = 0; i < 2; ++i) {
                pthread_cond_destroy(&_cond[i]);
            }
        }
        pthread_mutex_destroy(&_mutex);
    }

    // Locks _mutex. It is taken by the owning thread to read and by Modify()
    // to synchronize with that thread; outside of Modify() it is normally
    // uncontended, which keeps this cheap.
    inline void BeginRead() {
        pthread_mutex_lock(&_mutex);
    }

    // AllowBthreadSuspended=true: releases _mutex after the read has been
    // pinned with AddRef(), so the reader may be suspended afterwards.
    inline void BeginReadRelease() {
        pthread_mutex_unlock(&_mutex);
    }

    // AllowBthreadSuspended=false: ends the read started by BeginRead().
    inline void EndRead() {
        pthread_mutex_unlock(&_mutex);
    }

    // AllowBthreadSuspended=true: under _mutex, drops one reference on
    // instance `index' and signals a waiter if that was the last one.
    inline void EndRead(int index) {
        BAIDU_SCOPED_LOCK(_mutex);
        SubRef(index);
        SignalReadCond(index);
    }

    // AllowBthreadSuspended=false: returns once any read in progress on the
    // owning thread has released _mutex.
    inline void WaitReadDone() {
        BAIDU_SCOPED_LOCK(_mutex);
    }

    // AllowBthreadSuspended=true: blocks until the reference count of
    // instance `index' drops to zero.
    inline void WaitReadDone(int index) {
        BAIDU_SCOPED_LOCK(_mutex);
        int& pending = _ref[index == 0 ? 0 : 1];
        while (pending != 0) {
            // Tell readers someone is waiting, then sleep until signaled.
            _modify_wait = true;
            pthread_cond_wait(&_cond[index], &_mutex);
        }
        _modify_wait = false;
    }

    // AllowBthreadSuspended=true: wakes the waiter on instance `index' once
    // its reference count is zero. Called with _mutex held.
    inline void SignalReadCond(int index) {
        const bool drained = (_ref[index] == 0);
        if (drained && _modify_wait) {
            pthread_cond_signal(&_cond[index]);
        }
    }

    // AllowBthreadSuspended=true: one more reader of instance `index'.
    void AddRef(int index) {
        ++_ref[index];
    }

    // AllowBthreadSuspended=true: one reader of instance `index' is done.
    void SubRef(int index) {
        --_ref[index];
    }

private:
    // Instance this wrapper is registered with, or NULL if none.
    DoublyBufferedData* _control;
    pthread_mutex_t _mutex{};
    // AllowBthreadSuspended=true: _cond[i] is paired with _ref[i].
    pthread_cond_t _cond[2]{};
    // AllowBthreadSuspended=true: _ref[i] counts reads of _data[i].
    int _ref[2]{0, 0};
    // AllowBthreadSuspended=true: whether someone waits in WaitReadDone().
    bool _modify_wait;
};
// Returns the calling thread's wrapper for this instance. On first use the
// wrapper is registered in _wrappers so that Modify() waits for its reads.
// Returns NULL if the wrapper cannot be obtained or registered, or if it is
// already bound to another instance.
template <typename T, typename TLS, bool AllowBthreadSuspended>
typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperSharedPtr
DoublyBufferedData<T, TLS, AllowBthreadSuspended>::GetWrapper() {
    WrapperSharedPtr w = WrapperTLSGroup::get_or_create_tls_data(_wrapper_key);
    if (NULL == w) {
        return NULL;
    }
    if (w->_control == this) {
        // Fast path: already registered with this instance.
        return w;
    }
    if (w->_control != NULL) {
        LOG(FATAL) << "Get wrapper from tls but control != this";
        return NULL;
    }
    try {
        BAIDU_SCOPED_LOCK(_wrappers_mutex);
        _wrappers.push_back(w);
        // Bind the wrapper only after it is registered. If push_back() threw,
        // _control stays NULL and the next call retries the registration,
        // instead of taking the fast path with a wrapper Modify() would never
        // wait for.
        w->_control = this;
        // The chance to remove expired weak_ptr.
        _wrappers.erase(
            std::remove_if(_wrappers.begin(), _wrappers.end(),
                           [](const WrapperWeakPtr& weak) {
                               return weak.expired();
                           }),
            _wrappers.end());
    } catch (const std::exception&) {
        return NULL;
    }
    return w;
}
// Both instances in _data are value-initialized. Scalars (integers, floating
// point, enums, pointers and pointers to members) and aggregates of them
// become zero/NULL, so a Read() before any Modify() sees a well-defined value.
// Class types with a user-provided default constructor just run it. Unlike
// assigning `T()' afterwards, this does not require T to be assignable.
template <typename T, typename TLS, bool AllowBthreadSuspended>
DoublyBufferedData<T, TLS, AllowBthreadSuspended>::DoublyBufferedData()
    : _data()
    , _index(0)
    , _wrapper_key(0) {
    BAIDU_CASSERT(!(AllowBthreadSuspended && !IsVoid<TLS>::value),
                  "Forbidden to allow bthread suspended with non-Void TLS");
    _wrappers.reserve(64);
    pthread_mutex_init(&_modify_mutex, NULL);
    pthread_mutex_init(&_wrappers_mutex, NULL);
    _wrapper_key = WrapperTLSGroup::key_create();
}
template <typename T, typename TLS, bool AllowBthreadSuspended>
DoublyBufferedData<T, TLS, AllowBthreadSuspended>::~DoublyBufferedData() {
    // Callers must make sure no Read()/Modify() runs concurrently with
    // destruction; nothing here waits for readers.
    {
        BAIDU_SCOPED_LOCK(_wrappers_mutex);
        // Unbind every still-alive thread-local wrapper. The key is recycled
        // by key_delete() below, and a later instance that gets the same key
        // must find _control == NULL to register the wrapper again.
        for (const WrapperWeakPtr& weak : _wrappers) {
            if (WrapperSharedPtr alive = weak.lock()) {
                alive->_control = NULL;  // hack: disable removal.
            }
        }
        _wrappers.clear();
    }
    WrapperTLSGroup::key_delete(_wrapper_key);
    _wrapper_key = -1;
    pthread_mutex_destroy(&_modify_mutex);
    pthread_mutex_destroy(&_wrappers_mutex);
}
template <typename T, typename TLS, bool AllowBthreadSuspended>
int DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Read(
    typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ScopedPtr* ptr) {
    WrapperSharedPtr w = GetWrapper();
    if (BAIDU_UNLIKELY(w == NULL)) {
        return -1;
    }
    // Both modes start by taking the thread-local mutex, which Modify()
    // also acquires to synchronize with this thread.
    w->BeginRead();
    if (!AllowBthreadSuspended) {
        // The mutex stays locked for the whole read; ~ScopedPtr() unlocks it.
        ptr->_data = UnsafeRead();
    } else {
        // Record which instance is read and pin it with a reference count,
        // then release the mutex so the bthread may be suspended while reading.
        ptr->_data = UnsafeRead(ptr->_index);
        w->AddRef(ptr->_index);
        w->BeginReadRelease();
    }
    ptr->_w.swap(w);
    return 0;
}
template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn>
int DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Read(Fn&& fn) {
    BAIDU_CASSERT((is_result_void<Fn, const T&>::value),
                  "Fn must accept `const T&' and return void");
    // The read stays active until `guard' is destroyed, i.e. until after fn
    // has returned.
    ScopedPtr guard;
    const int rc = Read(&guard);
    if (rc == 0) {
        fn(*guard);
        return 0;
    }
    return -1;
}
template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename... Args>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn&& fn, Args&&... args) {
    // _modify_mutex serializes writers. A dedicated mutex (rather than
    // _wrappers_mutex) avoids blocking threads in GetWrapper() for the whole
    // modification; writers are usually a single thread, so contention is
    // negligible.
    BAIDU_SCOPED_LOCK(_modify_mutex);
    int target = !_index.load(butil::memory_order_relaxed);
    // No reader uses the background instance, so it is safe to modify.
    const size_t first = fn(_data[target], std::forward<Args>(args)...);
    if (first == 0) {
        // fn reported no change: nothing to publish.
        return 0;
    }
    // Publish: the modified instance becomes the foreground. This release
    // store pairs with the acquire load in UnsafeRead(), so readers that pick
    // up the new index also see every change made by fn.
    _index.store(target, butil::memory_order_release);
    // From here on `target' designates the old foreground.
    target = !target;
    // Wait until every in-flight read finishes; reads that start afterwards
    // observe the updated _index. Weak pointers of exited threads are
    // compacted away in the same pass.
    {
        BAIDU_SCOPED_LOCK(_wrappers_mutex);
        size_t kept = 0;
        for (size_t i = 0; i < _wrappers.size(); ++i) {
            WrapperSharedPtr w = _wrappers[i].lock();
            if (w == NULL) {
                continue;  // Expired: the owning thread has exited.
            }
            if (AllowBthreadSuspended) {
                w->WaitReadDone(target);
            } else {
                w->WaitReadDone();
            }
            if (kept != i) {
                _wrappers[kept] = std::move(_wrappers[i]);
            }
            ++kept;
        }
        _wrappers.erase(_wrappers.begin() + kept, _wrappers.end());
    }
    const size_t second = fn(_data[target], std::forward<Args>(args)...);
    CHECK_EQ(second, first) << "index=" << _index.load(butil::memory_order_relaxed);
    return second;
}
template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename... Args>
size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(Fn&& fn, Args&&... args) {
    // Adapts fn to Modify(): `background' is one element of _data, and the
    // other element (the one not being modified) is passed as the foreground.
    auto with_foreground = [this, &fn](T& background, Args&&... forwarded) {
        return fn(background,
                  static_cast<const T&>(_data[&background == &_data[0] ? 1 : 0]),
                  std::forward<Args>(forwarded)...);
    };
    return Modify(with_foreground, std::forward<Args>(args)...);
}
} // namespace butil
#endif // BUTIL_DOUBLY_BUFFERED_DATA_H