| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "butil/compat.h" |
| #include "butil/compiler_specific.h" |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <sys/utsname.h> // uname |
| #include <fcntl.h> |
| #include <gtest/gtest.h> |
| #include <pthread.h> |
| #include "gperftools_helper.h" |
| #include "butil/time.h" |
| #include "butil/macros.h" |
| #include "butil/fd_utility.h" |
| #include <butil/endpoint.h> |
| #include <butil/fd_guard.h> |
| #include "butil/logging.h" |
| #include "bthread/task_control.h" |
| #include "bthread/task_group.h" |
| #include "bthread/interrupt_pthread.h" |
| #include "bthread/bthread.h" |
| #include "bthread/unstable.h" |
| #include <netinet/tcp.h> |
| #if defined(OS_MACOSX) |
| #include <sys/types.h> // struct kevent |
| #include <sys/event.h> // kevent(), kqueue() |
| #include <netinet/tcp_fsm.h> |
| #endif |
| |
| #ifndef NDEBUG |
| namespace bthread { |
| extern butil::atomic<int> break_nums; |
| extern TaskControl* global_task_control; |
| int stop_and_join_epoll_threads(); |
| } |
| #endif |
| |
| namespace { |
| TEST(FDTest, read_kernel_version) { |
| utsname name; |
| uname(&name); |
| std::cout << "sysname=" << name.sysname << std::endl |
| << "nodename=" << name.nodename << std::endl |
| << "release=" << name.release << std::endl |
| << "version=" << name.version << std::endl |
| << "machine=" << name.machine << std::endl; |
| } |
| |
| #define RUN_CLIENT_IN_BTHREAD 1 |
| //#define USE_BLOCKING_EPOLL 1 |
| //#define RUN_EPOLL_IN_BTHREAD 1 |
| //#define CREATE_THREAD_TO_PROCESS 1 |
| |
| volatile bool stop = false; |
| |
| struct SocketMeta { |
| int fd; |
| int epfd; |
| }; |
| |
| struct BAIDU_CACHELINE_ALIGNMENT ClientMeta { |
| int fd; |
| size_t count; |
| size_t times; |
| }; |
| |
| struct EpollMeta { |
| int epfd; |
| }; |
| |
| const size_t NCLIENT = 30; |
| void* process_thread(void* arg) { |
| SocketMeta* m = (SocketMeta*)arg; |
| size_t count; |
| //printf("begin to process fd=%d\n", m->fd); |
| ssize_t n = read(m->fd, &count, sizeof(count)); |
| if (n != sizeof(count)) { |
| LOG(FATAL) << "Should not happen in this test"; |
| return NULL; |
| } |
| count += NCLIENT; |
| //printf("write result=%lu to fd=%d\n", count, m->fd); |
| if (write(m->fd, &count, sizeof(count)) != sizeof(count)) { |
| LOG(FATAL) << "Should not happen in this test"; |
| return NULL; |
| } |
| #ifdef CREATE_THREAD_TO_PROCESS |
| # if defined(OS_LINUX) |
| epoll_event evt = { EPOLLIN | EPOLLONESHOT, { m } }; |
| if (epoll_ctl(m->epfd, EPOLL_CTL_MOD, m->fd, &evt) < 0) { |
| epoll_ctl(m->epfd, EPOLL_CTL_ADD, m->fd, &evt); |
| } |
| # elif defined(OS_MACOSX) |
| struct kevent kqueue_event; |
| EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, |
| 0, 0, m); |
| kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL); |
| # endif |
| #endif |
| return NULL; |
| } |
| |
| void* epoll_thread(void* arg) { |
| bthread_usleep(1); |
| EpollMeta* m = (EpollMeta*)arg; |
| const int epfd = m->epfd; |
| #if defined(OS_LINUX) |
| epoll_event e[32]; |
| #elif defined(OS_MACOSX) |
| struct kevent e[32]; |
| #endif |
| |
| while (!stop) { |
| |
| #if defined(OS_LINUX) |
| # ifndef USE_BLOCKING_EPOLL |
| const int n = epoll_wait(epfd, e, ARRAY_SIZE(e), 0); |
| if (stop) { |
| break; |
| } |
| if (n == 0) { |
| bthread_fd_wait(epfd, EPOLLIN); |
| continue; |
| } |
| # else |
| const int n = epoll_wait(epfd, e, ARRAY_SIZE(e), -1); |
| if (stop) { |
| break; |
| } |
| if (n == 0) { |
| continue; |
| } |
| # endif |
| #elif defined(OS_MACOSX) |
| const int n = kevent(epfd, NULL, 0, e, ARRAY_SIZE(e), NULL); |
| if (stop) { |
| break; |
| } |
| if (n == 0) { |
| continue; |
| } |
| #endif |
| if (n < 0) { |
| if (EINTR == errno) { |
| continue; |
| } |
| #if defined(OS_LINUX) |
| PLOG(FATAL) << "Fail to epoll_wait"; |
| #elif defined(OS_MACOSX) |
| PLOG(FATAL) << "Fail to kevent"; |
| #endif |
| break; |
| } |
| |
| #ifdef CREATE_THREAD_TO_PROCESS |
| bthread_fvec vec[n]; |
| for (int i = 0; i < n; ++i) { |
| vec[i].fn = process_thread; |
| # if defined(OS_LINUX) |
| vec[i].arg = e[i].data.ptr; |
| # elif defined(OS_MACOSX) |
| vec[i].arg = e[i].udata; |
| # endif |
| } |
| bthread_t tid[n]; |
| bthread_startv(tid, vec, n, &BTHREAD_ATTR_SMALL); |
| #else |
| for (int i = 0; i < n; ++i) { |
| # if defined(OS_LINUX) |
| process_thread(e[i].data.ptr); |
| # elif defined(OS_MACOSX) |
| process_thread(e[i].udata); |
| # endif |
| } |
| #endif |
| } |
| return NULL; |
| } |
| |
| void* client_thread(void* arg) { |
| ClientMeta* m = (ClientMeta*)arg; |
| for (size_t i = 0; i < m->times; ++i) { |
| if (write(m->fd, &m->count, sizeof(m->count)) != sizeof(m->count)) { |
| LOG(FATAL) << "Should not happen in this test"; |
| return NULL; |
| } |
| #ifdef RUN_CLIENT_IN_BTHREAD |
| ssize_t rc; |
| do { |
| # if defined(OS_LINUX) |
| const int wait_rc = bthread_fd_wait(m->fd, EPOLLIN); |
| # elif defined(OS_MACOSX) |
| const int wait_rc = bthread_fd_wait(m->fd, EVFILT_READ); |
| # endif |
| EXPECT_EQ(0, wait_rc) << berror(); |
| rc = read(m->fd, &m->count, sizeof(m->count)); |
| } while (rc < 0 && errno == EAGAIN); |
| #else |
| ssize_t rc = read(m->fd, &m->count, sizeof(m->count)); |
| #endif |
| if (rc != sizeof(m->count)) { |
| PLOG(FATAL) << "Should not happen in this test, rc=" << rc; |
| return NULL; |
| } |
| } |
| return NULL; |
| } |
| |
| inline uint32_t fmix32 ( uint32_t h ) { |
| h ^= h >> 16; |
| h *= 0x85ebca6b; |
| h ^= h >> 13; |
| h *= 0xc2b2ae35; |
| h ^= h >> 16; |
| return h; |
| } |
| |
| // Disable temporarily due to epoll's bug. The bug is fixed by |
| // a kernel patch that lots of machines currently don't have |
| TEST(FDTest, ping_pong) { |
| #ifndef NDEBUG |
| bthread::break_nums = 0; |
| #endif |
| |
| const size_t REP = 30000; |
| const size_t NEPOLL = 2; |
| |
| int epfd[NEPOLL]; |
| #ifdef RUN_EPOLL_IN_BTHREAD |
| bthread_t eth[NEPOLL]; |
| #else |
| pthread_t eth[NEPOLL]; |
| #endif |
| int fds[2 * NCLIENT]; |
| #ifdef RUN_CLIENT_IN_BTHREAD |
| bthread_t cth[NCLIENT]; |
| #else |
| pthread_t cth[NCLIENT]; |
| #endif |
| ClientMeta* cm[NCLIENT]; |
| |
| for (size_t i = 0; i < NEPOLL; ++i) { |
| #if defined(OS_LINUX) |
| epfd[i] = epoll_create(1024); |
| #elif defined(OS_MACOSX) |
| epfd[i] = kqueue(); |
| #endif |
| ASSERT_GT(epfd[i], 0); |
| } |
| |
| for (size_t i = 0; i < NCLIENT; ++i) { |
| ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i)); |
| //printf("Created fd=%d,%d i=%lu\n", fds[2*i], fds[2*i+1], i); |
| SocketMeta* m = new SocketMeta; |
| m->fd = fds[i * 2]; |
| m->epfd = epfd[fmix32(i) % NEPOLL]; |
| ASSERT_EQ(0, fcntl(m->fd, F_SETFL, fcntl(m->fd, F_GETFL, 0) | O_NONBLOCK)); |
| |
| #ifdef CREATE_THREAD_TO_PROCESS |
| # if defined(OS_LINUX) |
| epoll_event evt = { EPOLLIN | EPOLLONESHOT, { m } }; |
| # elif defined(OS_MACOSX) |
| struct kevent kqueue_event; |
| EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_ONESHOT, |
| 0, 0, m); |
| # endif |
| #else |
| # if defined(OS_LINUX) |
| epoll_event evt = { EPOLLIN, { m } }; |
| # elif defined(OS_MACOSX) |
| struct kevent kqueue_event; |
| EV_SET(&kqueue_event, m->fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, m); |
| # endif |
| #endif |
| |
| #if defined(OS_LINUX) |
| ASSERT_EQ(0, epoll_ctl(m->epfd, EPOLL_CTL_ADD, m->fd, &evt)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(0, kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL)); |
| #endif |
| cm[i] = new ClientMeta; |
| cm[i]->fd = fds[i * 2 + 1]; |
| cm[i]->count = i; |
| cm[i]->times = REP; |
| #ifdef RUN_CLIENT_IN_BTHREAD |
| butil::make_non_blocking(cm[i]->fd); |
| ASSERT_EQ(0, bthread_start_urgent(&cth[i], NULL, client_thread, cm[i])); |
| #else |
| ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i])); |
| #endif |
| } |
| |
| ProfilerStart("ping_pong.prof"); |
| butil::Timer tm; |
| tm.start(); |
| |
| for (size_t i = 0; i < NEPOLL; ++i) { |
| EpollMeta *em = new EpollMeta; |
| em->epfd = epfd[i]; |
| #ifdef RUN_EPOLL_IN_BTHREAD |
| ASSERT_EQ(0, bthread_start_urgent(ð[i], epoll_thread, em, NULL); |
| #else |
| ASSERT_EQ(0, pthread_create(ð[i], NULL, epoll_thread, em)); |
| #endif |
| } |
| |
| for (size_t i = 0; i < NCLIENT; ++i) { |
| #ifdef RUN_CLIENT_IN_BTHREAD |
| bthread_join(cth[i], NULL); |
| #else |
| pthread_join(cth[i], NULL); |
| #endif |
| ASSERT_EQ(i + REP * NCLIENT, cm[i]->count); |
| } |
| tm.stop(); |
| ProfilerStop(); |
| LOG(INFO) << "tid=" << REP*NCLIENT * 1000000L / tm.u_elapsed(); |
| stop = true; |
| for (size_t i = 0; i < NEPOLL; ++i) { |
| #if defined(OS_LINUX) |
| epoll_event evt = { EPOLLOUT, { NULL } }; |
| ASSERT_EQ(0, epoll_ctl(epfd[i], EPOLL_CTL_ADD, 0, &evt)); |
| #elif defined(OS_MACOSX) |
| struct kevent kqueue_event; |
| EV_SET(&kqueue_event, 0, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL); |
| ASSERT_EQ(0, kevent(epfd[i], &kqueue_event, 1, NULL, 0, NULL)); |
| #endif |
| #ifdef RUN_EPOLL_IN_BTHREAD |
| bthread_join(eth[i], NULL); |
| #else |
| pthread_join(eth[i], NULL); |
| #endif |
| } |
| //bthread::stop_and_join_epoll_threads(); |
| bthread_usleep(100000); |
| |
| #ifndef NDEBUG |
| std::cout << "break_nums=" << bthread::break_nums << std::endl; |
| #endif |
| } |
| |
| TEST(FDTest, mod_closed_fd) { |
| #if defined(OS_LINUX) |
| // Conclusion: |
| // If fd is never added into epoll, MOD returns ENOENT |
| // If fd is inside epoll and valid, MOD returns 0 |
| // If fd is closed and not-reused, MOD returns EBADF |
| // If fd is closed and reused, MOD returns ENOENT again |
| |
| const int epfd = epoll_create(1024); |
| int new_fd[2]; |
| int fd[2]; |
| ASSERT_EQ(0, pipe(fd)); |
| epoll_event e = { EPOLLIN, { NULL } }; |
| errno = 0; |
| ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| ASSERT_EQ(ENOENT, errno); |
| ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_ADD, fd[0], &e)); |
| // mod after add |
| ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| // mod after mod |
| ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| ASSERT_EQ(0, close(fd[0])); |
| ASSERT_EQ(0, close(fd[1])); |
| |
| errno = 0; |
| ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| ASSERT_EQ(EBADF, errno) << berror(); |
| |
| ASSERT_EQ(0, pipe(new_fd)); |
| ASSERT_EQ(fd[0], new_fd[0]); |
| ASSERT_EQ(fd[1], new_fd[1]); |
| |
| errno = 0; |
| ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_MOD, fd[0], &e)); |
| ASSERT_EQ(ENOENT, errno) << berror(); |
| |
| ASSERT_EQ(0, close(epfd)); |
| #endif |
| } |
| |
| TEST(FDTest, add_existing_fd) { |
| #if defined(OS_LINUX) |
| const int epfd = epoll_create(1024); |
| epoll_event e = { EPOLLIN, { NULL } }; |
| ASSERT_EQ(0, epoll_ctl(epfd, EPOLL_CTL_ADD, 0, &e)); |
| errno = 0; |
| ASSERT_EQ(-1, epoll_ctl(epfd, EPOLL_CTL_ADD, 0, &e)); |
| ASSERT_EQ(EEXIST, errno); |
| ASSERT_EQ(0, close(epfd)); |
| #endif |
| } |
| |
| void* epoll_waiter(void* arg) { |
| #if defined(OS_LINUX) |
| epoll_event e; |
| if (1 == epoll_wait((int)(intptr_t)arg, &e, 1, -1)) { |
| std::cout << e.events << std::endl; |
| } |
| #elif defined(OS_MACOSX) |
| struct kevent e; |
| if (1 == kevent((int)(intptr_t)arg, NULL, 0, &e, 1, NULL)) { |
| std::cout << e.flags << std::endl; |
| } |
| #endif |
| std::cout << pthread_self() << " quits" << std::endl; |
| return NULL; |
| } |
| |
| TEST(FDTest, interrupt_pthread) { |
| #if defined(OS_LINUX) |
| const int epfd = epoll_create(1024); |
| #elif defined(OS_MACOSX) |
| const int epfd = kqueue(); |
| #endif |
| pthread_t th, th2; |
| ASSERT_EQ(0, pthread_create(&th, NULL, epoll_waiter, (void*)(intptr_t)epfd)); |
| ASSERT_EQ(0, pthread_create(&th2, NULL, epoll_waiter, (void*)(intptr_t)epfd)); |
| bthread_usleep(100000L); |
| std::cout << "wake up " << th << std::endl; |
| bthread::interrupt_pthread(th); |
| bthread_usleep(100000L); |
| std::cout << "wake up " << th2 << std::endl; |
| bthread::interrupt_pthread(th2); |
| pthread_join(th, NULL); |
| pthread_join(th2, NULL); |
| } |
| |
| void* close_the_fd(void* arg) { |
| bthread_usleep(10000/*10ms*/); |
| EXPECT_EQ(0, bthread_close(*(int*)arg)); |
| return NULL; |
| } |
| |
| TEST(FDTest, invalid_epoll_events) { |
| errno = 0; |
| #if defined(OS_LINUX) |
| ASSERT_EQ(-1, bthread_fd_wait(-1, EPOLLIN)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(-1, bthread_fd_wait(-1, EVFILT_READ)); |
| #endif |
| ASSERT_EQ(EINVAL, errno); |
| errno = 0; |
| #if defined(OS_LINUX) |
| ASSERT_EQ(-1, bthread_fd_timedwait(-1, EPOLLIN, NULL)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(-1, bthread_fd_timedwait(-1, EVFILT_READ, NULL)); |
| #endif |
| ASSERT_EQ(EINVAL, errno); |
| |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| #if defined(OS_LINUX) |
| ASSERT_EQ(-1, bthread_fd_wait(fds[0], EPOLLET)); |
| ASSERT_EQ(EINVAL, errno); |
| #endif |
| bthread_t th; |
| ASSERT_EQ(0, bthread_start_urgent(&th, NULL, close_the_fd, &fds[1])); |
| butil::Timer tm; |
| tm.start(); |
| #if defined(OS_LINUX) |
| ASSERT_EQ(0, bthread_fd_wait(fds[0], EPOLLIN | EPOLLET)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(0, bthread_fd_wait(fds[0], EVFILT_READ)); |
| #endif |
| tm.stop(); |
| ASSERT_LT(tm.m_elapsed(), 20); |
| ASSERT_EQ(0, bthread_join(th, NULL)); |
| ASSERT_EQ(0, bthread_close(fds[0])); |
| } |
| |
| void* wait_for_the_fd(void* arg) { |
| timespec ts = butil::milliseconds_from_now(50); |
| #if defined(OS_LINUX) |
| bthread_fd_timedwait(*(int*)arg, EPOLLIN, &ts); |
| #elif defined(OS_MACOSX) |
| bthread_fd_timedwait(*(int*)arg, EVFILT_READ, &ts); |
| #endif |
| return NULL; |
| } |
| |
| TEST(FDTest, timeout) { |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| pthread_t th; |
| ASSERT_EQ(0, pthread_create(&th, NULL, wait_for_the_fd, &fds[0])); |
| bthread_t bth; |
| ASSERT_EQ(0, bthread_start_urgent(&bth, NULL, wait_for_the_fd, &fds[0])); |
| butil::Timer tm; |
| tm.start(); |
| ASSERT_EQ(0, pthread_join(th, NULL)); |
| ASSERT_EQ(0, bthread_join(bth, NULL)); |
| tm.stop(); |
| ASSERT_LT(tm.m_elapsed(), 80); |
| ASSERT_EQ(0, bthread_close(fds[0])); |
| ASSERT_EQ(0, bthread_close(fds[1])); |
| } |
| |
| TEST(FDTest, close_should_wakeup_waiter) { |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| bthread_t bth; |
| ASSERT_EQ(0, bthread_start_urgent(&bth, NULL, wait_for_the_fd, &fds[0])); |
| butil::Timer tm; |
| tm.start(); |
| ASSERT_EQ(0, bthread_close(fds[0])); |
| ASSERT_EQ(0, bthread_join(bth, NULL)); |
| tm.stop(); |
| ASSERT_LT(tm.m_elapsed(), 5); |
| |
| // Launch again, should quit soon due to EBADF |
| #if defined(OS_LINUX) |
| ASSERT_EQ(-1, bthread_fd_timedwait(fds[0], EPOLLIN, NULL)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(-1, bthread_fd_timedwait(fds[0], EVFILT_READ, NULL)); |
| #endif |
| ASSERT_EQ(EBADF, errno); |
| |
| ASSERT_EQ(0, bthread_close(fds[1])); |
| } |
| |
| TEST(FDTest, close_definitely_invalid) { |
| int ec = 0; |
| ASSERT_EQ(-1, close(-1)); |
| ec = errno; |
| ASSERT_EQ(-1, bthread_close(-1)); |
| ASSERT_EQ(ec, errno); |
| } |
| |
| TEST(FDTest, bthread_close_fd_which_did_not_call_bthread_functions) { |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| ASSERT_EQ(0, bthread_close(fds[0])); |
| ASSERT_EQ(0, bthread_close(fds[1])); |
| } |
| |
| TEST(FDTest, double_close) { |
| int fds[2]; |
| ASSERT_EQ(0, pipe(fds)); |
| ASSERT_EQ(0, close(fds[0])); |
| int ec = 0; |
| ASSERT_EQ(-1, close(fds[0])); |
| ec = errno; |
| ASSERT_EQ(0, bthread_close(fds[1])); |
| ASSERT_EQ(-1, bthread_close(fds[1])); |
| ASSERT_EQ(ec, errno); |
| } |
| |
| const char* g_hostname1 = "github.com"; |
| const char* g_hostname2 = "baidu.com"; |
| TEST(FDTest, bthread_connect) { |
| butil::EndPoint ep1; |
| butil::EndPoint ep2; |
| ASSERT_EQ(0, butil::hostname2endpoint(g_hostname1, 80, &ep1)); |
| ASSERT_EQ(0, butil::hostname2endpoint(g_hostname2, 80, &ep2)); |
| |
| { |
| struct sockaddr_storage serv_addr{}; |
| socklen_t serv_addr_size = 0; |
| ASSERT_EQ(0, endpoint2sockaddr(ep1, &serv_addr, &serv_addr_size)); |
| butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); |
| ASSERT_LE(0, sockfd); |
| bool is_blocking = butil::is_blocking(sockfd); |
| ASSERT_LE(0, sockfd); |
| ASSERT_EQ(0, bthread_connect(sockfd, (struct sockaddr*) &serv_addr, serv_addr_size)); |
| ASSERT_EQ(is_blocking, butil::is_blocking(sockfd)); |
| |
| } |
| |
| { |
| struct sockaddr_storage serv_addr{}; |
| socklen_t serv_addr_size = 0; |
| ASSERT_EQ(0, endpoint2sockaddr(ep2, &serv_addr, &serv_addr_size)); |
| butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); |
| ASSERT_LE(0, sockfd); |
| bool is_blocking = butil::is_blocking(sockfd); |
| // In most cases, 1 millisecond will result in a connection timeout. |
| timespec abstime = butil::milliseconds_from_now(1); |
| const int rc = bthread_timed_connect( |
| sockfd, (struct sockaddr*) &serv_addr, |
| serv_addr_size, &abstime); |
| ASSERT_EQ(-1, rc); |
| ASSERT_EQ(ETIMEDOUT, errno); |
| ASSERT_EQ(is_blocking, butil::is_blocking(sockfd)); |
| } |
| } |
| |
| void TestConnectInterruptImpl(bool timed) { |
| butil::EndPoint ep; |
| ASSERT_EQ(0, butil::hostname2endpoint(g_hostname1, 80, &ep)); |
| struct sockaddr_storage serv_addr{}; |
| socklen_t serv_addr_size = 0; |
| ASSERT_EQ(0, endpoint2sockaddr(ep, &serv_addr, &serv_addr_size)); |
| butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0)); |
| ASSERT_GE(sockfd, 0); |
| |
| int rc; |
| if (timed) { |
| int64_t start_ms = butil::cpuwide_time_ms(); |
| butil::tcp_connect(ep, NULL); |
| int64_t connect_ms = butil::cpuwide_time_ms() - start_ms; |
| LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms"; |
| |
| timespec abstime = butil::milliseconds_from_now(connect_ms * 10); |
| rc = bthread_timed_connect( |
| sockfd, (struct sockaddr*) &serv_addr, |
| serv_addr_size, &abstime); |
| } else { |
| rc = bthread_timed_connect( |
| sockfd, (struct sockaddr*) &serv_addr, |
| serv_addr_size, NULL); |
| } |
| ASSERT_EQ(0, rc) << "errno=" << errno; |
| ASSERT_EQ(0, butil::is_connected(sockfd)); |
| |
| } |
| |
| void* ConnectThread(void* arg) { |
| bool timed = *(bool*)arg; |
| TestConnectInterruptImpl(timed); |
| return NULL; |
| } |
| |
| void TestConnectInterrupt(bool timed) { |
| bthread_t tid; |
| ASSERT_EQ(0, bthread_start_background(&tid, NULL, ConnectThread, &timed)); |
| ASSERT_EQ(0, bthread_stop(tid)); |
| ASSERT_EQ(0, bthread_join(tid, NULL)); |
| } |
| |
| TEST(FDTest, interrupt) { |
| TestConnectInterrupt(false); |
| TestConnectInterrupt(true); |
| } |
| |
| } // namespace |