blob: 26a82361161ee16e29598bf4f882d639ea2a7b42 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <netinet/in.h>
#include <sys/socket.h>
#include <gtest/gtest.h>
#include <gflags/gflags.h>
#if BRPC_WITH_RDMA
#include <google/protobuf/descriptor.h>
#include "butil/endpoint.h"
#include "butil/fd_guard.h"
#include "butil/fd_utility.h"
#include "butil/iobuf.h"
#include "butil/sys_byteorder.h"
#include "butil/files/temp_file.h"
#include "brpc/acceptor.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/server.h"
#include "brpc/socket.h"
#include "brpc/errno.pb.h"
#include "brpc/parallel_channel.h"
#include "brpc/selective_channel.h"
#include "brpc/rdma/block_pool.h"
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/rdma/rdma_helper.h"
#include "echo.pb.h"
static const int PORT = 8713;
static const size_t RDMA_HELLO_MSG_LEN = 40;
using namespace brpc;
namespace brpc {
DECLARE_int64(socket_max_unwritten_bytes);
DECLARE_bool(log_idle_connection_close);
DEFINE_bool(rdma_test_enable, false, "Enable tests requring rdma runtime.");
namespace rdma {
struct HelloMessage {
void Serialize(void* data) const;
void Deserialize(void* data);
uint16_t msg_len;
uint16_t hello_ver;
uint16_t impl_ver;
uint16_t block_size;
uint16_t sq_size;
uint16_t rq_size;
ibv_gid gid;
uint32_t qp_num;
};
DECLARE_bool(rdma_trace_verbose);
DECLARE_int32(rdma_memory_pool_max_regions);
extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
extern int (*IbvDestroyCq)(ibv_cq*);
extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*);
extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask);
extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*);
extern int (*IbvDestroyQp)(ibv_qp*);
extern butil::atomic<bool> g_rdma_available;
extern bool g_skip_rdma_init;
}
}
static std::string g_ip = "127.0.0.1";
static butil::EndPoint g_ep;
class MyEchoService : public ::test::EchoService {
void Echo(google::protobuf::RpcController* cntl_base,
const ::test::EchoRequest* req,
::test::EchoResponse* res,
google::protobuf::Closure* done) {
Controller* cntl = static_cast<Controller*>(cntl_base);
ClosureGuard done_guard(done);
if (req->server_fail()) {
cntl->SetFailed(req->server_fail(), "Server fail1");
cntl->SetFailed(req->server_fail(), "Server fail2");
return;
}
if (req->close_fd()) {
usleep(1);
LOG(INFO) << "close fd...";
cntl->CloseConnection("Close connection according to request");
return;
}
if (req->sleep_us() > 0) {
LOG(INFO) << "sleep " << req->sleep_us() << "us...";
bthread_usleep(req->sleep_us());
}
res->set_message(req->message());
if (req->code() != 0) {
res->add_code_list(req->code());
}
cntl->response_attachment().append(cntl->request_attachment());
}
};
class RdmaTest : public ::testing::Test {
protected:
RdmaTest() {
butil::ip_t ip;
EXPECT_EQ(0, butil::str2ip(g_ip.c_str(), &ip));
butil::EndPoint ep(ip, PORT);
g_ep = ep;
EXPECT_EQ(0, _server_list.save(butil::endpoint2str(g_ep).c_str()));
_naming_url = std::string("File://") + _server_list.fname();
_server.AddService(&_svc, SERVER_DOESNT_OWN_SERVICE);
}
~RdmaTest() { }
virtual void SetUp() { }
virtual void TearDown() {
rdma::DumpMemoryPoolInfo(std::cout);
}
private:
void StartServer(bool use_rdma = true) {
ServerOptions options;
options.use_rdma = use_rdma;
options.idle_timeout_sec = 5;
options.max_concurrency = 0;
options.internal_port = -1;
EXPECT_EQ(0, _server.Start(PORT, &options));
}
void StopServer() {
_server.Stop(0);
_server.Join();
}
Socket* GetSocketFromServer(size_t index) {
std::vector<SocketId> sids;
_server._am->ListConnections(&sids);
if (index >= sids.size()) {
return NULL;
}
SocketUniquePtr s;
if (Socket::Address(sids[index], &s) == 0) {
return s.get();
}
return NULL;
}
butil::TempFile _server_list;
std::string _naming_url;
Server _server;
MyEchoService _svc;
};
TEST_F(RdmaTest, client_close_before_hello_send) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd >= 0);
ASSERT_EQ(0, connect(sockfd, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
Socket* s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
close(sockfd);
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
StopServer();
}
TEST_F(RdmaTest, client_hello_msg_invalid_magic_str) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd >= 0);
ASSERT_EQ(0, connect(sockfd, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
Socket* s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
uint8_t data[RDMA_HELLO_MSG_LEN];
memcpy(data, "PRPC", 4); // send as normal baidu_std protocol
memset(data + 4, 0, 32);
ASSERT_EQ(38, write(sockfd, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
StopServer();
}
TEST_F(RdmaTest, client_close_during_hello_send) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
Socket* s = NULL;
uint8_t data[8];
butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd1 >= 0);
ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
memcpy(data, "RD", 2);
ASSERT_EQ(2, write(sockfd1, data, 2)); // break in magic str
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_HELLO_WAIT, s->_rdma_ep->_state);
close(sockfd1);
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd2 >= 0);
ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
memcpy(data, "RDMA", 4);
ASSERT_EQ(4, write(sockfd2, data, 4)); // break after magic str
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_HELLO_WAIT, s->_rdma_ep->_state);
close(sockfd2);
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
butil::fd_guard sockfd3(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd3 >= 0);
ASSERT_EQ(0, connect(sockfd3, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
memcpy(data, "RDMA", 4);
memset(data + 4, 0, 4);
ASSERT_EQ(8, write(sockfd3, data, 8)); // break after magic str
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_HELLO_WAIT, s->_rdma_ep->_state);
close(sockfd3);
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
StopServer();
}
TEST_F(RdmaTest, client_hello_msg_invalid_len) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
Socket* s = NULL;
uint8_t data[RDMA_HELLO_MSG_LEN];
butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd1 >= 0);
ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
memcpy(data, "RDMA", 4);
memset(data + 4, 0, 34);
ASSERT_EQ(38, write(sockfd1, data, 38)); // write invalid length
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd2 >= 0);
ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
memcpy(data, "RDMA", 4);
uint16_t len = butil::HostToNet16(35);
memcpy(data + 4, &len, 2);
memset(data + 6, 0, 32);
ASSERT_EQ(38, write(sockfd2, data, 38)); // write invalid length
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
StopServer();
}
TEST_F(RdmaTest, client_hello_msg_invalid_version) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
Socket* s = NULL;
uint8_t data[RDMA_HELLO_MSG_LEN];
uint16_t len = butil::HostToNet16(38);
uint16_t ver = butil::HostToNet16(1);
butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd1 >= 0);
ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
memcpy(data, "RDMA", 4);
memcpy(data + 4, &len, 2);
memset(data + 6, 0, 32);
memcpy(data + 6, &ver, 2); // hello_ver == 1, impl_ver == 0
ASSERT_EQ(38, write(sockfd1, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
memset(data, 0, 4);
ASSERT_EQ(4, write(sockfd1, data, 4));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
close(sockfd1);
usleep(100000);
ASSERT_EQ(NULL, GetSocketFromServer(0));
butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd2 >= 0);
ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
memcpy(data, "RDMA", 4);
memcpy(data + 4, &len, 2);
memset(data + 6, 0, 32);
memcpy(data + 8, &ver, 2); // hello_ver == 0, impl_ver == 1
ASSERT_EQ(38, write(sockfd2, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
uint32_t flag = butil::HostToNet32(1);
ASSERT_EQ(4, write(sockfd2, &flag, 4));
usleep(100000);
ASSERT_EQ(NULL, GetSocketFromServer(0));
StopServer();
}
TEST_F(RdmaTest, client_hello_msg_invalid_sq_rq_block_size) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
Socket* s = NULL;
rdma::HelloMessage msg;
uint8_t data[RDMA_HELLO_MSG_LEN];
msg.msg_len = 38;
msg.hello_ver = 1;
msg.impl_ver = 1;
msg.sq_size = 10;
msg.rq_size = 16;
msg.block_size = 8192;
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd1 >= 0);
ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
ASSERT_EQ(38, write(sockfd1, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
memset(data, 0, 4);
ASSERT_EQ(4, write(sockfd1, data, 4));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
close(sockfd1);
msg.sq_size = 16;
msg.rq_size = 10;
msg.block_size = 8192;
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd2 >= 0);
ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
ASSERT_EQ(38, write(sockfd2, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
memset(data, 0, 4);
ASSERT_EQ(4, write(sockfd1, data, 4));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
close(sockfd2);
msg.sq_size = 16;
msg.rq_size = 16;
msg.block_size = 1000;
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
butil::fd_guard sockfd3(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd3 >= 0);
ASSERT_EQ(0, connect(sockfd3, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
ASSERT_EQ(38, write(sockfd3, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
memset(data, 0, 4);
ASSERT_EQ(4, write(sockfd3, data, 4));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
StopServer();
}
TEST_F(RdmaTest, client_close_after_qp_build) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
Socket* s = NULL;
rdma::HelloMessage msg;
uint8_t data[RDMA_HELLO_MSG_LEN];
msg.msg_len = 38;
msg.hello_ver = 1;
msg.impl_ver = 1;
msg.sq_size = 16;
msg.rq_size = 16;
msg.block_size = 8192;
msg.qp_num = 0;
msg.gid = rdma::GetRdmaGid();
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd1 >= 0);
ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
ASSERT_EQ(38, write(sockfd1, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
close(sockfd1);
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
StopServer();
}
TEST_F(RdmaTest, client_close_during_ack_send) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
Socket* s = NULL;
rdma::HelloMessage msg;
uint8_t data[RDMA_HELLO_MSG_LEN];
msg.msg_len = 38;
msg.hello_ver = 1;
msg.impl_ver = 1;
msg.sq_size = 16;
msg.rq_size = 16;
msg.block_size = 8192;
msg.qp_num = 0;
msg.gid = rdma::GetRdmaGid();
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd1 >= 0);
ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
ASSERT_EQ(38, write(sockfd1, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
uint32_t flags = butil::HostToNet32(0);
ASSERT_EQ(2, write(sockfd1, &flags, 2));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
close(sockfd1);
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
StopServer();
}
TEST_F(RdmaTest, client_close_after_ack_send) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
Socket* s = NULL;
rdma::HelloMessage msg;
uint8_t data[RDMA_HELLO_MSG_LEN];
msg.msg_len = 38;
msg.hello_ver = 1;
msg.impl_ver = 1;
msg.sq_size = 16;
msg.rq_size = 16;
msg.block_size = 8192;
msg.qp_num = 0;
msg.gid = rdma::GetRdmaGid();
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd1 >= 0);
ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
ASSERT_EQ(38, write(sockfd1, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
uint32_t flags = butil::HostToNet32(0);
ASSERT_EQ(4, write(sockfd1, &flags, 4));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
close(sockfd1);
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd2 >= 0);
ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
ASSERT_EQ(38, write(sockfd2, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
flags = butil::HostToNet32(1);
ASSERT_EQ(4, write(sockfd2, &flags, 4));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state);
close(sockfd2);
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
StopServer();
}
TEST_F(RdmaTest, client_send_data_on_tcp_after_ack_send) {
StartServer();
sockaddr_in addr;
bzero((char*)&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
Socket* s = NULL;
rdma::HelloMessage msg;
uint8_t data[RDMA_HELLO_MSG_LEN];
msg.msg_len = 38;
msg.hello_ver = 1;
msg.impl_ver = 1;
msg.sq_size = 16;
msg.rq_size = 16;
msg.block_size = 8192;
msg.qp_num = 0;
msg.gid = rdma::GetRdmaGid();
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd1 >= 0);
ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
ASSERT_EQ(38, write(sockfd1, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
uint32_t flags = butil::HostToNet32(0);
ASSERT_EQ(4, write(sockfd1, &flags, 4));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
ASSERT_EQ(4, write(sockfd1, &flags, 4));
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
close(sockfd1);
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
ASSERT_TRUE(sockfd2 >= 0);
ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
usleep(100000); // wait for server to handle the msg
s = GetSocketFromServer(0);
ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
ASSERT_EQ(38, write(sockfd2, data, 38));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
flags = butil::HostToNet32(1);
ASSERT_EQ(4, write(sockfd2, &flags, 4));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state);
ASSERT_EQ(4, write(sockfd1, &flags, 4));
usleep(100000);
ASSERT_EQ(NULL, GetSocketFromServer(0));
StopServer();
}
TEST_F(RdmaTest, server_miss_before_hello_send) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
bthread_id_join(cntl.call_id());
ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_close_before_hello_send) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
close(acc_fd);
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);
bthread_id_join(cntl.call_id());
ASSERT_EQ(EEOF, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_miss_during_magic_str) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
ASSERT_EQ(2, write(acc_fd, "RD", 2));
usleep(100000);
bthread_id_join(cntl.call_id());
ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_close_during_magic_str) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
ASSERT_EQ(2, write(acc_fd, "RD", 2));
close(acc_fd);
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);
bthread_id_join(cntl.call_id());
ASSERT_EQ(EEOF, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_hello_invalid_magic_str) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
ASSERT_EQ(4, write(acc_fd, "ABCD", 4));
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);
bthread_id_join(cntl.call_id());
ASSERT_EQ(EPROTO, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_miss_during_hello_msg) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
ASSERT_EQ(4, write(acc_fd, "RDMA", 4));
ASSERT_EQ(2, write(acc_fd, "00", 2));
bthread_id_join(cntl.call_id());
ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_close_during_hello_msg) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
ASSERT_EQ(4, write(acc_fd, "RDMA", 4));
ASSERT_EQ(2, write(acc_fd, "00", 2));
close(acc_fd);
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);
bthread_id_join(cntl.call_id());
ASSERT_EQ(EEOF, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_hello_invalid_msg_len) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
memcpy(data, "RDMA", 4);
uint16_t len = butil::HostToNet16(35);
memcpy(data + 4, &len, 2);
memset(data + 6, 0, 32);
ASSERT_EQ(38, write(acc_fd, data, 38));
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);
bthread_id_join(cntl.call_id());
ASSERT_EQ(EPROTO, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_hello_invalid_version) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
memcpy(data, "RDMA", 4);
uint16_t len = butil::HostToNet16(38);
memcpy(data + 4, &len, 2);
memset(data + 6, 0, 32);
ASSERT_EQ(38, write(acc_fd, data, 38));
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
ASSERT_EQ(4, read(acc_fd, data, 4));
uint32_t* tmp = (uint32_t*)data;
ASSERT_EQ(0, butil::NetToHost32(*tmp));
bthread_id_join(cntl.call_id());
ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_hello_invalid_sq_rq_size) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
rdma::HelloMessage msg;
msg.msg_len = 38;
msg.hello_ver = 1;
msg.impl_ver = 1;
msg.sq_size = 0;
msg.rq_size = 0;
msg.block_size = 8192;
msg.qp_num = 0;
msg.gid = rdma::GetRdmaGid();
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
ASSERT_EQ(38, write(acc_fd, data, 38));
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
ASSERT_EQ(4, read(acc_fd, data, 4));
uint32_t* tmp = (uint32_t*)data;
ASSERT_EQ(0, butil::NetToHost32(*tmp));
bthread_id_join(cntl.call_id());
ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_miss_after_ack) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
rdma::HelloMessage msg;
msg.msg_len = 38;
msg.hello_ver = 1;
msg.impl_ver = 1;
msg.sq_size = 16;
msg.rq_size = 16;
msg.block_size = 8192;
msg.qp_num = 0;
msg.gid = rdma::GetRdmaGid();
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
ASSERT_EQ(38, write(acc_fd, data, 38));
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state);
ASSERT_EQ(4, read(acc_fd, data, 4));
uint32_t* tmp = (uint32_t*)data;
ASSERT_EQ(1, butil::NetToHost32(*tmp));
bthread_id_join(cntl.call_id());
ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_close_after_ack) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
rdma::HelloMessage msg;
msg.msg_len = 38;
msg.hello_ver = 1;
msg.impl_ver = 1;
msg.sq_size = 16;
msg.rq_size = 16;
msg.block_size = 8192;
msg.qp_num = 0;
msg.gid = rdma::GetRdmaGid();
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
ASSERT_EQ(38, write(acc_fd, data, 38));
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state);
ASSERT_EQ(4, read(acc_fd, data, 4));
uint32_t* tmp = (uint32_t*)data;
ASSERT_EQ(1, butil::NetToHost32(*tmp));
close(acc_fd);
bthread_id_join(cntl.call_id());
ASSERT_EQ(EEOF, cntl.ErrorCode());
}
TEST_F(RdmaTest, server_send_data_on_tcp_after_ack) {
butil::fd_guard sockfd(butil::tcp_listen(g_ep));
EXPECT_TRUE(sockfd >= 0);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);
butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
ASSERT_TRUE(acc_fd >= 0);
uint8_t data[RDMA_HELLO_MSG_LEN];
ASSERT_EQ(38, read(acc_fd, data, 38));
rdma::HelloMessage msg;
msg.msg_len = 38;
msg.hello_ver = 1;
msg.impl_ver = 1;
msg.sq_size = 16;
msg.rq_size = 16;
msg.block_size = 8192;
msg.qp_num = 0;
msg.gid = rdma::GetRdmaGid();
memcpy(data, "RDMA", 4);
msg.Serialize(data + 4);
ASSERT_EQ(38, write(acc_fd, data, 38));
usleep(100000);
ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state);
ASSERT_EQ(38, write(acc_fd, data, 38));
bthread_id_join(cntl.call_id());
ASSERT_EQ(EPROTO, cntl.ErrorCode());
}
TEST_F(RdmaTest, try_global_disable_rdma) {
StartServer();
rdma::g_rdma_available.store(false, butil::memory_order_relaxed);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
req.set_sleep_us(200000);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
bthread_id_join(cntl.call_id());
ASSERT_EQ(0, cntl.ErrorCode());
StopServer();
rdma::g_rdma_available.store(true, butil::memory_order_relaxed);
}
TEST_F(RdmaTest, server_option_invalid) {
Server server;
ServerOptions options;
options.use_rdma = true;
// rtmp and rdma are incompatible
options.rtmp_service = (RtmpService*)1;
ASSERT_EQ(-1, server.Start(PORT, &options));
// nshead and rdma are incompatible
options.rtmp_service = NULL;
options.nshead_service = (NsheadService*)1;
ASSERT_EQ(-1, server.Start(PORT, &options));
// mongo and rdma are incompatible
options.nshead_service = NULL;
options.mongo_service_adaptor = (MongoServiceAdaptor*)1;
ASSERT_EQ(-1, server.Start(PORT, &options));
// ssl and rdma are incompatible
options.mongo_service_adaptor = NULL;
options.mutable_ssl_options()->default_cert.certificate = "test";
ASSERT_EQ(-1, server.Start(PORT, &options));
}
TEST_F(RdmaTest, channel_option_invalid) {
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
// rtmp and rdma are incompatible
chan_options.protocol = "rtmp";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
chan_options.protocol = "streaming_rpc";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// nshead and rdma are incompatible
chan_options.protocol = "nshead";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
chan_options.protocol = "nshead_mcpack";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// nova_pbrpc and rdma are incompatible
chan_options.protocol = "nova_pbrpc";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// public_pbrpc and rdma are incompatible
chan_options.protocol = "public_pbrpc";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// redis and rdma are incompatible
chan_options.protocol = "redis";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// memcache and rdma are incompatible
chan_options.protocol = "memcache";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// ubrpc and rdma are incompatible
chan_options.protocol = "ubrpc_compack";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// itp and rdma are incompatible
chan_options.protocol = "itp";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// esp and rdma are incompatible
chan_options.protocol = "esp";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// hulu_pbrpc and rdma are incompatible
chan_options.protocol = "hulu_pbrpc";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// sofa_pbrpc and rdma are incompatible
chan_options.protocol = "sofa_pbrpc";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// http and rdma are incompatible
chan_options.protocol = "http";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
// ssl and rdma are incompatible
chan_options.protocol = "baidu_std";
chan_options.mutable_ssl_options()->sni_name = "test";
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options));
}
TEST_F(RdmaTest, rdma_client_to_rdma_server) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
bthread_id_join(cntl.call_id());
ASSERT_EQ(0, cntl.ErrorCode());
StopServer();
}
TEST_F(RdmaTest, tcp_client_to_tcp_server) {
StartServer(false);
Channel channel;
ChannelOptions chan_options;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
bthread_id_join(cntl.call_id());
ASSERT_EQ(0, cntl.ErrorCode());
StopServer();
}
TEST_F(RdmaTest, tcp_client_to_rdma_server) {
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
bthread_id_join(cntl.call_id());
ASSERT_EQ(0, cntl.ErrorCode());
StopServer();
}
TEST_F(RdmaTest, rdma_client_to_tcp_server) {
StartServer(false);
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000);
bthread_id_join(cntl.call_id());
ASSERT_EQ(EEOF, cntl.ErrorCode());
StopServer();
}
static const int RPC_NUM = 1024;
void DumpRdmaEndpointInfo(Socket* client, Socket* server) {
std::cout << std::endl << "client:";
client->_rdma_ep->DebugInfo(std::cout);
std::cout << std::endl << "server:";
server->_rdma_ep->DebugInfo(std::cout);
}
TEST_F(RdmaTest, send_rpcs_in_one_qp) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 3000;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl[RPC_NUM];
test::EchoRequest req[RPC_NUM];
test::EchoResponse res[RPC_NUM];
LOG(INFO) << "send 0 attachment";
for (int i = 0; i < RPC_NUM; ++i) {
req[i].set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
if (cntl[i].ErrorCode() == ERPCTIMEDOUT) {
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s));
Socket* m = GetSocketFromServer(0);
DumpRdmaEndpointInfo(s.get(), m);
}
ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]";
}
LOG(INFO) << "send 4KB attachment";
butil::IOBuf attach;
attach.resize(4096);
for (int i = 0; i < RPC_NUM; ++i) {
cntl[i].Reset();
cntl[i].request_attachment().append(attach);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
if (cntl[i].ErrorCode() == ERPCTIMEDOUT) {
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s));
Socket* m = GetSocketFromServer(0);
DumpRdmaEndpointInfo(s.get(), m);
}
ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]";
}
LOG(INFO) << "send 1MB attachment";
attach.resize(1048576);
for (int i = 0; i < RPC_NUM; ++i) {
cntl[i].Reset();
cntl[i].request_attachment().append(attach);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
if (cntl[i].ErrorCode() == ERPCTIMEDOUT) {
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s));
Socket* m = GetSocketFromServer(0);
DumpRdmaEndpointInfo(s.get(), m);
}
ASSERT_TRUE(0 == cntl[i].ErrorCode() ||
EOVERCROWDED == cntl[i].ErrorCode()) << "req[" << i << "]";
}
StopServer();
}
TEST_F(RdmaTest, send_rpc_in_many_qp) {
if (!FLAGS_rdma_test_enable) {
return;
}
Server server[100];
MyEchoService svc[100];
int num = 100;
for (int i = 0; i < num; ++i) {
ServerOptions options;
options.use_rdma = true;
options.idle_timeout_sec = 1;
options.max_concurrency = 0;
options.internal_port = -1;
server[i].AddService(&svc[i], SERVER_DOESNT_OWN_SERVICE);
EXPECT_EQ(0, server[i].Start(i + 8000, &options));
}
int port = 0;
butil::IOBuf attach;
attach.resize(4096);
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
Channel channel[RPC_NUM];
Server* svr[RPC_NUM];
Controller cntl[RPC_NUM];
test::EchoRequest req[RPC_NUM];
test::EchoResponse res[RPC_NUM];
butil::ip_t ip;
butil::str2ip(g_ip.c_str(), &ip);
for (int i = 0; i < RPC_NUM; ++i) {
svr[i] = &server[i % num];
butil::EndPoint ep(ip, 8000 + ((port++) % num));
ASSERT_EQ(0, channel[i].Init(ep, &chan_options));
req[i].set_message(__FUNCTION__);
cntl[i].request_attachment().append(attach);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel[i]).Echo(&cntl[i], &req[i], &res[i], done);
}
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
if (cntl[i].ErrorCode() == ERPCTIMEDOUT) {
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s));
std::vector<SocketId> sids;
svr[i]->_am->ListConnections(&sids);
for (size_t i = 0; i < sids.size(); ++i) {
SocketUniquePtr m;
ASSERT_EQ(0, Socket::AddressFailedAsWell(sids[i], &m));
DumpRdmaEndpointInfo(s.get(), m.get());
}
}
ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]";
}
for (int i = 0; i < num; ++i) {
server[i].Stop(0);
server[i].Join();
}
}
TEST_F(RdmaTest, send_rpcs_as_pooled_connection) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 30000; // it may very slow
chan_options.timeout_ms = 30000;
chan_options.max_retry = 0;
chan_options.connection_type = "pooled";
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl[RPC_NUM];
test::EchoRequest req[RPC_NUM];
test::EchoResponse res[RPC_NUM];
butil::IOBuf attach;
attach.resize(4096);
for (int i = 0; i < RPC_NUM; ++i) {
req[i].set_message(__FUNCTION__);
cntl[i].request_attachment().append(attach);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
if (cntl[i].ErrorCode() == ERPCTIMEDOUT) {
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s));
Socket* m = GetSocketFromServer(0);
DumpRdmaEndpointInfo(s.get(), m);
}
ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]";
}
StopServer();
}
TEST_F(RdmaTest, send_rpcs_as_short_connection) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 30000; // it may very slow
chan_options.timeout_ms = 30000;
chan_options.max_retry = 0;
chan_options.connection_type = "short";
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl[RPC_NUM];
test::EchoRequest req[RPC_NUM];
test::EchoResponse res[RPC_NUM];
butil::IOBuf attach;
attach.resize(4096);
for (int i = 0; i < RPC_NUM; ++i) {
req[i].set_message(__FUNCTION__);
cntl[i].request_attachment().append(attach);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
if (cntl[i].ErrorCode() == ERPCTIMEDOUT) {
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s));
Socket* m = GetSocketFromServer(0);
DumpRdmaEndpointInfo(s.get(), m);
}
ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]";
}
StopServer();
}
TEST_F(RdmaTest, server_stop_during_rpc) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 3000;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl[RPC_NUM];
test::EchoRequest req[RPC_NUM];
test::EchoResponse res[RPC_NUM];
butil::IOBuf attach;
attach.resize(4096);
for (int i = 0; i < RPC_NUM; ++i) {
req[i].set_message(__FUNCTION__);
cntl[i].request_attachment().append(attach);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
if (i == 0) StopServer();
int error_code = cntl[i].ErrorCode();
ASSERT_TRUE(error_code == 0 ||
error_code == EEOF ||
error_code == ELOGOFF ||
error_code == EHOSTDOWN) << "req[" << i << "]: " << error_code;
}
}
TEST_F(RdmaTest, server_close_during_rpc) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 3000;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl[RPC_NUM];
test::EchoRequest req[RPC_NUM];
test::EchoResponse res[RPC_NUM];
butil::IOBuf attach;
attach.resize(4096);
for (int i = 0; i < RPC_NUM; ++i) {
req[i].set_message(__FUNCTION__);
cntl[i].request_attachment().append(attach);
if (i == RPC_NUM / 2) {
req[i].set_close_fd(true);
}
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
int error_code = cntl[i].ErrorCode();
ASSERT_TRUE(error_code == 0 ||
error_code == EEOF ||
error_code == EFAILEDSOCKET ||
error_code == EHOSTDOWN) << "req[" << i << "]: " << error_code;
}
StopServer();
}
TEST_F(RdmaTest, client_close_during_rpc) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 3000;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl[RPC_NUM];
test::EchoRequest req[RPC_NUM];
test::EchoResponse res[RPC_NUM];
butil::IOBuf attach;
attach.resize(4096);
for (int i = 0; i < RPC_NUM; ++i) {
req[i].set_message(__FUNCTION__);
cntl[i].request_attachment().append(attach);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
cntl[0].CloseConnection("Close connection");
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
int error_code = cntl[i].ErrorCode();
ASSERT_TRUE(error_code == 0 ||
error_code == ECLOSE ||
error_code == EHOSTDOWN) << "req[" << i << "]: " << error_code;
}
StopServer();
}
TEST_F(RdmaTest, verbs_error_handling) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
req.set_sleep_us(200000);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);
usleep(100000); // wait for rdma handshake complete
SocketUniquePtr s;
ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
ibv_sge sge;
void* buf = malloc(8192);
sge.addr = (uint64_t)buf;
sge.length = 8192;
sge.lkey = 1; // incorrect lkey
wr.sg_list = &sge;
wr.num_sge = 1;
ibv_send_wr* bad = NULL;
ibv_post_send(s->_rdma_ep->_resource->qp, &wr, &bad);
bthread_id_join(cntl.call_id());
ASSERT_EQ(ERDMA, cntl.ErrorCode());
free(buf);
StopServer();
}
TEST_F(RdmaTest, rdma_use_parallel_channel) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
const size_t NCHANS = 8;
Channel subchans[NCHANS];
ParallelChannel channel;
ChannelOptions opts;
opts.use_rdma = true;
for (size_t i = 0; i < NCHANS; ++i) {
ASSERT_EQ(0, subchans[i].Init(_naming_url.c_str(), "rR", &opts));
ASSERT_EQ(0, channel.AddChannel(
&subchans[i], DOESNT_OWN_CHANNEL,
NULL, NULL));
}
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, NULL);
ASSERT_EQ(0, cntl.ErrorCode());
ASSERT_EQ(NCHANS, (size_t)cntl.sub_count());
StopServer();
}
TEST_F(RdmaTest, rdma_use_selective_channel) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
const size_t NCHANS = 8;
SelectiveChannel channel;
ChannelOptions opts;
opts.use_rdma = true;
ASSERT_EQ(0, channel.Init("rr", &opts));
for (size_t i = 0; i < NCHANS; ++i) {
Channel* subchan = new Channel;
ASSERT_EQ(0, subchan->Init(_naming_url.c_str(), "rR", &opts));
ASSERT_EQ(0, channel.AddChannel(subchan, NULL));
}
Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, NULL);
ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
ASSERT_EQ(1, cntl.sub_count());
StopServer();
}
static void MockFree(void* buf) { }
TEST_F(RdmaTest, send_rpcs_with_user_defined_iobuf) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 500;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl[RPC_NUM];
test::EchoRequest req[RPC_NUM];
test::EchoResponse res[RPC_NUM];
butil::IOBuf attach;
void* data = malloc(4096);;
attach.append_user_data(data, 4096, NULL);
req[0].set_message(__FUNCTION__);
cntl[0].request_attachment().append(attach);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[0], &req[0], &res[0], done);
bthread_id_join(cntl[0].call_id());
ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode());
attach.clear();
sleep(2); // wait for client recover from EHOSTDOWN
cntl[0].Reset();
char* mr[2 * RPC_NUM];
uint32_t lkey[2 * RPC_NUM];
for (size_t i = 0; i < RPC_NUM; ++i) {
mr[2 * i] = (char*)malloc(4096);
memset(mr[2 * i], i % 100, 4096);
lkey[2 * i] = rdma::RegisterMemoryForRdma(mr[2 * i], 4096);
ASSERT_TRUE(lkey[2 * i] != 0);
cntl[i].request_attachment().append_user_data_with_meta(mr[2 * i] + i, 4096 - i, MockFree, lkey[2 * i]);
mr[2 * i + 1] = (char*)malloc(4096);
memset(mr[2 * i + 1], i % 100, 4096);
lkey[2 * i + 1] = rdma::RegisterMemoryForRdma(mr[2 * i + 1], 4096);
ASSERT_TRUE(lkey[2 * i + 1] != 0);
cntl[i].request_attachment().append_user_data_with_meta(mr[2 * i + 1] + i, 4096 - i, MockFree, lkey[2 * i + 1]);
req[i].set_message(__FUNCTION__);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
for (size_t i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]";
rdma::DeregisterMemoryForRdma(mr[i]);
ASSERT_EQ(2 * (4096 - i), cntl[i].response_attachment().size());
char tmp[8192];
cntl[i].response_attachment().copy_to(tmp, 2 * (4096 - i));
ASSERT_EQ(0, memcmp(mr[2 * i] + i, tmp, 4096 - i));
ASSERT_EQ(0, memcmp(mr[2 * i + 1] + i, tmp + 4096 - i, 4096 - i));
free(mr[2 * i]);
free(mr[2 * i + 1]);
}
StopServer();
}
TEST_F(RdmaTest, try_memory_pool_empty) {
if (!FLAGS_rdma_test_enable) {
return;
}
StartServer();
Channel channel;
ChannelOptions chan_options;
chan_options.use_rdma = true;
chan_options.connect_timeout_ms = 500;
chan_options.timeout_ms = 60000;
chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options));
Controller cntl[RPC_NUM];
test::EchoRequest req[RPC_NUM];
test::EchoResponse res[RPC_NUM];
butil::IOBuf iobuf[RPC_NUM];
for (int i = 0; i < 1024; ++i) {
if (iobuf[i].resize(1048576 * 8)) {
// 8MB for each iobuf
break;
}
}
for (int i = 0; i < RPC_NUM; ++i) {
req[i].set_message(__FUNCTION__);
cntl[i].request_attachment().append(iobuf[i]);
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done);
}
for (int i = 0; i < RPC_NUM; ++i) {
bthread_id_join(cntl[i].call_id());
}
StopServer();
}
#endif // if BRPC_WITH_RDMA
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
#if BRPC_WITH_RDMA
rdma::FLAGS_rdma_trace_verbose = true;
rdma::FLAGS_rdma_memory_pool_max_regions = 2;
FLAGS_log_idle_connection_close = true;
if (!FLAGS_rdma_test_enable) {
// skip UT requiring rdma runtime environment
rdma::g_rdma_available.store(true, butil::memory_order_relaxed);
rdma::g_skip_rdma_init = true;
}
#endif // if BRPC_WITH_RDMA
return RUN_ALL_TESTS();
}