| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| // brpc - A framework to host and access services throughout Baidu. |
| |
| // Date: Sun Jul 13 15:04:18 CST 2014 |
| |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <fcntl.h> // F_GETFD |
| #include <gtest/gtest.h> |
| #include <gflags/gflags.h> |
| #include "butil/gperftools_profiler.h" |
| #include "butil/time.h" |
| #include "butil/macros.h" |
| #include "butil/fd_utility.h" |
| #include "bthread/unstable.h" |
| #include "bthread/task_control.h" |
| #include "brpc/socket.h" |
| #include "brpc/errno.pb.h" |
| #include "brpc/acceptor.h" |
| #include "brpc/policy/hulu_pbrpc_protocol.h" |
| #include "brpc/policy/most_common_message.h" |
| #include "brpc/policy/http_rpc_protocol.h" |
| #include "brpc/nshead.h" |
| #include "brpc/server.h" |
| #include "brpc/channel.h" |
| #include "brpc/controller.h" |
| #include "health_check.pb.h" |
| #if defined(OS_MACOSX) |
| #include <sys/event.h> |
| #endif |
| |
// Selects which variant of the connect-failure assertions is compiled in the
// tests below: when defined, Write() is expected to return 0 and the connect
// error is delivered through WriteOptions.id_wait; otherwise Write() itself is
// expected to fail with errno set.
// No trailing semicolon, so the macro is also usable inside an expression.
#define CONNECT_IN_KEEPWRITE 1
| |
| namespace bthread { |
| extern TaskControl* g_task_control; |
| } |
| |
| namespace brpc { |
| DECLARE_int32(health_check_interval); |
| } |
| |
| void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base); |
| |
| int main(int argc, char* argv[]) { |
| testing::InitGoogleTest(&argc, argv); |
| GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); |
| brpc::Protocol dummy_protocol = |
| { brpc::policy::ParseHuluMessage, |
| brpc::SerializeRequestDefault, |
| brpc::policy::PackHuluRequest, |
| EchoProcessHuluRequest, EchoProcessHuluRequest, |
| NULL, NULL, NULL, |
| brpc::CONNECTION_TYPE_ALL, "dummy_hulu" }; |
| EXPECT_EQ(0, RegisterProtocol((brpc::ProtocolType)30, dummy_protocol)); |
| return RUN_ALL_TESTS(); |
| } |
| |
| struct WaitData { |
| bthread_id_t id; |
| int error_code; |
| std::string error_text; |
| |
| WaitData() : id(INVALID_BTHREAD_ID), error_code(0) {} |
| }; |
| int OnWaitIdReset(bthread_id_t id, void* data, int error_code, |
| const std::string& error_text) { |
| static_cast<WaitData*>(data)->id = id; |
| static_cast<WaitData*>(data)->error_code = error_code; |
| static_cast<WaitData*>(data)->error_text = error_text; |
| return bthread_id_unlock_and_destroy(id); |
| } |
| |
| class SocketTest : public ::testing::Test{ |
| protected: |
| SocketTest(){ |
| }; |
| virtual ~SocketTest(){}; |
| virtual void SetUp() { |
| }; |
| virtual void TearDown() { |
| }; |
| }; |
| |
| brpc::Socket* global_sock = NULL; |
| |
| class CheckRecycle : public brpc::SocketUser { |
| void BeforeRecycle(brpc::Socket* s) { |
| ASSERT_TRUE(global_sock); |
| ASSERT_EQ(global_sock, s); |
| global_sock = NULL; |
| delete this; |
| } |
| }; |
| |
| TEST_F(SocketTest, not_recycle_until_zero_nref) { |
| std::cout << "sizeof(Socket)=" << sizeof(brpc::Socket) << std::endl; |
| int fds[2]; |
| ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); |
| brpc::SocketId id = 8888; |
| butil::EndPoint dummy; |
| ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy)); |
| brpc::SocketOptions options; |
| options.fd = fds[1]; |
| options.remote_side = dummy; |
| options.user = new CheckRecycle; |
| ASSERT_EQ(0, brpc::Socket::Create(options, &id)); |
| { |
| brpc::SocketUniquePtr s; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &s)); |
| global_sock = s.get(); |
| ASSERT_TRUE(s.get()); |
| ASSERT_EQ(fds[1], s->fd()); |
| ASSERT_EQ(dummy, s->remote_side()); |
| ASSERT_EQ(id, s->id()); |
| ASSERT_EQ(0, s->SetFailed()); |
| ASSERT_EQ(s.get(), global_sock); |
| } |
| ASSERT_EQ((brpc::Socket*)NULL, global_sock); |
| close(fds[0]); |
| |
| brpc::SocketUniquePtr ptr; |
| ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr)); |
| } |
| |
| butil::atomic<int> winner_count(0); |
| const int AUTH_ERR = -9; |
| |
| void* auth_fighter(void* arg) { |
| bthread_usleep(10000); |
| int auth_error = 0; |
| brpc::Socket* s = (brpc::Socket*)arg; |
| if (s->FightAuthentication(&auth_error) == 0) { |
| winner_count.fetch_add(1); |
| s->SetAuthentication(AUTH_ERR); |
| } else { |
| EXPECT_EQ(AUTH_ERR, auth_error); |
| } |
| return NULL; |
| } |
| |
| TEST_F(SocketTest, authentication) { |
| brpc::SocketId id; |
| brpc::SocketOptions options; |
| ASSERT_EQ(0, brpc::Socket::Create(options, &id)); |
| brpc::SocketUniquePtr s; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &s)); |
| |
| bthread_t th[64]; |
| for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { |
| ASSERT_EQ(0, bthread_start_urgent(&th[i], NULL, auth_fighter, s.get())); |
| } |
| for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { |
| ASSERT_EQ(0, bthread_join(th[i], NULL)); |
| } |
| // Only one fighter wins |
| ASSERT_EQ(1, winner_count.load()); |
| |
| // Fight after signal is OK |
| int auth_error = 0; |
| ASSERT_NE(0, s->FightAuthentication(&auth_error)); |
| ASSERT_EQ(AUTH_ERR, auth_error); |
| // Socket has been `SetFailed' when authentication failed |
| ASSERT_TRUE(brpc::Socket::Address(s->id(), NULL)); |
| } |
| |
| static butil::atomic<int> g_called_seq(1); |
| class MyMessage : public brpc::SocketMessage { |
| public: |
| MyMessage(const char* str, size_t len, int* called = NULL) |
| : _str(str), _len(len), _called(called) {} |
| private: |
| butil::Status AppendAndDestroySelf(butil::IOBuf* out_buf, brpc::Socket*) { |
| out_buf->append(_str, _len); |
| if (_called) { |
| *_called = g_called_seq.fetch_add(1, butil::memory_order_relaxed); |
| } |
| delete this; |
| return butil::Status::OK(); |
| }; |
| const char* _str; |
| size_t _len; |
| int* _called; |
| }; |
| |
| class MyErrorMessage : public brpc::SocketMessage { |
| public: |
| explicit MyErrorMessage(const butil::Status& st) : _status(st) {} |
| private: |
| butil::Status AppendAndDestroySelf(butil::IOBuf*, brpc::Socket*) { |
| return _status; |
| }; |
| butil::Status _status; |
| }; |
| |
| TEST_F(SocketTest, single_threaded_write) { |
| int fds[2]; |
| ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); |
| brpc::SocketId id = 8888; |
| butil::EndPoint dummy; |
| ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy)); |
| brpc::SocketOptions options; |
| options.fd = fds[1]; |
| options.remote_side = dummy; |
| options.user = new CheckRecycle; |
| ASSERT_EQ(0, brpc::Socket::Create(options, &id)); |
| { |
| brpc::SocketUniquePtr s; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &s)); |
| global_sock = s.get(); |
| ASSERT_TRUE(s.get()); |
| ASSERT_EQ(fds[1], s->fd()); |
| ASSERT_EQ(dummy, s->remote_side()); |
| ASSERT_EQ(id, s->id()); |
| const int BATCH = 5; |
| for (size_t i = 0; i < 20; ++i) { |
| char buf[32 * BATCH]; |
| size_t len = snprintf(buf, sizeof(buf), "hello world! %lu", i); |
| if (i % 4 == 0) { |
| brpc::SocketMessagePtr<MyMessage> msg(new MyMessage(buf, len)); |
| ASSERT_EQ(0, s->Write(msg)); |
| } else if (i % 4 == 1) { |
| brpc::SocketMessagePtr<MyErrorMessage> msg( |
| new MyErrorMessage(butil::Status(EINVAL, "Invalid input"))); |
| bthread_id_t wait_id; |
| WaitData data; |
| ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset)); |
| brpc::Socket::WriteOptions wopt; |
| wopt.id_wait = wait_id; |
| ASSERT_EQ(0, s->Write(msg, &wopt)); |
| ASSERT_EQ(0, bthread_id_join(wait_id)); |
| ASSERT_EQ(wait_id.value, data.id.value); |
| ASSERT_EQ(EINVAL, data.error_code); |
| ASSERT_EQ("Invalid input", data.error_text); |
| continue; |
| } else if (i % 4 == 2) { |
| int seq[BATCH] = {}; |
| brpc::SocketMessagePtr<MyMessage> msgs[BATCH]; |
| // re-print the buffer. |
| len = 0; |
| for (int j = 0; j < BATCH; ++j) { |
| if (j % 2 == 0) { |
| // Empty message, should be skipped. |
| msgs[j].reset(new MyMessage(buf+len, 0, &seq[j])); |
| } else { |
| size_t sub_len = snprintf( |
| buf+len, sizeof(buf)-len, "hello world! %lu.%d", i, j); |
| msgs[j].reset(new MyMessage(buf+len, sub_len, &seq[j])); |
| len += sub_len; |
| } |
| } |
| for (size_t i = 0; i < BATCH; ++i) { |
| ASSERT_EQ(0, s->Write(msgs[i])); |
| } |
| for (int j = 1; j < BATCH; ++j) { |
| ASSERT_LT(seq[j-1], seq[j]) << "j=" << j; |
| } |
| } else { |
| butil::IOBuf src; |
| src.append(buf); |
| ASSERT_EQ(len, src.length()); |
| ASSERT_EQ(0, s->Write(&src)); |
| ASSERT_TRUE(src.empty()); |
| } |
| char dest[sizeof(buf)]; |
| ASSERT_EQ(len, (size_t)read(fds[0], dest, sizeof(dest))); |
| ASSERT_EQ(0, memcmp(buf, dest, len)); |
| } |
| ASSERT_EQ(0, s->SetFailed()); |
| } |
| ASSERT_EQ((brpc::Socket*)NULL, global_sock); |
| close(fds[0]); |
| } |
| |
| void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base) { |
| brpc::DestroyingPtr<brpc::policy::MostCommonMessage> msg( |
| static_cast<brpc::policy::MostCommonMessage*>(msg_base)); |
| butil::IOBuf buf; |
| buf.append(msg->meta); |
| buf.append(msg->payload); |
| ASSERT_EQ(0, msg->socket()->Write(&buf)); |
| } |
| |
| class MyConnect : public brpc::AppConnect { |
| public: |
| MyConnect() : _done(NULL), _data(NULL), _called_start_connect(false) {} |
| void StartConnect(const brpc::Socket*, |
| void (*done)(int err, void* data), |
| void* data) { |
| LOG(INFO) << "Start application-level connect"; |
| _done = done; |
| _data = data; |
| _called_start_connect = true; |
| } |
| void StopConnect(brpc::Socket*) { |
| LOG(INFO) << "Stop application-level connect"; |
| } |
| void MakeConnectDone() { |
| _done(0, _data); |
| } |
| bool is_start_connect_called() const { return _called_start_connect; } |
| private: |
| void (*_done)(int err, void* data); |
| void* _data; |
| bool _called_start_connect; |
| }; |
| |
| TEST_F(SocketTest, single_threaded_connect_and_write) { |
| // FIXME(gejun): Messenger has to be new otherwise quitting may crash. |
| brpc::Acceptor* messenger = new brpc::Acceptor; |
| const brpc::InputMessageHandler pairs[] = { |
| { brpc::policy::ParseHuluMessage, |
| EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" } |
| }; |
| |
| butil::EndPoint point(butil::IP_ANY, 7878); |
| int listening_fd = tcp_listen(point); |
| ASSERT_TRUE(listening_fd > 0); |
| butil::make_non_blocking(listening_fd); |
| ASSERT_EQ(0, messenger->AddHandler(pairs[0])); |
| ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL)); |
| |
| brpc::SocketId id = 8888; |
| brpc::SocketOptions options; |
| options.remote_side = point; |
| std::shared_ptr<MyConnect> my_connect = std::make_shared<MyConnect>(); |
| options.app_connect = my_connect; |
| options.user = new CheckRecycle; |
| ASSERT_EQ(0, brpc::Socket::Create(options, &id)); |
| { |
| brpc::SocketUniquePtr s; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &s)); |
| global_sock = s.get(); |
| ASSERT_TRUE(s.get()); |
| ASSERT_EQ(-1, s->fd()); |
| ASSERT_EQ(point, s->remote_side()); |
| ASSERT_EQ(id, s->id()); |
| for (size_t i = 0; i < 20; ++i) { |
| char buf[64]; |
| const size_t meta_len = 4; |
| *(uint32_t*)(buf + 12) = *(uint32_t*)"Meta"; |
| const size_t len = snprintf(buf + 12 + meta_len, |
| sizeof(buf) - 12 - meta_len, |
| "hello world! %lu", i); |
| memcpy(buf, "HULU", 4); |
| // HULU uses host byte order directly... |
| *(uint32_t*)(buf + 4) = len + meta_len; |
| *(uint32_t*)(buf + 8) = meta_len; |
| |
| int called = 0; |
| if (i % 2 == 0) { |
| brpc::SocketMessagePtr<MyMessage> msg( |
| new MyMessage(buf, 12 + meta_len + len, &called)); |
| ASSERT_EQ(0, s->Write(msg)); |
| } else { |
| butil::IOBuf src; |
| src.append(buf, 12 + meta_len + len); |
| ASSERT_EQ(12 + meta_len + len, src.length()); |
| ASSERT_EQ(0, s->Write(&src)); |
| ASSERT_TRUE(src.empty()); |
| } |
| if (i == 0) { |
| // connection needs to be established at first time. |
| // Should be intentionally blocked in app_connect. |
| bthread_usleep(10000); |
| ASSERT_TRUE(my_connect->is_start_connect_called()); |
| ASSERT_LT(0, s->fd()); // already tcp connected |
| ASSERT_EQ(0, called); // request is not serialized yet. |
| my_connect->MakeConnectDone(); |
| ASSERT_LT(0, called); // serialized |
| } |
| int64_t start_time = butil::gettimeofday_us(); |
| while (s->fd() < 0) { |
| bthread_usleep(1000); |
| ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L) << "Too long!"; |
| } |
| #if defined(OS_LINUX) |
| ASSERT_EQ(0, bthread_fd_wait(s->fd(), EPOLLIN)); |
| #elif defined(OS_MACOSX) |
| ASSERT_EQ(0, bthread_fd_wait(s->fd(), EVFILT_READ)); |
| #endif |
| char dest[sizeof(buf)]; |
| ASSERT_EQ(meta_len + len, (size_t)read(s->fd(), dest, sizeof(dest))); |
| ASSERT_EQ(0, memcmp(buf + 12, dest, meta_len + len)); |
| } |
| ASSERT_EQ(0, s->SetFailed()); |
| } |
| ASSERT_EQ((brpc::Socket*)NULL, global_sock); |
| // The id is invalid. |
| brpc::SocketUniquePtr ptr; |
| ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr)); |
| |
| messenger->StopAccept(0); |
| ASSERT_EQ(-1, messenger->listened_fd()); |
| ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD)); |
| ASSERT_EQ(EBADF, errno); |
| } |
| |
| #define NUMBER_WIDTH 16 |
| |
| struct WriterArg { |
| size_t times; |
| size_t offset; |
| brpc::SocketId socket_id; |
| }; |
| |
| void* FailedWriter(void* void_arg) { |
| WriterArg* arg = static_cast<WriterArg*>(void_arg); |
| brpc::SocketUniquePtr sock; |
| if (brpc::Socket::Address(arg->socket_id, &sock) < 0) { |
| printf("Fail to address SocketId=%" PRIu64 "\n", arg->socket_id); |
| return NULL; |
| } |
| char buf[32]; |
| for (size_t i = 0; i < arg->times; ++i) { |
| bthread_id_t id; |
| EXPECT_EQ(0, bthread_id_create(&id, NULL, NULL)); |
| snprintf(buf, sizeof(buf), "%0" BAIDU_SYMBOLSTR(NUMBER_WIDTH) "lu", |
| i + arg->offset); |
| butil::IOBuf src; |
| src.append(buf); |
| brpc::Socket::WriteOptions wopt; |
| wopt.id_wait = id; |
| sock->Write(&src, &wopt); |
| EXPECT_EQ(0, bthread_id_join(id)); |
| // Only the first connect can see ECONNREFUSED and then |
| // calls `SetFailed' making others' error_code=EINVAL |
| //EXPECT_EQ(ECONNREFUSED, error_code); |
| } |
| return NULL; |
| } |
| |
| TEST_F(SocketTest, fail_to_connect) { |
| const size_t REP = 10; |
| butil::EndPoint point(butil::IP_ANY, 7563/*not listened*/); |
| brpc::SocketId id = 8888; |
| brpc::SocketOptions options; |
| options.remote_side = point; |
| options.user = new CheckRecycle; |
| ASSERT_EQ(0, brpc::Socket::Create(options, &id)); |
| { |
| brpc::SocketUniquePtr s; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &s)); |
| global_sock = s.get(); |
| ASSERT_TRUE(s.get()); |
| ASSERT_EQ(-1, s->fd()); |
| ASSERT_EQ(point, s->remote_side()); |
| ASSERT_EQ(id, s->id()); |
| pthread_t th[8]; |
| WriterArg args[ARRAY_SIZE(th)]; |
| for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { |
| args[i].times = REP; |
| args[i].offset = i * REP; |
| args[i].socket_id = id; |
| ASSERT_EQ(0, pthread_create(&th[i], NULL, FailedWriter, &args[i])); |
| } |
| for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { |
| ASSERT_EQ(0, pthread_join(th[i], NULL)); |
| } |
| ASSERT_EQ(-1, s->SetFailed()); // already SetFailed |
| ASSERT_EQ(-1, s->fd()); |
| } |
| // KeepWrite is possibly still running. |
| int64_t start_time = butil::gettimeofday_us(); |
| while (global_sock != NULL) { |
| bthread_usleep(1000); |
| ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L) << "Too long!"; |
| } |
| ASSERT_EQ(-1, brpc::Socket::Status(id)); |
| // The id is invalid. |
| brpc::SocketUniquePtr ptr; |
| ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr)); |
| } |
| |
| TEST_F(SocketTest, not_health_check_when_nref_hits_0) { |
| brpc::SocketId id = 8888; |
| butil::EndPoint point(butil::IP_ANY, 7584/*not listened*/); |
| brpc::SocketOptions options; |
| options.remote_side = point; |
| options.user = new CheckRecycle; |
| options.health_check_interval_s = 1/*s*/; |
| ASSERT_EQ(0, brpc::Socket::Create(options, &id)); |
| { |
| brpc::SocketUniquePtr s; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &s)); |
| s->SetHCRelatedRefHeld(); // set held status |
| global_sock = s.get(); |
| ASSERT_TRUE(s.get()); |
| ASSERT_EQ(-1, s->fd()); |
| ASSERT_EQ(point, s->remote_side()); |
| ASSERT_EQ(id, s->id()); |
| |
| char buf[64]; |
| const size_t meta_len = 4; |
| *(uint32_t*)(buf + 12) = *(uint32_t*)"Meta"; |
| const size_t len = snprintf(buf + 12 + meta_len, |
| sizeof(buf) - 12 - meta_len, |
| "hello world!"); |
| memcpy(buf, "HULU", 4); |
| // HULU uses host byte order directly... |
| *(uint32_t*)(buf + 4) = len + meta_len; |
| *(uint32_t*)(buf + 8) = meta_len; |
| butil::IOBuf src; |
| src.append(buf, 12 + meta_len + len); |
| ASSERT_EQ(12 + meta_len + len, src.length()); |
| #ifdef CONNECT_IN_KEEPWRITE |
| bthread_id_t wait_id; |
| WaitData data; |
| ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset)); |
| brpc::Socket::WriteOptions wopt; |
| wopt.id_wait = wait_id; |
| ASSERT_EQ(0, s->Write(&src, &wopt)); |
| ASSERT_EQ(0, bthread_id_join(wait_id)); |
| ASSERT_EQ(wait_id.value, data.id.value); |
| ASSERT_EQ(ECONNREFUSED, data.error_code); |
| ASSERT_TRUE(butil::StringPiece(data.error_text).starts_with( |
| "Fail to connect ")); |
| #else |
| ASSERT_EQ(-1, s->Write(&src)); |
| ASSERT_EQ(ECONNREFUSED, errno); |
| #endif |
| ASSERT_TRUE(src.empty()); |
| ASSERT_EQ(-1, s->fd()); |
| } |
| // HealthCheckThread is possibly still running. Spin until global_sock |
| // is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should |
| // not spin until Socket::Status(id) becomes -1 and assert global_sock |
| // to be NULL because invalidating id happens before calling BeforeRecycle. |
| const int64_t start_time = butil::gettimeofday_us(); |
| while (global_sock != NULL) { |
| bthread_usleep(1000); |
| ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L); |
| } |
| ASSERT_EQ(-1, brpc::Socket::Status(id)); |
| } |
| |
| class HealthCheckTestServiceImpl : public test::HealthCheckTestService { |
| public: |
| HealthCheckTestServiceImpl() |
| : _sleep_flag(true) {} |
| virtual ~HealthCheckTestServiceImpl() {} |
| |
| virtual void default_method(google::protobuf::RpcController* cntl_base, |
| const test::HealthCheckRequest* request, |
| test::HealthCheckResponse* response, |
| google::protobuf::Closure* done) { |
| brpc::ClosureGuard done_guard(done); |
| brpc::Controller* cntl = (brpc::Controller*)cntl_base; |
| if (_sleep_flag) { |
| bthread_usleep(510000 /* 510ms, a little bit longer than the default |
| timeout of health check rpc */); |
| } |
| cntl->response_attachment().append("OK"); |
| } |
| |
| bool _sleep_flag; |
| }; |
| |
| TEST_F(SocketTest, app_level_health_check) { |
| int old_health_check_interval = brpc::FLAGS_health_check_interval; |
| GFLAGS_NS::SetCommandLineOption("health_check_path", "/HealthCheckTestService"); |
| GFLAGS_NS::SetCommandLineOption("health_check_interval", "1"); |
| |
| butil::EndPoint point(butil::IP_ANY, 7777); |
| brpc::ChannelOptions options; |
| options.protocol = "http"; |
| options.max_retry = 0; |
| brpc::Channel channel; |
| ASSERT_EQ(0, channel.Init(point, &options)); |
| { |
| brpc::Controller cntl; |
| cntl.http_request().uri() = "/"; |
| channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); |
| EXPECT_TRUE(cntl.Failed()); |
| ASSERT_EQ(ECONNREFUSED, cntl.ErrorCode()); |
| } |
| |
| // 2s to make sure remote is connected by HealthCheckTask and enter the |
| // sending-rpc state. Because the remote is not down, so hc rpc would keep |
| // sending. |
| int listening_fd = tcp_listen(point); |
| bthread_usleep(2000000); |
| |
| // 2s to make sure HealthCheckTask find socket is failed and correct impl |
| // should trigger next round of hc |
| close(listening_fd); |
| bthread_usleep(2000000); |
| |
| brpc::Server server; |
| HealthCheckTestServiceImpl hc_service; |
| ASSERT_EQ(0, server.AddService(&hc_service, brpc::SERVER_DOESNT_OWN_SERVICE)); |
| ASSERT_EQ(0, server.Start(point, NULL)); |
| |
| for (int i = 0; i < 4; ++i) { |
| // although ::connect would succeed, the stall in hc_service makes |
| // the health check rpc fail. |
| brpc::Controller cntl; |
| cntl.http_request().uri() = "/"; |
| channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); |
| ASSERT_EQ(EHOSTDOWN, cntl.ErrorCode()); |
| bthread_usleep(1000000 /*1s*/); |
| } |
| hc_service._sleep_flag = false; |
| bthread_usleep(2000000 /* a little bit longer than hc rpc timeout + hc interval */); |
| // should recover now |
| { |
| brpc::Controller cntl; |
| cntl.http_request().uri() = "/"; |
| channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); |
| ASSERT_FALSE(cntl.Failed()); |
| ASSERT_GT(cntl.response_attachment().size(), (size_t)0); |
| } |
| |
| GFLAGS_NS::SetCommandLineOption("health_check_path", ""); |
| char hc_buf[8]; |
| snprintf(hc_buf, sizeof(hc_buf), "%d", old_health_check_interval); |
| GFLAGS_NS::SetCommandLineOption("health_check_interval", hc_buf); |
| } |
| |
| TEST_F(SocketTest, health_check) { |
| // FIXME(gejun): Messenger has to be new otherwise quitting may crash. |
| brpc::Acceptor* messenger = new brpc::Acceptor; |
| |
| brpc::SocketId id = 8888; |
| butil::EndPoint point(butil::IP_ANY, 7878); |
| const int kCheckInteval = 1; |
| brpc::SocketOptions options; |
| options.remote_side = point; |
| options.user = new CheckRecycle; |
| options.health_check_interval_s = kCheckInteval/*s*/; |
| ASSERT_EQ(0, brpc::Socket::Create(options, &id)); |
| brpc::SocketUniquePtr s; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &s)); |
| |
| s->SetHCRelatedRefHeld(); // set held status |
| global_sock = s.get(); |
| ASSERT_TRUE(s.get()); |
| ASSERT_EQ(-1, s->fd()); |
| ASSERT_EQ(point, s->remote_side()); |
| ASSERT_EQ(id, s->id()); |
| int32_t nref = -1; |
| ASSERT_EQ(0, brpc::Socket::Status(id, &nref)); |
| ASSERT_EQ(2, nref); |
| |
| char buf[64]; |
| const size_t meta_len = 4; |
| *(uint32_t*)(buf + 12) = *(uint32_t*)"Meta"; |
| const size_t len = snprintf(buf + 12 + meta_len, |
| sizeof(buf) - 12 - meta_len, |
| "hello world!"); |
| memcpy(buf, "HULU", 4); |
| // HULU uses host byte order directly... |
| *(uint32_t*)(buf + 4) = len + meta_len; |
| *(uint32_t*)(buf + 8) = meta_len; |
| const bool use_my_message = (butil::fast_rand_less_than(2) == 0); |
| brpc::SocketMessagePtr<MyMessage> msg; |
| int appended_msg = 0; |
| butil::IOBuf src; |
| if (use_my_message) { |
| LOG(INFO) << "Use MyMessage"; |
| msg.reset(new MyMessage(buf, 12 + meta_len + len, &appended_msg)); |
| } else { |
| src.append(buf, 12 + meta_len + len); |
| ASSERT_EQ(12 + meta_len + len, src.length()); |
| } |
| #ifdef CONNECT_IN_KEEPWRITE |
| bthread_id_t wait_id; |
| WaitData data; |
| ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset)); |
| brpc::Socket::WriteOptions wopt; |
| wopt.id_wait = wait_id; |
| if (use_my_message) { |
| ASSERT_EQ(0, s->Write(msg, &wopt)); |
| } else { |
| ASSERT_EQ(0, s->Write(&src, &wopt)); |
| } |
| ASSERT_EQ(0, bthread_id_join(wait_id)); |
| ASSERT_EQ(wait_id.value, data.id.value); |
| ASSERT_EQ(ECONNREFUSED, data.error_code); |
| ASSERT_TRUE(butil::StringPiece(data.error_text).starts_with( |
| "Fail to connect ")); |
| if (use_my_message) { |
| ASSERT_TRUE(appended_msg); |
| } |
| #else |
| if (use_my_message) { |
| ASSERT_EQ(-1, s->Write(msg)); |
| } else { |
| ASSERT_EQ(-1, s->Write(&src)); |
| } |
| ASSERT_EQ(ECONNREFUSED, errno); |
| #endif |
| ASSERT_TRUE(src.empty()); |
| ASSERT_EQ(-1, s->fd()); |
| ASSERT_TRUE(global_sock); |
| brpc::SocketUniquePtr invalid_ptr; |
| ASSERT_EQ(-1, brpc::Socket::Address(id, &invalid_ptr)); |
| ASSERT_EQ(1, brpc::Socket::Status(id)); |
| |
| const brpc::InputMessageHandler pairs[] = { |
| { brpc::policy::ParseHuluMessage, |
| EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" } |
| }; |
| |
| int listening_fd = tcp_listen(point); |
| ASSERT_TRUE(listening_fd > 0); |
| butil::make_non_blocking(listening_fd); |
| ASSERT_EQ(0, messenger->AddHandler(pairs[0])); |
| ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL)); |
| |
| int64_t start_time = butil::gettimeofday_us(); |
| nref = -1; |
| while (brpc::Socket::Status(id, &nref) != 0) { |
| bthread_usleep(1000); |
| ASSERT_LT(butil::gettimeofday_us(), |
| start_time + kCheckInteval * 1000000L + 100000L/*100ms*/); |
| } |
| //ASSERT_EQ(2, nref); |
| ASSERT_TRUE(global_sock); |
| |
| int fd = 0; |
| { |
| brpc::SocketUniquePtr ptr; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); |
| ASSERT_NE(0, ptr->fd()); |
| fd = ptr->fd(); |
| } |
| |
| // SetFailed again, should reconnect and succeed soon. |
| ASSERT_EQ(0, s->SetFailed()); |
| ASSERT_EQ(fd, s->fd()); |
| start_time = butil::gettimeofday_us(); |
| while (brpc::Socket::Status(id) != 0) { |
| bthread_usleep(1000); |
| ASSERT_LT(butil::gettimeofday_us(), start_time + 1200000L); |
| } |
| ASSERT_TRUE(global_sock); |
| |
| { |
| brpc::SocketUniquePtr ptr; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); |
| ASSERT_NE(0, ptr->fd()); |
| } |
| |
| s.release()->Dereference(); |
| |
| // Must stop messenger before SetFailed the id otherwise HealthCheckThread |
| // still has chance to get reconnected and revive the id. |
| messenger->StopAccept(0); |
| ASSERT_EQ(-1, messenger->listened_fd()); |
| ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD)); |
| ASSERT_EQ(EBADF, errno); |
| |
| ASSERT_EQ(0, brpc::Socket::SetFailed(id)); |
| // HealthCheckThread is possibly still addressing the Socket. |
| start_time = butil::gettimeofday_us(); |
| while (global_sock != NULL) { |
| bthread_usleep(1000); |
| ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L); |
| } |
| ASSERT_EQ(-1, brpc::Socket::Status(id)); |
| // The id is invalid. |
| brpc::SocketUniquePtr ptr; |
| ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr)); |
| } |
| |
| void* Writer(void* void_arg) { |
| WriterArg* arg = static_cast<WriterArg*>(void_arg); |
| brpc::SocketUniquePtr sock; |
| if (brpc::Socket::Address(arg->socket_id, &sock) < 0) { |
| printf("Fail to address SocketId=%" PRIu64 "\n", arg->socket_id); |
| return NULL; |
| } |
| char buf[32]; |
| for (size_t i = 0; i < arg->times; ++i) { |
| snprintf(buf, sizeof(buf), "%0" BAIDU_SYMBOLSTR(NUMBER_WIDTH) "lu", |
| i + arg->offset); |
| butil::IOBuf src; |
| src.append(buf); |
| if (sock->Write(&src) != 0) { |
| if (errno == brpc::EOVERCROWDED) { |
| // The buf is full, sleep a while and retry. |
| bthread_usleep(1000); |
| --i; |
| continue; |
| } |
| printf("Fail to write into SocketId=%" PRIu64 ", %s\n", |
| arg->socket_id, berror()); |
| break; |
| } |
| } |
| return NULL; |
| } |
| |
| TEST_F(SocketTest, multi_threaded_write) { |
| const size_t REP = 20000; |
| int fds[2]; |
| for (int k = 0; k < 2; ++k) { |
| printf("Round %d\n", k + 1); |
| ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); |
| pthread_t th[8]; |
| WriterArg args[ARRAY_SIZE(th)]; |
| std::vector<size_t> result; |
| result.reserve(ARRAY_SIZE(th) * REP); |
| |
| brpc::SocketId id = 8888; |
| butil::EndPoint dummy; |
| ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy)); |
| brpc::SocketOptions options; |
| options.fd = fds[1]; |
| options.remote_side = dummy; |
| options.user = new CheckRecycle; |
| ASSERT_EQ(0, brpc::Socket::Create(options, &id)); |
| brpc::SocketUniquePtr s; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &s)); |
| s->_ssl_state = brpc::SSL_OFF; |
| global_sock = s.get(); |
| ASSERT_TRUE(s.get()); |
| ASSERT_EQ(fds[1], s->fd()); |
| ASSERT_EQ(dummy, s->remote_side()); |
| ASSERT_EQ(id, s->id()); |
| butil::make_non_blocking(fds[0]); |
| |
| for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { |
| args[i].times = REP; |
| args[i].offset = i * REP; |
| args[i].socket_id = id; |
| ASSERT_EQ(0, pthread_create(&th[i], NULL, Writer, &args[i])); |
| } |
| |
| if (k == 1) { |
| printf("sleep 100ms to block writers\n"); |
| bthread_usleep(100000); |
| } |
| |
| butil::IOPortal dest; |
| const int64_t start_time = butil::gettimeofday_us(); |
| for (;;) { |
| ssize_t nr = dest.append_from_file_descriptor(fds[0], 32768); |
| if (nr < 0) { |
| if (errno == EINTR) { |
| continue; |
| } |
| if (EAGAIN != errno) { |
| ASSERT_EQ(EAGAIN, errno) << berror(); |
| } |
| bthread_usleep(1000); |
| if (butil::gettimeofday_us() >= start_time + 2000000L) { |
| LOG(FATAL) << "Wait too long!"; |
| break; |
| } |
| continue; |
| } |
| while (dest.length() >= NUMBER_WIDTH) { |
| char buf[NUMBER_WIDTH + 1]; |
| dest.copy_to(buf, NUMBER_WIDTH); |
| buf[sizeof(buf)-1] = 0; |
| result.push_back(strtol(buf, NULL, 10)); |
| dest.pop_front(NUMBER_WIDTH); |
| } |
| if (result.size() >= REP * ARRAY_SIZE(th)) { |
| break; |
| } |
| } |
| for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { |
| ASSERT_EQ(0, pthread_join(th[i], NULL)); |
| } |
| ASSERT_TRUE(dest.empty()); |
| bthread::g_task_control->print_rq_sizes(std::cout); |
| std::cout << std::endl; |
| |
| ASSERT_EQ(REP * ARRAY_SIZE(th), result.size()) |
| << "write_head=" << s->_write_head; |
| std::sort(result.begin(), result.end()); |
| result.resize(std::unique(result.begin(), |
| result.end()) - result.begin()); |
| ASSERT_EQ(REP * ARRAY_SIZE(th), result.size()); |
| ASSERT_EQ(0UL, *result.begin()); |
| ASSERT_EQ(REP * ARRAY_SIZE(th) - 1, *(result.end() - 1)); |
| |
| ASSERT_EQ(0, s->SetFailed()); |
| s.release()->Dereference(); |
| ASSERT_EQ((brpc::Socket*)NULL, global_sock); |
| close(fds[0]); |
| } |
| } |
| |
| void* FastWriter(void* void_arg) { |
| WriterArg* arg = static_cast<WriterArg*>(void_arg); |
| brpc::SocketUniquePtr sock; |
| if (brpc::Socket::Address(arg->socket_id, &sock) < 0) { |
| printf("Fail to address SocketId=%" PRIu64 "\n", arg->socket_id); |
| return NULL; |
| } |
| char buf[] = "hello reader side!"; |
| int64_t begin_ts = butil::cpuwide_time_us(); |
| int64_t nretry = 0; |
| size_t c = 0; |
| for (; c < arg->times; ++c) { |
| butil::IOBuf src; |
| src.append(buf, 16); |
| if (sock->Write(&src) != 0) { |
| if (errno == brpc::EOVERCROWDED) { |
| // The buf is full, sleep a while and retry. |
| bthread_usleep(1000); |
| --c; |
| ++nretry; |
| continue; |
| } |
| printf("Fail to write into SocketId=%" PRIu64 ", %s\n", |
| arg->socket_id, berror()); |
| break; |
| } |
| } |
| int64_t end_ts = butil::cpuwide_time_us(); |
| int64_t total_time = end_ts - begin_ts; |
| printf("total=%ld count=%ld nretry=%ld\n", |
| (long)total_time * 1000/ c, (long)c, (long)nretry); |
| return NULL; |
| } |
| |
// Arguments of reader(): the fd to drain and the running byte count.
struct ReaderArg {
    int fd;        // file descriptor to read from
    size_t nread;  // total number of bytes read so far; updated by reader()
};

// Thread body: reads from `fd' in chunks of up to 32768 bytes, adding every
// chunk's size to `nread', until read() fails or reports end-of-file.
// `void_arg' points to a ReaderArg. Always returns NULL.
void* reader(void* void_arg) {
    ReaderArg* arg = static_cast<ReaderArg*>(void_arg);
    const size_t LEN = 32768;
    char* buf = (char*)malloc(LEN);
    if (buf == NULL) {
        printf("Fail to allocate read buffer\n");
        return NULL;
    }
    for (;;) {
        const ssize_t nr = read(arg->fd, buf, LEN);
        if (nr < 0) {
            printf("Fail to read, %m\n");
            break;
        }
        if (nr == 0) {
            printf("Far end closed\n");
            break;
        }
        arg->nread += nr;
    }
    // Single exit point so that the buffer is released on every path.
    free(buf);
    return NULL;
}
| |
| TEST_F(SocketTest, multi_threaded_write_perf) { |
| const size_t REP = 1000000000; |
| int fds[2]; |
| ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); |
| bthread_t th[3]; |
| WriterArg args[ARRAY_SIZE(th)]; |
| |
| brpc::SocketId id = 8888; |
| butil::EndPoint dummy; |
| ASSERT_EQ(0, str2endpoint("192.168.1.26:8080", &dummy)); |
| brpc::SocketOptions options; |
| options.fd = fds[1]; |
| options.remote_side = dummy; |
| options.user = new CheckRecycle; |
| ASSERT_EQ(0, brpc::Socket::Create(options, &id)); |
| brpc::SocketUniquePtr s; |
| ASSERT_EQ(0, brpc::Socket::Address(id, &s)); |
| s->_ssl_state = brpc::SSL_OFF; |
| ASSERT_EQ(2, brpc::NRefOfVRef(s->_versioned_ref)); |
| global_sock = s.get(); |
| ASSERT_TRUE(s.get()); |
| ASSERT_EQ(fds[1], s->fd()); |
| ASSERT_EQ(dummy, s->remote_side()); |
| ASSERT_EQ(id, s->id()); |
| |
| for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { |
| args[i].times = REP; |
| args[i].offset = i * REP; |
| args[i].socket_id = id; |
| bthread_start_background(&th[i], NULL, FastWriter, &args[i]); |
| } |
| |
| pthread_t rth; |
| ReaderArg reader_arg = { fds[0], 0 }; |
| pthread_create(&rth, NULL, reader, &reader_arg); |
| |
| butil::Timer tm; |
| ProfilerStart("write.prof"); |
| const uint64_t old_nread = reader_arg.nread; |
| tm.start(); |
| sleep(2); |
| tm.stop(); |
| const uint64_t new_nread = reader_arg.nread; |
| ProfilerStop(); |
| |
| printf("tp=%" PRIu64 "M/s\n", (new_nread - old_nread) / tm.u_elapsed()); |
| |
| for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { |
| args[i].times = 0; |
| } |
| for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { |
| ASSERT_EQ(0, bthread_join(th[i], NULL)); |
| } |
| ASSERT_EQ(0, s->SetFailed()); |
| s.release()->Dereference(); |
| pthread_join(rth, NULL); |
| ASSERT_EQ((brpc::Socket*)NULL, global_sock); |
| close(fds[0]); |
| } |