| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "butil/compat.h" |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <sys/utsname.h> // uname |
| #include <fcntl.h> |
| #include <gtest/gtest.h> |
| #include <pthread.h> |
| #include "butil/gperftools_profiler.h" |
| #include "butil/time.h" |
| #include "butil/macros.h" |
| #include "butil/fd_utility.h" |
| #include "butil/logging.h" |
| #include "bthread/task_control.h" |
| #include "bthread/task_group.h" |
| #include "bthread/interrupt_pthread.h" |
| #include "bthread/bthread.h" |
| #include "bthread/unstable.h" |
| #if defined(OS_MACOSX) |
| #include <sys/types.h> // struct kevent |
| #include <sys/event.h> // kevent(), kqueue() |
| #endif |
| |
| #ifndef NDEBUG |
| namespace bthread { |
| extern butil::atomic<int> break_nums; |
| extern TaskControl* global_task_control; |
| int stop_and_join_epoll_threads(); |
| } |
| #endif |
| |
| namespace { |
| TEST(FDTest, read_kernel_version) { |
| utsname name; |
| uname(&name); |
| std::cout << "sysname=" << name.sysname << std::endl |
| << "nodename=" << name.nodename << std::endl |
| << "release=" << name.release << std::endl |
| << "version=" << name.version << std::endl |
| << "machine=" << name.machine << std::endl; |
| } |
| |
| #define RUN_CLIENT_IN_BTHREAD 1 |
| //#define USE_BLOCKING_EPOLL 1 |
| //#define RUN_EPOLL_IN_BTHREAD 1 |
| //#define CREATE_THREAD_TO_PROCESS 1 |
| |
| volatile bool stop = false; |
| |
| struct SocketMeta { |
| int fd; |
| int epfd; |
| }; |
| |
| struct BAIDU_CACHELINE_ALIGNMENT ClientMeta { |
| int fd; |
| size_t count; |
| size_t times; |
| }; |
| |
| struct EpollMeta { |
| int epfd; |
| }; |
| |
| const size_t NCLIENT = 30; |
| void* process_thread(void* arg) { |
| SocketMeta* m = (SocketMeta*)arg; |
| size_t count; |
| //printf("begin to process fd=%d\n", m->fd); |
| ssize_t n = read(m->fd, &count, sizeof(count)); |
| if (n != sizeof(count)) { |
| LOG(FATAL) << "Should not happen in this test"; |
| return NULL; |
| } |
| count += NCLIENT; |
| //printf("write result=%lu to fd=%d\n", count, m->fd); |
| if (write(m->fd, &count, sizeof(count)) != sizeof(count)) { |
| LOG(FATAL) << "Should not happen in this test"; |
| return NULL; |
| } |
| #ifdef CREATE_THREAD_TO_PROCESS |
| # if defined(OS_LINUX) |
| epoll_event evt = { EPOLLIN | EPOLLONESHOT, { m } }; |
| if (epoll_ctl(m->epfd, EPOLL_CTL_MOD, m->fd, &evt) < 0) { |
| epoll_ctl(m->epfd, EPOLL_CTL_ADD, m->fd, &evt); |
| } |
| # elif defined(OS_MACOSX) |
| struct kevent kqueue_event; |
| EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, |
| 0, 0, m); |
| kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL); |
| # endif |
| #endif |
| return NULL; |
| } |
| |
| void* epoll_thread(void* arg) { |
| bthread_usleep(1); |
| EpollMeta* m = (EpollMeta*)arg; |
| const int epfd = m->epfd; |
| #if defined(OS_LINUX) |
| epoll_event e[32]; |
| #elif defined(OS_MACOSX) |
| struct kevent e[32]; |
| #endif |
| |
| while (!stop) { |
| |
| #if defined(OS_LINUX) |
| # ifndef USE_BLOCKING_EPOLL |
| const int n = epoll_wait(epfd, e, ARRAY_SIZE(e), 0); |
| if (stop) { |
| break; |
| } |
| if (n == 0) { |
| bthread_fd_wait(epfd, EPOLLIN); |
| continue; |
| } |
| # else |
| const int n = epoll_wait(epfd, e, ARRAY_SIZE(e), -1); |
| if (stop) { |
| break; |
| } |
| if (n == 0) { |
| continue; |
| } |
| # endif |
| #elif defined(OS_MACOSX) |
| const int n = kevent(epfd, NULL, 0, e, ARRAY_SIZE(e), NULL); |
| if (stop) { |
| break; |
| } |
| if (n == 0) { |
| continue; |
| } |
| #endif |
| if (n < 0) { |
| if (EINTR == errno) { |
| continue; |
| } |
| #if defined(OS_LINUX) |
| PLOG(FATAL) << "Fail to epoll_wait"; |
| #elif defined(OS_MACOSX) |
| PLOG(FATAL) << "Fail to kevent"; |
| #endif |
| break; |
| } |
| |
| #ifdef CREATE_THREAD_TO_PROCESS |
| bthread_fvec vec[n]; |
| for (int i = 0; i < n; ++i) { |
| vec[i].fn = process_thread; |
| # if defined(OS_LINUX) |
| vec[i].arg = e[i].data.ptr; |
| # elif defined(OS_MACOSX) |
| vec[i].arg = e[i].udata; |
| # endif |
| } |
| bthread_t tid[n]; |
| bthread_startv(tid, vec, n, &BTHREAD_ATTR_SMALL); |
| #else |
| for (int i = 0; i < n; ++i) { |
| # if defined(OS_LINUX) |
| process_thread(e[i].data.ptr); |
| # elif defined(OS_MACOSX) |
| process_thread(e[i].udata); |
| # endif |
| } |
| #endif |
| } |
| return NULL; |
| } |
| |
| void* client_thread(void* arg) { |
| ClientMeta* m = (ClientMeta*)arg; |
| for (size_t i = 0; i < m->times; ++i) { |
| if (write(m->fd, &m->count, sizeof(m->count)) != sizeof(m->count)) { |
| LOG(FATAL) << "Should not happen in this test"; |
| return NULL; |
| } |
| #ifdef RUN_CLIENT_IN_BTHREAD |
| ssize_t rc; |
| do { |
| # if defined(OS_LINUX) |
| const int wait_rc = bthread_fd_wait(m->fd, EPOLLIN); |
| # elif defined(OS_MACOSX) |
| const int wait_rc = bthread_fd_wait(m->fd, EVFILT_READ); |
| # endif |
| EXPECT_EQ(0, wait_rc) << berror(); |
| rc = read(m->fd, &m->count, sizeof(m->count)); |
| } while (rc < 0 && errno == EAGAIN); |
| #else |
| ssize_t rc = read(m->fd, &m->count, sizeof(m->count)); |
| #endif |
| if (rc != sizeof(m->count)) { |
| PLOG(FATAL) << "Should not happen in this test, rc=" << rc; |
| return NULL; |
| } |
| } |
| return NULL; |
| } |
| |
| inline uint32_t fmix32 ( uint32_t h ) { |
| h ^= h >> 16; |
| h *= 0x85ebca6b; |
| h ^= h >> 13; |
| h *= 0xc2b2ae35; |
| h ^= h >> 16; |
| return h; |
| } |
| |
| // Disable temporarily due to epoll's bug. The bug is fixed by |
| // a kernel patch that lots of machines currently don't have |
| TEST(FDTest, ping_pong) { |
| #ifndef NDEBUG |
| bthread::break_nums = 0; |
| #endif |
| |
| const size_t REP = 30000; |
| const size_t NEPOLL = 2; |
| |
| int epfd[NEPOLL]; |
| #ifdef RUN_EPOLL_IN_BTHREAD |
| bthread_t eth[NEPOLL]; |
| #else |
| pthread_t eth[NEPOLL]; |
| #endif |
| int fds[2 * NCLIENT]; |
| #ifdef RUN_CLIENT_IN_BTHREAD |
| bthread_t cth[NCLIENT]; |
| #else |
| pthread_t cth[NCLIENT]; |
| #endif |
| ClientMeta* cm[NCLIENT]; |
| |
| for (size_t i = 0; i < NEPOLL; ++i) { |
| #if defined(OS_LINUX) |
| epfd[i] = epoll_create(1024); |
| #elif defined(OS_MACOSX) |
| epfd[i] = kqueue(); |
| #endif |
| ASSERT_GT(epfd[i], 0); |
| } |
| |
| for (size_t i = 0; i < NCLIENT; ++i) { |
| ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i)); |
| //printf("Created fd=%d,%d i=%lu\n", fds[2*i], fds[2*i+1], i); |
| SocketMeta* m = new SocketMeta; |
| m->fd = fds[i * 2]; |
| m->epfd = epfd[fmix32(i) % NEPOLL]; |
| ASSERT_EQ(0, fcntl(m->fd, F_SETFL, fcntl(m->fd, F_GETFL, 0) | O_NONBLOCK)); |
| |
| #ifdef CREATE_THREAD_TO_PROCESS |
| # if defined(OS_LINUX) |
| epoll_event evt = { EPOLLIN | EPOLLONESHOT, { m } }; |
| # elif defined(OS_MACOSX) |
| struct kevent kqueue_event; |
| EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, |
| 0, 0, m); |
| # endif |
| #else |
| # if defined(OS_LINUX) |
| epoll_event evt = { EPOLLIN, { m } }; |
| # elif defined(OS_MACOSX) |
| struct kevent kqueue_event; |
| EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, m); |
| # endif |
| #endif |
| |
| #if defined(OS_LINUX) |
| ASSERT_EQ(0, epoll_ctl(m->epfd, EPOLL_CTL_ADD, m->fd, &evt)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(0, kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL)); |
| #endif |
| cm[i] = new ClientMeta; |
| cm[i]->fd = fds[i * 2 + 1]; |
| cm[i]->count = i; |
| cm[i]->times = REP; |
| #ifdef RUN_CLIENT_IN_BTHREAD |
| butil::make_non_blocking(cm[i]->fd); |
| ASSERT_EQ(0, bthread_start_urgent(&cth[i], NULL, client_thread, cm[i])); |
| #else |
| ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i])); |
| #endif |
| } |
| |
| ProfilerStart("ping_pong.prof"); |
| butil::Timer tm; |
| tm.start(); |
| |
| for (size_t i = 0; i < NEPOLL; ++i) { |
| EpollMeta *em = new EpollMeta; |
| em->epfd = epfd[i]; |
| #ifdef RUN_EPOLL_IN_BTHREAD |
| ASSERT_EQ(0, bthread_start_urgent(ð[i], epoll_thread, em, NULL); |
| #else |
| ASSERT_EQ(0, pthread_create(ð[i], NULL, epoll_thread, em)); |
| #endif |
| } |
| |
| for (size_t i = 0; i < NCLIENT; ++i) { |
| #ifdef RUN_CLIENT_IN_BTHREAD |
| bthread_join(cth[i], NULL); |
| #else |
| pthread_join(cth[i], NULL); |
| #endif |
| ASSERT_EQ(i + REP * NCLIENT, cm[i]->count); |
| } |
| tm.stop(); |
| ProfilerStop(); |
| LOG(INFO) << "tid=" << REP*NCLIENT*1000000L/tm.u_elapsed(); |
| stop = true; |
| for (size_t i = 0; i < NEPOLL; ++i) { |
| #if defined(OS_LINUX) |
| epoll_event evt = { EPOLLOUT, { NULL } }; |
| ASSERT_EQ(0, epoll_ctl(epfd[i], EPOLL_CTL_ADD, 0, &evt)); |
| #elif defined(OS_MACOSX) |
| struct kevent kqueue_event; |
| EV_SET(&kqueue_event, 0, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL); |
| ASSERT_EQ(0, kevent(epfd[i], &kqueue_event, 1, NULL, 0, NULL)); |
| #endif |
| #ifdef RUN_EPOLL_IN_BTHREAD |
| bthread_join(eth[i], NULL); |
| #else |
| pthread_join(eth[i], NULL); |
| #endif |
| } |
| //bthread::stop_and_join_epoll_threads(); |
| bthread_usleep(100000); |
| |
| #ifndef NDEBUG |
| std::cout << "break_nums=" << bthread::break_nums << std::endl; |
| #endif |
| } |
| |
| TEST(FDTest, mod_closed_fd) { |
| #if defined(OS_LINUX) |
| // Conclusion: |
| // If fd is never added into epoll, MOD returns ENOENT |
| // If fd is inside epoll and valid, MOD returns 0 |
| // If fd is closed and not-reused, MOD returns EBADF |
| // If fd is closed and reused, MOD returns ENOENT again |
| |
| const int epfd = epoll_create(1024); |
| int new_fd[2]; |
| int fd[2]; |
| ASSERT_EQ(0, pipe(fd)); |
| epoll_event e = { EPOLLIN, { NULL } }; |
| errno = 0; |
| ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| ASSERT_EQ(ENOENT, errno); |
| ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_ADD, fd[0], &e)); |
| // mod after add |
| ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| // mod after mod |
| ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| ASSERT_EQ(0, close(fd[0])); |
| ASSERT_EQ(0, close(fd[1])); |
| |
| errno = 0; |
| ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| ASSERT_EQ(EBADF, errno) << berror(); |
| |
| ASSERT_EQ(0, pipe(new_fd)); |
| ASSERT_EQ(fd[0], new_fd[0]); |
| ASSERT_EQ(fd[1], new_fd[1]); |
| |
| errno = 0; |
| ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| ASSERT_EQ(ENOENT, errno) << berror(); |
| |
| ASSERT_EQ(0, close(epfd)); |
| #endif |
| } |
| |
| TEST(FDTest, add_existing_fd) { |
| #if defined(OS_LINUX) |
| const int epfd = epoll_create(1024); |
| epoll_event e = { EPOLLIN, { NULL } }; |
| ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_ADD, 0, &e)); |
| errno = 0; |
| ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_ADD, 0, &e)); |
| ASSERT_EQ(EEXIST, errno); |
| ASSERT_EQ(0, close(epfd)); |
| #endif |
| } |
| |
| void* epoll_waiter(void* arg) { |
| #if defined(OS_LINUX) |
| epoll_event e; |
| if (1 == epoll_wait((int)(intptr_t)arg, &e, 1, -1)) { |
| std::cout << e.events << std::endl; |
| } |
| #elif defined(OS_MACOSX) |
| struct kevent e; |
| if (1 == kevent((int)(intptr_t)arg, NULL, 0, &e, 1, NULL)) { |
| std::cout << e.flags << std::endl; |
| } |
| #endif |
| std::cout << pthread_self() << " quits" << std::endl; |
| return NULL; |
| } |
| |
| TEST(FDTest, interrupt_pthread) { |
| #if defined(OS_LINUX) |
| const int epfd = epoll_create(1024); |
| #elif defined(OS_MACOSX) |
| const int epfd = kqueue(); |
| #endif |
| pthread_t th, th2; |
| ASSERT_EQ(0, pthread_create(&th, NULL, epoll_waiter, (void*)(intptr_t)epfd)); |
| ASSERT_EQ(0, pthread_create(&th2, NULL, epoll_waiter, (void*)(intptr_t)epfd)); |
| bthread_usleep(100000L); |
| std::cout << "wake up " << th << std::endl; |
| bthread::interrupt_pthread(th); |
| bthread_usleep(100000L); |
| std::cout << "wake up " << th2 << std::endl; |
| bthread::interrupt_pthread(th2); |
| pthread_join(th, NULL); |
| pthread_join(th2, NULL); |
| } |
| |
| void* close_the_fd(void* arg) { |
| bthread_usleep(10000/*10ms*/); |
| EXPECT_EQ(0, bthread_close(*(int*)arg)); |
| return NULL; |
| } |
| |
| TEST(FDTest, invalid_epoll_events) { |
| errno = 0; |
| #if defined(OS_LINUX) |
| ASSERT_EQ(-1, bthread_fd_wait(-1, EPOLLIN)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(-1, bthread_fd_wait(-1, EVFILT_READ)); |
| #endif |
| ASSERT_EQ(EINVAL, errno); |
| errno = 0; |
| #if defined(OS_LINUX) |
| ASSERT_EQ(-1, bthread_fd_timedwait(-1, EPOLLIN, NULL)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(-1, bthread_fd_timedwait(-1, EVFILT_READ, NULL)); |
| #endif |
| ASSERT_EQ(EINVAL, errno); |
| |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| #if defined(OS_LINUX) |
| ASSERT_EQ(-1, bthread_fd_wait(fds[0], EPOLLET)); |
| ASSERT_EQ(EINVAL, errno); |
| #endif |
| bthread_t th; |
| ASSERT_EQ(0, bthread_start_urgent(&th, NULL, close_the_fd, &fds[1])); |
| butil::Timer tm; |
| tm.start(); |
| #if defined(OS_LINUX) |
| ASSERT_EQ(0, bthread_fd_wait(fds[0], EPOLLIN | EPOLLET)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(0, bthread_fd_wait(fds[0], EVFILT_READ)); |
| #endif |
| tm.stop(); |
| ASSERT_LT(tm.m_elapsed(), 20); |
| ASSERT_EQ(0, bthread_join(th, NULL)); |
| ASSERT_EQ(0, bthread_close(fds[0])); |
| } |
| |
| void* wait_for_the_fd(void* arg) { |
| timespec ts = butil::milliseconds_from_now(50); |
| #if defined(OS_LINUX) |
| bthread_fd_timedwait(*(int*)arg, EPOLLIN, &ts); |
| #elif defined(OS_MACOSX) |
| bthread_fd_timedwait(*(int*)arg, EVFILT_READ, &ts); |
| #endif |
| return NULL; |
| } |
| |
| TEST(FDTest, timeout) { |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| pthread_t th; |
| ASSERT_EQ(0, pthread_create(&th, NULL, wait_for_the_fd, &fds[0])); |
| bthread_t bth; |
| ASSERT_EQ(0, bthread_start_urgent(&bth, NULL, wait_for_the_fd, &fds[0])); |
| butil::Timer tm; |
| tm.start(); |
| ASSERT_EQ(0, pthread_join(th, NULL)); |
| ASSERT_EQ(0, bthread_join(bth, NULL)); |
| tm.stop(); |
| ASSERT_LT(tm.m_elapsed(), 80); |
| ASSERT_EQ(0, bthread_close(fds[0])); |
| ASSERT_EQ(0, bthread_close(fds[1])); |
| } |
| |
| TEST(FDTest, close_should_wakeup_waiter) { |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| bthread_t bth; |
| ASSERT_EQ(0, bthread_start_urgent(&bth, NULL, wait_for_the_fd, &fds[0])); |
| butil::Timer tm; |
| tm.start(); |
| ASSERT_EQ(0, bthread_close(fds[0])); |
| ASSERT_EQ(0, bthread_join(bth, NULL)); |
| tm.stop(); |
| ASSERT_LT(tm.m_elapsed(), 5); |
| |
| // Launch again, should quit soon due to EBADF |
| #if defined(OS_LINUX) |
| ASSERT_EQ(-1, bthread_fd_timedwait(fds[0], EPOLLIN, NULL)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(-1, bthread_fd_timedwait(fds[0], EVFILT_READ, NULL)); |
| #endif |
| ASSERT_EQ(EBADF, errno); |
| |
| ASSERT_EQ(0, bthread_close(fds[1])); |
| } |
| |
| TEST(FDTest, close_definitely_invalid) { |
| int ec = 0; |
| ASSERT_EQ(-1, close(-1)); |
| ec = errno; |
| ASSERT_EQ(-1, bthread_close(-1)); |
| ASSERT_EQ(ec, errno); |
| } |
| |
| TEST(FDTest, bthread_close_fd_which_did_not_call_bthread_functions) { |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| ASSERT_EQ(0, bthread_close(fds[0])); |
| ASSERT_EQ(0, bthread_close(fds[1])); |
| } |
| |
| TEST(FDTest, double_close) { |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| ASSERT_EQ(0, close(fds[0])); |
| int ec = 0; |
| ASSERT_EQ(-1, close(fds[0])); |
| ec = errno; |
| ASSERT_EQ(0, bthread_close(fds[1])); |
| ASSERT_EQ(-1, bthread_close(fds[1])); |
| ASSERT_EQ(ec, errno); |
| } |
| } // namespace |