// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// bthread - An M:N threading library to make applications more concurrent.
// Date: Thu Aug 7 18:56:27 CST 2014
#include "butil/compat.h"
#include <new> // std::nothrow
#include <sys/poll.h> // poll()
#if defined(OS_MACOSX)
#include <sys/types.h> // struct kevent
#include <sys/event.h> // kevent(), kqueue()
#endif
#include "butil/atomicops.h"
#include "butil/time.h"
#include "butil/fd_utility.h" // make_non_blocking
#include "butil/logging.h"
#include "butil/third_party/murmurhash3/murmurhash3.h" // fmix32
#include "butil/memory/scope_guard.h"
#include "bthread/butex.h" // butex_*
#include "bthread/task_group.h" // TaskGroup
#include "bthread/bthread.h" // bthread_start_urgent
namespace butil {
extern int pthread_fd_wait(int fd, unsigned events, const timespec* abstime);
}
// Implement bthread functions on file descriptors
namespace bthread {
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
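// A lazily-allocated two-level array of atomic slots, indexed by fd.
// Blocks are created on demand in get_or_new() with a lock-free CAS, so
// readers in get() never take a lock and existing slots are never moved.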
template <typename T, size_t NBLOCK, size_t BLOCK_SIZE>
class LazyArray {
struct Block {
butil::atomic<T> items[BLOCK_SIZE];
};
public:
LazyArray() {
memset(static_cast<void*>(_blocks), 0, sizeof(butil::atomic<Block*>) * NBLOCK);
}
butil::atomic<T>* get_or_new(size_t index) {
const size_t block_index = index / BLOCK_SIZE;
if (block_index >= NBLOCK) {
return NULL;
}
const size_t block_offset = index - block_index * BLOCK_SIZE;
Block* b = _blocks[block_index].load(butil::memory_order_consume);
if (b != NULL) {
return b->items + block_offset;
}
b = new (std::nothrow) Block;
if (NULL == b) {
b = _blocks[block_index].load(butil::memory_order_consume);
return (b ? b->items + block_offset : NULL);
}
// Set items to default value of T.
std::fill(b->items, b->items + BLOCK_SIZE, T());
Block* expected = NULL;
if (_blocks[block_index].compare_exchange_strong(
expected, b, butil::memory_order_release,
butil::memory_order_consume)) {
return b->items + block_offset;
}
delete b;
return expected->items + block_offset;
}
butil::atomic<T>* get(size_t index) const {
const size_t block_index = index / BLOCK_SIZE;
if (__builtin_expect(block_index < NBLOCK, 1)) {
const size_t block_offset = index - block_index * BLOCK_SIZE;
Block* const b = _blocks[block_index].load(butil::memory_order_consume);
if (__builtin_expect(b != NULL, 1)) {
return b->items + block_offset;
}
}
return NULL;
}
private:
butil::atomic<Block*> _blocks[NBLOCK];
};
typedef butil::atomic<int> EpollButex;
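// Sentinel temporarily stored in fd_butexes by fd_close() to fence off
// concurrent waiters and closers while the fd is being closed.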
static EpollButex* const CLOSING_GUARD = (EpollButex*)(intptr_t)-1L;
#ifndef NDEBUG
butil::static_atomic<int> break_nums = BUTIL_STATIC_ATOMIC_INIT(0);
#endif
// Able to address 67108864 (262144 * 256) file descriptors, which should be enough.
LazyArray<EpollButex*, 262144/*NBLOCK*/, 256/*BLOCK_SIZE*/> fd_butexes;
static const int BTHREAD_DEFAULT_EPOLL_SIZE = 65536;
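// Runs a dedicated bthread that epoll_wait()s (kevent() on macOS) on fds
// registered by fd_wait(), and wakes up the butex associated with an fd
// when an event fires on it.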
class EpollThread {
public:
EpollThread()
: _epfd(-1)
, _stop(false)
, _tid(0) {
}
int start(int epoll_size) {
if (started()) {
return -1;
}
_start_mutex.lock();
// Double check under the lock.
if (started()) {
_start_mutex.unlock();
return -1;
}
#if defined(OS_LINUX)
_epfd = epoll_create(epoll_size);
#elif defined(OS_MACOSX)
_epfd = kqueue();
#endif
_start_mutex.unlock();
if (_epfd < 0) {
PLOG(FATAL) << "Fail to epoll_create/kqueue";
return -1;
}
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "EpollThread::run_this");
if (bthread_start_background(
&_tid, &attr, EpollThread::run_this, this) != 0) {
close(_epfd);
_epfd = -1;
LOG(FATAL) << "Fail to create epoll bthread";
return -1;
}
return 0;
}
// Note: This function does not wake up suspended fd_wait. This is fine
// because stop_and_join() is only called at program termination
// (g_task_control.stop()); suspended bthreads block neither the quitting
// of worker pthreads nor the completion of g_task_control.stop().
int stop_and_join() {
if (!started()) {
return 0;
}
// No matter what this function returns, _epfd will be set to -1
// (making started() false) so that later calls to stop_and_join()
// do not enter again.
const int saved_epfd = _epfd;
_epfd = -1;
// epoll_wait cannot be woken up by closing _epfd, so we wake it up by
// adding an fd that continuously triggers EPOLLOUT. Visibility of _stop:
// the constant EPOLLOUT eventually forces epoll_wait to return and
// observe _stop == true.
_stop = true;
int closing_epoll_pipe[2];
if (pipe(closing_epoll_pipe)) {
PLOG(FATAL) << "Fail to create closing_epoll_pipe";
return -1;
}
#if defined(OS_LINUX)
epoll_event evt = { EPOLLOUT, { NULL } };
if (epoll_ctl(saved_epfd, EPOLL_CTL_ADD,
closing_epoll_pipe[1], &evt) < 0) {
#elif defined(OS_MACOSX)
struct kevent kqueue_event;
EV_SET(&kqueue_event, closing_epoll_pipe[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
0, 0, NULL);
if (kevent(saved_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
#endif
PLOG(FATAL) << "Fail to add closing_epoll_pipe into epfd="
<< saved_epfd;
return -1;
}
const int rc = bthread_join(_tid, NULL);
if (rc) {
LOG(FATAL) << "Fail to join EpollThread, " << berror(rc);
return -1;
}
close(closing_epoll_pipe[0]);
close(closing_epoll_pipe[1]);
close(saved_epfd);
return 0;
}
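// Register `events' of `fd' in the epoll/kqueue and suspend the calling
// bthread on the butex associated with the fd until an event fires, the
// fd is closed, or `abstime' is reached. Returns 0 on success, -1 otherwise.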
int fd_wait(int fd, unsigned events, const timespec* abstime) {
butil::atomic<EpollButex*>* p = fd_butexes.get_or_new(fd);
if (NULL == p) {
errno = ENOMEM;
return -1;
}
EpollButex* butex = p->load(butil::memory_order_consume);
if (NULL == butex) {
// It is rare to wait on one file descriptor from multiple threads
// simultaneously. Creating the singleton with an optimistic CAS here
// avoids the cost of a mutex per butex.
butex = butex_create_checked<EpollButex>();
butex->store(0, butil::memory_order_relaxed);
EpollButex* expected = NULL;
if (!p->compare_exchange_strong(expected, butex,
butil::memory_order_release,
butil::memory_order_consume)) {
butex_destroy(butex);
butex = expected;
}
}
while (butex == CLOSING_GUARD) { // bthread_close() is running.
if (sched_yield() < 0) {
return -1;
}
butex = p->load(butil::memory_order_consume);
}
// Save the value of the butex before adding the fd to epoll because the
// butex may be changed before butex_wait. No explicit memory fence is
// needed since EPOLL_CTL_MOD and EPOLL_CTL_ADD should act as release
// fences.
const int expected_val = butex->load(butil::memory_order_relaxed);
#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
epoll_event evt = { events | EPOLLONESHOT, { butex } };
if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
errno != EEXIST) {
PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
return -1;
}
}
# else
epoll_event evt;
evt.events = events;
evt.data.fd = fd;
if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
errno != EEXIST) {
PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
return -1;
}
# endif
#elif defined(OS_MACOSX)
struct kevent kqueue_event;
EV_SET(&kqueue_event, fd, events, EV_ADD | EV_ENABLE | EV_ONESHOT,
0, 0, butex);
if (kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
PLOG(FATAL) << "Fail to add fd=" << fd << " into kqueuefd=" << _epfd;
return -1;
}
#endif
while (butex->load(butil::memory_order_relaxed) == expected_val) {
if (butex_wait(butex, expected_val, abstime) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return -1;
}
}
return 0;
}
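// Close `fd' and wake up any bthreads suspended in fd_wait() on it.
// The slot in fd_butexes is parked at CLOSING_GUARD during the close to
// fence off concurrent fd_wait()/fd_close() on the same fd.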
int fd_close(int fd) {
if (fd < 0) {
// Mimic what close(-1) returns.
errno = EBADF;
return -1;
}
butil::atomic<EpollButex*>* pbutex = bthread::fd_butexes.get(fd);
if (NULL == pbutex) {
// bthread_fd functions were never called on this fd, close it directly.
return close(fd);
}
EpollButex* butex = pbutex->exchange(
CLOSING_GUARD, butil::memory_order_relaxed);
if (butex == CLOSING_GUARD) {
// Concurrent double close detected.
errno = EBADF;
return -1;
}
if (butex != NULL) {
butex->fetch_add(1, butil::memory_order_relaxed);
butex_wake_all(butex);
}
#if defined(OS_LINUX)
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
#elif defined(OS_MACOSX)
struct kevent evt;
EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
kevent(_epfd, &evt, 1, NULL, 0, NULL);
EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
const int rc = close(fd);
pbutex->exchange(butex, butil::memory_order_relaxed);
return rc;
}
bool started() const {
return _epfd >= 0;
}
private:
static void* run_this(void* arg) {
return static_cast<EpollThread*>(arg)->run();
}
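// The epoll/kqueue loop: wait for events and wake the butexes of ready
// fds until stop_and_join() sets _stop and triggers a final wakeup.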
void* run() {
const int initial_epfd = _epfd;
const size_t MAX_EVENTS = 32;
#if defined(OS_LINUX)
epoll_event* e = new (std::nothrow) epoll_event[MAX_EVENTS];
#elif defined(OS_MACOSX)
typedef struct kevent KEVENT;
struct kevent* e = new (std::nothrow) KEVENT[MAX_EVENTS];
#endif
if (NULL == e) {
LOG(FATAL) << "Fail to new epoll_event";
return NULL;
}
#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
DLOG(INFO) << "Use DEL+ADD instead of EPOLLONESHOT+MOD due to kernel bug. Performance will be much lower.";
# endif
#endif
while (!_stop) {
const int epfd = _epfd;
#if defined(OS_LINUX)
const int n = epoll_wait(epfd, e, MAX_EVENTS, -1);
#elif defined(OS_MACOSX)
const int n = kevent(epfd, NULL, 0, e, MAX_EVENTS, NULL);
#endif
if (_stop) {
break;
}
if (n < 0) {
if (errno == EINTR) {
#ifndef NDEBUG
break_nums.fetch_add(1, butil::memory_order_relaxed);
int* p = &errno;
const char* b = berror();
const char* b2 = berror(errno);
DLOG(FATAL) << "Fail to epoll epfd=" << epfd << ", "
<< errno << " " << p << " " << b << " " << b2;
#endif
continue;
}
PLOG(INFO) << "Fail to epoll epfd=" << epfd;
break;
}
#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
for (int i = 0; i < n; ++i) {
epoll_ctl(epfd, EPOLL_CTL_DEL, e[i].data.fd, NULL);
}
# endif
#endif
for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr);
# else
butil::atomic<EpollButex*>* pbutex = fd_butexes.get(e[i].data.fd);
EpollButex* butex = pbutex ?
pbutex->load(butil::memory_order_consume) : NULL;
# endif
#elif defined(OS_MACOSX)
EpollButex* butex = static_cast<EpollButex*>(e[i].udata);
#endif
if (butex != NULL && butex != CLOSING_GUARD) {
butex->fetch_add(1, butil::memory_order_relaxed);
butex_wake_all(butex);
}
}
}
delete [] e;
DLOG(INFO) << "EpollThread=" << _tid << "(epfd="
<< initial_epfd << ") is about to stop";
return NULL;
}
int _epfd;
bool _stop;
bthread_t _tid;
butil::Mutex _start_mutex;
};
EpollThread epoll_thread[BTHREAD_EPOLL_THREAD_NUM];
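// Map an fd to one of the epoll threads (hashed with fmix32 when there
// are several) and make sure that thread is started.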
static inline EpollThread& get_epoll_thread(int fd) {
if (BTHREAD_EPOLL_THREAD_NUM == 1UL) {
EpollThread& et = epoll_thread[0];
et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
return et;
}
EpollThread& et = epoll_thread[butil::fmix32(fd) % BTHREAD_EPOLL_THREAD_NUM];
et.start(BTHREAD_DEFAULT_EPOLL_SIZE);
return et;
}
//TODO(zhujiashun): change name
int stop_and_join_epoll_threads() {
// Returns -1 if any epoll thread failed to stop.
int rc = 0;
for (size_t i = 0; i < BTHREAD_EPOLL_THREAD_NUM; ++i) {
if (epoll_thread[i].stop_and_join() < 0) {
rc = -1;
}
}
return rc;
}
// For pthreads.
int pthread_fd_wait(int fd, unsigned events,
const timespec* abstime) {
return butil::pthread_fd_wait(fd, events, abstime);
}
} // namespace bthread
extern "C" {
int bthread_fd_wait(int fd, unsigned events) {
if (fd < 0) {
errno = EINVAL;
return -1;
}
bthread::TaskGroup* g = bthread::tls_task_group;
if (NULL != g && !g->is_current_pthread_task()) {
return bthread::get_epoll_thread(fd).fd_wait(
fd, events, NULL);
}
return bthread::pthread_fd_wait(fd, events, NULL);
}
int bthread_fd_timedwait(int fd, unsigned events,
const timespec* abstime) {
if (NULL == abstime) {
return bthread_fd_wait(fd, events);
}
if (fd < 0) {
errno = EINVAL;
return -1;
}
bthread::TaskGroup* g = bthread::tls_task_group;
if (NULL != g && !g->is_current_pthread_task()) {
return bthread::get_epoll_thread(fd).fd_wait(
fd, events, abstime);
}
return bthread::pthread_fd_wait(fd, events, abstime);
}
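// connect() that suspends the calling bthread instead of blocking the
// worker pthread: the socket is temporarily made non-blocking (restored
// on return) and the bthread waits for writability before checking the
// connect result.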
int bthread_connect(int sockfd, const sockaddr* serv_addr,
socklen_t addrlen) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (NULL == g || g->is_current_pthread_task()) {
return ::connect(sockfd, serv_addr, addrlen);
}
bool is_blocking = butil::is_blocking(sockfd);
if (is_blocking) {
butil::make_non_blocking(sockfd);
}
// Scoped non-blocking.
auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
if (is_blocking) {
butil::make_blocking(sockfd);
}
});
const int rc = ::connect(sockfd, serv_addr, addrlen);
if (rc == 0 || errno != EINPROGRESS) {
return rc;
}
#if defined(OS_LINUX)
if (bthread_fd_wait(sockfd, EPOLLOUT) < 0) {
#elif defined(OS_MACOSX)
if (bthread_fd_wait(sockfd, EVFILT_WRITE) < 0) {
#endif
return -1;
}
if (butil::is_connected(sockfd) != 0) {
return -1;
}
return 0;
}
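// Like bthread_connect(), but gives up and returns -1 when `abstime' is
// reached before the connection completes.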
int bthread_timed_connect(int sockfd, const struct sockaddr* serv_addr,
socklen_t addrlen, const timespec* abstime) {
if (!abstime) {
return bthread_connect(sockfd, serv_addr, addrlen);
}
bool is_blocking = butil::is_blocking(sockfd);
if (is_blocking) {
butil::make_non_blocking(sockfd);
}
// Scoped non-blocking.
auto guard = butil::MakeScopeGuard([is_blocking, sockfd]() {
if (is_blocking) {
butil::make_blocking(sockfd);
}
});
const int rc = ::connect(sockfd, serv_addr, addrlen);
if (rc == 0 || errno != EINPROGRESS) {
return rc;
}
#if defined(OS_LINUX)
if (bthread_fd_timedwait(sockfd, EPOLLOUT, abstime) < 0) {
#elif defined(OS_MACOSX)
if (bthread_fd_timedwait(sockfd, EVFILT_WRITE, abstime) < 0) {
#endif
return -1;
}
if (butil::is_connected(sockfd) != 0) {
return -1;
}
return 0;
}
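// Close `fd' and wake up bthreads suspended in bthread_fd_*wait() on it.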
// This does not wake pthreads calling bthread_fd_*wait.
int bthread_close(int fd) {
return bthread::get_epoll_thread(fd).fd_close(fd);
}
} // extern "C"