| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| // brpc - A framework to host and access services throughout Baidu. |
| |
| // Date: Sun Jul 13 15:04:18 CST 2014 |
| |
| #include <pthread.h> |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <gtest/gtest.h> |
| #include "butil/gperftools_profiler.h" |
| #include "butil/time.h" |
| #include "butil/macros.h" |
| #include "butil/fd_utility.h" |
| #include "brpc/event_dispatcher.h" |
| #include "brpc/details/has_epollrdhup.h" |
| |
| class EventDispatcherTest : public ::testing::Test{ |
| protected: |
| EventDispatcherTest(){ |
| }; |
| virtual ~EventDispatcherTest(){}; |
| virtual void SetUp() { |
| }; |
| virtual void TearDown() { |
| }; |
| }; |
| |
| TEST_F(EventDispatcherTest, has_epollrdhup) { |
| LOG(INFO) << brpc::has_epollrdhup; |
| } |
| |
| TEST_F(EventDispatcherTest, versioned_ref) { |
| butil::atomic<uint64_t> versioned_ref(2); |
| versioned_ref.fetch_add(brpc::MakeVRef(0, -1), |
| butil::memory_order_release); |
| ASSERT_EQ(brpc::MakeVRef(1, 1), versioned_ref); |
| } |
| |
| std::vector<int> err_fd; |
| pthread_mutex_t err_fd_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| std::vector<int> rel_fd; |
| pthread_mutex_t rel_fd_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| volatile bool client_stop = false; |
| |
| struct BAIDU_CACHELINE_ALIGNMENT ClientMeta { |
| int fd; |
| size_t times; |
| size_t bytes; |
| }; |
| |
| struct BAIDU_CACHELINE_ALIGNMENT SocketExtra : public brpc::SocketUser { |
| char* buf; |
| size_t buf_cap; |
| size_t bytes; |
| size_t times; |
| |
| SocketExtra() { |
| buf_cap = 32768; |
| buf = (char*)malloc(buf_cap); |
| bytes = 0; |
| times = 0; |
| } |
| |
| virtual void BeforeRecycle(brpc::Socket* m) { |
| pthread_mutex_lock(&rel_fd_mutex); |
| rel_fd.push_back(m->fd()); |
| pthread_mutex_unlock(&rel_fd_mutex); |
| delete this; |
| } |
| |
| static int OnEdgeTriggeredEventOnce(brpc::Socket* m) { |
| SocketExtra* e = static_cast<SocketExtra*>(m->user()); |
| // Read all data. |
| do { |
| ssize_t n = read(m->fd(), e->buf, e->buf_cap); |
| if (n == 0 |
| #ifdef BRPC_SOCKET_HAS_EOF |
| || m->_eof |
| #endif |
| ) { |
| pthread_mutex_lock(&err_fd_mutex); |
| err_fd.push_back(m->fd()); |
| pthread_mutex_unlock(&err_fd_mutex); |
| LOG(WARNING) << "Another end closed fd=" << m->fd(); |
| return -1; |
| } else if (n > 0) { |
| e->bytes += n; |
| ++e->times; |
| #ifdef BRPC_SOCKET_HAS_EOF |
| if ((size_t)n < e->buf_cap && brpc::has_epollrdhup) { |
| break; |
| } |
| #endif |
| } else { |
| if (errno == EAGAIN) { |
| break; |
| } else if (errno == EINTR) { |
| continue; |
| } else { |
| PLOG(WARNING) << "Fail to read fd=" << m->fd(); |
| return -1; |
| } |
| } |
| } while (1); |
| return 0; |
| } |
| |
| static void OnEdgeTriggeredEvents(brpc::Socket* m) { |
| int progress = brpc::Socket::PROGRESS_INIT; |
| do { |
| if (OnEdgeTriggeredEventOnce(m) != 0) { |
| m->SetFailed(); |
| return; |
| } |
| } while (m->MoreReadEvents(&progress)); |
| } |
| }; |
| |
| void* client_thread(void* arg) { |
| ClientMeta* m = (ClientMeta*)arg; |
| size_t offset = 0; |
| m->times = 0; |
| m->bytes = 0; |
| const size_t buf_cap = 32768; |
| char* buf = (char*)malloc(buf_cap); |
| for (size_t i = 0; i < buf_cap/8; ++i) { |
| ((uint64_t*)buf)[i] = i; |
| } |
| while (!client_stop) { |
| ssize_t n; |
| if (offset == 0) { |
| n = write(m->fd, buf, buf_cap); |
| } else { |
| iovec v[2]; |
| v[0].iov_base = buf + offset; |
| v[0].iov_len = buf_cap - offset; |
| v[1].iov_base = buf; |
| v[1].iov_len = offset; |
| n = writev(m->fd, v, 2); |
| } |
| if (n < 0) { |
| if (errno != EINTR) { |
| PLOG(WARNING) << "Fail to write fd=" << m->fd; |
| break; |
| } |
| } else { |
| ++m->times; |
| m->bytes += n; |
| offset += n; |
| if (offset >= buf_cap) { |
| offset -= buf_cap; |
| } |
| } |
| } |
| EXPECT_EQ(0, close(m->fd)); |
| return NULL; |
| } |
| |
| inline uint32_t fmix32 ( uint32_t h ) { |
| h ^= h >> 16; |
| h *= 0x85ebca6b; |
| h ^= h >> 13; |
| h *= 0xc2b2ae35; |
| h ^= h >> 16; |
| return h; |
| } |
| |
| TEST_F(EventDispatcherTest, dispatch_tasks) { |
| #ifdef BUTIL_RESOURCE_POOL_NEED_FREE_ITEM_NUM |
| const butil::ResourcePoolInfo old_info = |
| butil::describe_resources<brpc::Socket>(); |
| #endif |
| |
| client_stop = false; |
| |
| const size_t NCLIENT = 16; |
| |
| int fds[2 * NCLIENT]; |
| pthread_t cth[NCLIENT]; |
| ClientMeta* cm[NCLIENT]; |
| SocketExtra* sm[NCLIENT]; |
| |
| for (size_t i = 0; i < NCLIENT; ++i) { |
| ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i)); |
| sm[i] = new SocketExtra; |
| |
| const int fd = fds[i * 2]; |
| butil::make_non_blocking(fd); |
| brpc::SocketId socket_id; |
| brpc::SocketOptions options; |
| options.fd = fd; |
| options.user = sm[i]; |
| options.on_edge_triggered_events = SocketExtra::OnEdgeTriggeredEvents; |
| |
| ASSERT_EQ(0, brpc::Socket::Create(options, &socket_id)); |
| cm[i] = new ClientMeta; |
| cm[i]->fd = fds[i * 2 + 1]; |
| cm[i]->times = 0; |
| cm[i]->bytes = 0; |
| ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i])); |
| } |
| |
| LOG(INFO) << "Begin to profile... (5 seconds)"; |
| ProfilerStart("event_dispatcher.prof"); |
| butil::Timer tm; |
| tm.start(); |
| |
| sleep(5); |
| |
| tm.stop(); |
| ProfilerStop(); |
| LOG(INFO) << "End profiling"; |
| |
| size_t client_bytes = 0; |
| size_t server_bytes = 0; |
| for (size_t i = 0; i < NCLIENT; ++i) { |
| client_bytes += cm[i]->bytes; |
| server_bytes += sm[i]->bytes; |
| } |
| LOG(INFO) << "client_tp=" << client_bytes / (double)tm.u_elapsed() |
| << "MB/s server_tp=" << server_bytes / (double)tm.u_elapsed() |
| << "MB/s"; |
| |
| client_stop = true; |
| for (size_t i = 0; i < NCLIENT; ++i) { |
| pthread_join(cth[i], NULL); |
| } |
| sleep(1); |
| |
| std::vector<int> copy1, copy2; |
| pthread_mutex_lock(&err_fd_mutex); |
| copy1.swap(err_fd); |
| pthread_mutex_unlock(&err_fd_mutex); |
| pthread_mutex_lock(&rel_fd_mutex); |
| copy2.swap(rel_fd); |
| pthread_mutex_unlock(&rel_fd_mutex); |
| |
| std::sort(copy1.begin(), copy1.end()); |
| std::sort(copy2.begin(), copy2.end()); |
| ASSERT_EQ(copy1.size(), copy2.size()); |
| for (size_t i = 0; i < copy1.size(); ++i) { |
| ASSERT_EQ(copy1[i], copy2[i]) << i; |
| } |
| ASSERT_EQ(NCLIENT, copy1.size()); |
| const butil::ResourcePoolInfo info |
| = butil::describe_resources<brpc::Socket>(); |
| LOG(INFO) << info; |
| #ifdef BUTIL_RESOURCE_POOL_NEED_FREE_ITEM_NUM |
| ASSERT_EQ(NCLIENT, info.free_item_num - old_info.free_item_num); |
| #endif |
| } |