blob: 5c0aa064d8aa3b36ae9d52a2b69fc8679da051d6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// brpc - A framework to host and access services throughout Baidu.
// Date: Sun Jul 13 15:04:18 CST 2014
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <gtest/gtest.h>
#include "gperftools_helper.h"
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/fd_utility.h"
#include "butil/memory/scope_guard.h"
#include "bthread/bthread.h"
#include "brpc/event_dispatcher.h"
#include "brpc/socket.h"
#include "brpc/details/has_epollrdhup.h"
#include "brpc/versioned_ref_with_id.h"
class EventDispatcherTest : public ::testing::Test{
protected:
EventDispatcherTest() = default;
~EventDispatcherTest() override = default;
void SetUp() override {}
void TearDown() override {}
};
TEST_F(EventDispatcherTest, has_epollrdhup) {
LOG(INFO) << brpc::has_epollrdhup;
}
TEST_F(EventDispatcherTest, versioned_ref) {
butil::atomic<uint64_t> versioned_ref(2);
versioned_ref.fetch_add(brpc::MakeVRef(0, -1),
butil::memory_order_release);
ASSERT_EQ(brpc::MakeVRef(1, 1), versioned_ref);
}
struct UserData;
UserData* g_user_data = NULL;
struct UserData : public brpc::VersionedRefWithId<UserData> {
explicit UserData(Forbidden f)
: brpc::VersionedRefWithId<UserData>(f)
, count(0)
, _additional_ref_released(false) {}
int OnCreated() {
count.store(1, butil::memory_order_relaxed);
_additional_ref_released = false;
g_user_data = this;
return 0;
}
void BeforeRecycled() {
count.store(0, butil::memory_order_relaxed);
g_user_data = NULL;
}
void BeforeAdditionalRefReleased() {
_additional_ref_released = true;
}
void OnFailed() {
count.fetch_sub(1, butil::memory_order_relaxed);
}
void AfterRevived() {
count.fetch_add(1, butil::memory_order_relaxed);
_additional_ref_released = false;
}
butil::atomic<int> count;
bool _additional_ref_released;
};
// Unique identifier of a UserData.
// Users shall store UserDataId instead of UserData and call UserData::Address()
// to convert the identifier to an unique_ptr at each access. Whenever a
// unique_ptr is not destructed, the enclosed UserData will not be recycled.
typedef brpc::VRefId UserDataId;
const brpc::VRefId INVALID_EVENT_DATA_ID = brpc::INVALID_VREF_ID;
typedef brpc::VersionedRefWithIdUniquePtr<UserData> UserDataUniquePtr;
volatile bool vref_thread_stop = false;
butil::atomic<int> g_count(1);
void TestVRef(UserDataId id) {
UserDataUniquePtr ptr;
ASSERT_EQ(0, UserData::Address(id, &ptr));
ptr->count.fetch_add(1, butil::memory_order_relaxed);
g_count.fetch_add(1, butil::memory_order_relaxed);
}
void* VRefThread(void* arg) {
auto id = (UserDataId)arg;
while (!vref_thread_stop) {
TestVRef(id);
}
return NULL;
}
TEST_F(EventDispatcherTest, versioned_ref_with_id) {
UserDataId id = INVALID_EVENT_DATA_ID;
ASSERT_EQ(0, UserData::Create(&id));
ASSERT_NE(INVALID_EVENT_DATA_ID, id);
UserDataUniquePtr ptr;
ASSERT_EQ(0, UserData::Address(id, &ptr));
ASSERT_EQ(2, ptr->nref());
ASSERT_FALSE(ptr->Failed());
ASSERT_EQ(1, ptr->count);
ASSERT_EQ(g_user_data, ptr.get());
{
UserDataUniquePtr temp_ptr;
ASSERT_EQ(0, UserData::Address(id, &temp_ptr));
ASSERT_EQ(ptr, temp_ptr);
ASSERT_EQ(3, ptr->nref());
}
const size_t thread_num = 8;
pthread_t tid[thread_num];
for (auto& i : tid) {
ASSERT_EQ(0, pthread_create(&i, NULL, VRefThread, (void*)id));
}
sleep(2);
vref_thread_stop = true;
for (const auto i : tid) {
pthread_join(i, NULL);
}
ASSERT_EQ(2, ptr->nref());
ASSERT_EQ(g_count, ptr->count);
ASSERT_EQ(0, ptr->SetFailed());
ASSERT_TRUE(ptr->Failed());
ASSERT_EQ(g_count - 1, ptr->count);
// Additional reference has been released.
ASSERT_TRUE(ptr->_additional_ref_released);
ASSERT_EQ(1, ptr->nref());
{
UserDataUniquePtr temp_ptr;
ASSERT_EQ(1, UserData::AddressFailedAsWell(id, &temp_ptr));
ASSERT_EQ(ptr, temp_ptr);
ASSERT_EQ(2, ptr->nref());
}
ptr->Revive(1);
ASSERT_EQ(2, ptr->nref());
ASSERT_EQ(g_count, ptr->count);
ASSERT_FALSE(ptr->_additional_ref_released);
ASSERT_EQ(0, UserData::SetFailedById(id));
ASSERT_EQ(g_count - 1, ptr->count);
ptr.reset();
ASSERT_EQ(nullptr, ptr);
ASSERT_EQ(nullptr, g_user_data);
}
std::vector<int> err_fd;
pthread_mutex_t err_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
std::vector<int> rel_fd;
pthread_mutex_t rel_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
volatile bool client_stop = false;
struct BAIDU_CACHELINE_ALIGNMENT ClientMeta {
int fd;
size_t times;
size_t bytes;
};
struct BAIDU_CACHELINE_ALIGNMENT SocketExtra : public brpc::SocketUser {
char* buf;
size_t buf_cap;
size_t bytes;
size_t times;
SocketExtra() {
buf_cap = 32768;
buf = (char*)malloc(buf_cap);
bytes = 0;
times = 0;
}
void BeforeRecycle(brpc::Socket* m) override {
pthread_mutex_lock(&rel_fd_mutex);
rel_fd.push_back(m->fd());
pthread_mutex_unlock(&rel_fd_mutex);
delete this;
}
static int OnEdgeTriggeredEventOnce(brpc::Socket* m) {
SocketExtra* e = static_cast<SocketExtra*>(m->user());
// Read all data.
do {
ssize_t n = read(m->fd(), e->buf, e->buf_cap);
if (n == 0
#ifdef BRPC_SOCKET_HAS_EOF
|| m->_eof
#endif
) {
pthread_mutex_lock(&err_fd_mutex);
err_fd.push_back(m->fd());
pthread_mutex_unlock(&err_fd_mutex);
LOG(WARNING) << "Another end closed fd=" << m->fd();
return -1;
} else if (n > 0) {
e->bytes += n;
++e->times;
#ifdef BRPC_SOCKET_HAS_EOF
if ((size_t)n < e->buf_cap && brpc::has_epollrdhup) {
break;
}
#endif
} else {
if (errno == EAGAIN) {
break;
} else if (errno == EINTR) {
continue;
} else {
PLOG(WARNING) << "Fail to read fd=" << m->fd();
return -1;
}
}
} while (1);
return 0;
}
static void OnEdgeTriggeredEvents(brpc::Socket* m) {
int progress = brpc::Socket::PROGRESS_INIT;
do {
if (OnEdgeTriggeredEventOnce(m) != 0) {
m->SetFailed();
return;
}
} while (m->MoreReadEvents(&progress));
}
};
void* client_thread(void* arg) {
ClientMeta* m = (ClientMeta*)arg;
size_t offset = 0;
m->times = 0;
m->bytes = 0;
const size_t buf_cap = 32768;
char* buf = (char*)malloc(buf_cap);
for (size_t i = 0; i < buf_cap/8; ++i) {
((uint64_t*)buf)[i] = i;
}
while (!client_stop) {
ssize_t n;
if (offset == 0) {
n = write(m->fd, buf, buf_cap);
} else {
iovec v[2];
v[0].iov_base = buf + offset;
v[0].iov_len = buf_cap - offset;
v[1].iov_base = buf;
v[1].iov_len = offset;
n = writev(m->fd, v, 2);
}
if (n < 0) {
if (errno != EINTR) {
PLOG(WARNING) << "Fail to write fd=" << m->fd;
break;
}
} else {
++m->times;
m->bytes += n;
offset += n;
if (offset >= buf_cap) {
offset -= buf_cap;
}
}
}
EXPECT_EQ(0, close(m->fd));
return NULL;
}
inline uint32_t fmix32 ( uint32_t h ) {
h ^= h >> 16;
h *= 0x85ebca6b;
h ^= h >> 13;
h *= 0xc2b2ae35;
h ^= h >> 16;
return h;
}
TEST_F(EventDispatcherTest, dispatch_tasks) {
#ifdef BUTIL_RESOURCE_POOL_NEED_FREE_ITEM_NUM
const butil::ResourcePoolInfo old_info =
butil::describe_resources<brpc::Socket>();
#endif
client_stop = false;
const size_t NCLIENT = 16;
int fds[2 * NCLIENT];
pthread_t cth[NCLIENT];
ClientMeta* cm[NCLIENT];
SocketExtra* sm[NCLIENT];
for (size_t i = 0; i < NCLIENT; ++i) {
ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i));
sm[i] = new SocketExtra;
const int fd = fds[i * 2];
butil::make_non_blocking(fd);
brpc::SocketId socket_id;
brpc::SocketOptions options;
options.fd = fd;
options.user = sm[i];
options.on_edge_triggered_events = SocketExtra::OnEdgeTriggeredEvents;
ASSERT_EQ(0, brpc::Socket::Create(options, &socket_id));
cm[i] = new ClientMeta;
cm[i]->fd = fds[i * 2 + 1];
cm[i]->times = 0;
cm[i]->bytes = 0;
ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i]));
}
LOG(INFO) << "Begin to profile... (5 seconds)";
ProfilerStart("event_dispatcher.prof");
butil::Timer tm;
tm.start();
sleep(5);
tm.stop();
ProfilerStop();
LOG(INFO) << "End profiling";
size_t client_bytes = 0;
size_t server_bytes = 0;
for (size_t i = 0; i < NCLIENT; ++i) {
client_bytes += cm[i]->bytes;
server_bytes += sm[i]->bytes;
}
LOG(INFO) << "client_tp=" << client_bytes / (double)tm.u_elapsed()
<< "MB/s server_tp=" << server_bytes / (double)tm.u_elapsed()
<< "MB/s";
client_stop = true;
for (size_t i = 0; i < NCLIENT; ++i) {
pthread_join(cth[i], NULL);
}
sleep(1);
std::vector<int> copy1, copy2;
pthread_mutex_lock(&err_fd_mutex);
copy1.swap(err_fd);
pthread_mutex_unlock(&err_fd_mutex);
pthread_mutex_lock(&rel_fd_mutex);
copy2.swap(rel_fd);
pthread_mutex_unlock(&rel_fd_mutex);
std::sort(copy1.begin(), copy1.end());
std::sort(copy2.begin(), copy2.end());
ASSERT_EQ(copy1.size(), copy2.size());
for (size_t i = 0; i < copy1.size(); ++i) {
ASSERT_EQ(copy1[i], copy2[i]) << i;
}
ASSERT_EQ(NCLIENT, copy1.size());
const butil::ResourcePoolInfo info
= butil::describe_resources<brpc::Socket>();
LOG(INFO) << info;
#ifdef BUTIL_RESOURCE_POOL_NEED_FREE_ITEM_NUM
ASSERT_EQ(NCLIENT, info.free_item_num - old_info.free_item_num);
#endif
}
// Unique identifier of a EventPipe.
// Users shall store EventFDId instead of EventPipe and call EventPipe::Address()
// to convert the identifier to an unique_ptr at each access. Whenever a
// unique_ptr is not destructed, the enclosed EventPipe will not be recycled.
typedef brpc::VRefId EventPipeId;
const brpc::VRefId INVALID_EVENT_PIPE_ID = brpc::INVALID_VREF_ID;
class EventPipe;
typedef brpc::VersionedRefWithIdUniquePtr<EventPipe> EventPipeUniquePtr;
class EventPipe : public brpc::VersionedRefWithId<EventPipe> {
public:
explicit EventPipe(Forbidden f)
: brpc::VersionedRefWithId<EventPipe>(f)
, _pipe_fds{-1, -1}
, _input_event_count(0)
{}
int Notify() {
char c = 0;
if (write(_pipe_fds[1], &c, 1) != 1) {
PLOG(ERROR) << "Fail to write to _pipe_fds[1]";
return -1;
}
return 0;
}
private:
friend class VersionedRefWithId<EventPipe>;
friend class brpc::IOEvent<EventPipe>;
int OnCreated() {
if (pipe(_pipe_fds)) {
PLOG(FATAL) << "Fail to create _pipe_fds";
return -1;
}
if (_io_event.Init((void*)id()) != 0) {
LOG(ERROR) << "Fail to init IOEvent";
return -1;
}
_io_event.set_bthread_tag(bthread_self_tag());
if (_io_event.AddConsumer(_pipe_fds[0]) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
return -1;
}
_input_event_count = 0;
return 0;
}
void BeforeRecycled() {
brpc::GetGlobalEventDispatcher(_pipe_fds[0], bthread_self_tag())
.RemoveConsumer(_pipe_fds[0]);
_io_event.Reset();
if (_pipe_fds[0] >= 0) {
close(_pipe_fds[0]);
}
if (_pipe_fds[1] >= 0) {
close(_pipe_fds[1]);
}
}
static int OnInputEvent(void* user_data, uint32_t,
const bthread_attr_t&) {
auto id = reinterpret_cast<EventPipeId>(user_data);
EventPipeUniquePtr ptr;
if (EventPipe::Address(id, &ptr) != 0) {
LOG(WARNING) << "Fail to address EventPipe";
return -1;
}
char buf[1024];
ssize_t nr = read(ptr->_pipe_fds[0], &buf, arraysize(buf));
if (nr <= 0) {
if (errno == EAGAIN) {
return 0;
} else {
PLOG(WARNING) << "Fail to read from _pipe_fds[0]";
ptr->SetFailed();
return -1;
}
}
ptr->_input_event_count += nr;
return 0;
}
static int OnOutputEvent(void*, uint32_t,
const bthread_attr_t&) {
EXPECT_TRUE(false) << "Should not be called";
return 0;
}
brpc::IOEvent<EventPipe> _io_event;
int _pipe_fds[2];
size_t _input_event_count;
};
TEST_F(EventDispatcherTest, customize_dispatch_task) {
EventPipeId id = INVALID_EVENT_PIPE_ID;
ASSERT_EQ(0, EventPipe::Create(&id));
ASSERT_NE(INVALID_EVENT_PIPE_ID, id);
EventPipeUniquePtr ptr;
ASSERT_EQ(0, EventPipe::Address(id, &ptr));
ASSERT_EQ(2, ptr->nref());
ASSERT_FALSE(ptr->Failed());
ASSERT_EQ((size_t)0, ptr->_input_event_count);
const size_t N = 10000;
for (size_t i = 0; i < N; ++i) {
ASSERT_EQ(0, ptr->Notify());
}
usleep(1000 * 50);
ASSERT_EQ(N, ptr->_input_event_count);
ASSERT_EQ(0, ptr->SetFailed());
ASSERT_TRUE(ptr->Failed());
ptr.reset();
ASSERT_EQ(nullptr, ptr);
ASSERT_NE(0, EventPipe::Address(id, &ptr));
}