blob: b4a748f9e27bf00f9cb80f5482969dd2796596e9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
#include <infiniband/verbs.h>
#include "butil/atomicops.h"
#include "butil/iobuf.h"
#include "butil/macros.h"
#include "brpc/socket.h"
namespace brpc {
class Socket;
namespace rdma {
// AppConnect implementation declared in the rdma namespace.
// NOTE(review): the access specifiers and the closing braces of this class
// and of RunGuard are not visible in this excerpt; confirm member grouping
// against the complete header.
class RdmaConnect : public AppConnect {
// Overrides the base-class hook. Arguments:
//     socket: the socket being connected
//     done: callback taking an error code and an opaque pointer
//     data: opaque pointer handed in together with `done`
// NOTE(review): presumably `done(err, data)` is invoked once the connect
// attempt finishes -- confirm in the implementation.
void StartConnect(const Socket* socket,
void (*done)(int err, void* data), void* data) override;
// Overrides the base-class hook that is the counterpart of StartConnect.
// NOTE(review): the exact stop semantics live in the implementation and
// are not visible here.
void StopConnect(Socket*) override;
// Scope guard: on destruction it calls Run() on the stored RdmaConnect,
// unless the stored pointer is null.
struct RunGuard {
// Remember the RdmaConnect whose Run() is called on destruction.
RunGuard(RdmaConnect* rc) { this_rc = rc; }
// Call Run() on the stored object; skipped when the pointer is null.
~RunGuard() { if (this_rc) this_rc->Run(); }
// Target of the deferred Run() call; a null value disables the call.
RdmaConnect* this_rc;
// Invoked by ~RunGuard.
// NOTE(review): presumably calls _done with _data -- confirm.
void Run();
// Callback with the signature (int, void*).
// NOTE(review): presumably the `done` given to StartConnect -- confirm.
void (*_done)(int, void*);
// Opaque pointer.
// NOTE(review): presumably the `data` given to StartConnect -- confirm.
void* _data;
// Bundle of ibverbs handles; RdmaEndpoint refers to one through its
// _resource pointer.
// NOTE(review): the closing brace of this struct is not visible in this
// excerpt.
struct RdmaResource {
// Queue pair handle
ibv_qp* qp;
// Completion queue handle
ibv_cq* cq;
// Completion event channel handle
ibv_comp_channel* comp_channel;
// Pointer to another RdmaResource.
// NOTE(review): presumably chains resources into a singly linked list
// (e.g. for reuse) -- confirm against the implementation.
RdmaResource* next;
// RDMA endpoint attached to a Socket. It is a SocketUser, is declared with
// BAIDU_CACHELINE_ALIGNMENT, and grants RdmaConnect and brpc::Socket access
// to its non-public members.
// NOTE(review): access specifiers and the closing braces of this class and
// of the State enum are not visible in this excerpt; confirm member
// visibility against the complete header.
class BAIDU_CACHELINE_ALIGNMENT RdmaEndpoint : public SocketUser {
friend class RdmaConnect;
friend class brpc::Socket;
// Construct an endpoint for the given Socket.
// NOTE(review): presumably `s` is kept in _socket (not owned) -- confirm.
RdmaEndpoint(Socket* s);
// Global initialization
// Return 0 on success, -1 on failure with errno set
static int GlobalInitialize();
// Counterpart of GlobalInitialize()
static void GlobalRelease();
// Reset the endpoint (for next use)
void Reset();
// Cut data from the given IOBuf list and send it through RDMA
// Arguments:
//     data: array of IOBuf pointers
//     ndata: number of entries in data
// Return the number of bytes cut on success, -1 on failure with errno set
ssize_t CutFromIOBufList(butil::IOBuf** data, size_t ndata);
// Whether the endpoint can send more data
bool IsWritable() const;
// Write debugging information about this endpoint to os
void DebugInfo(std::ostream& os) const;
// Callback when there is a new epollin event on the TCP fd
static void OnNewDataFromTcp(Socket* m);
// Handshake states (the type of _state).
// NOTE(review): the visible values are not contiguous (0x0, 0x5,
// 0x11-0x15, 0x300), so further enumerators may be missing from this
// excerpt. The C_/S_ prefixes presumably mean client/server side -- confirm.
enum State {
UNINIT = 0x0,
C_ACK_SEND = 0x5,
S_HELLO_WAIT = 0x11,
S_ALLOC_QPCQ = 0x12,
S_BRINGUP_QP = 0x13,
S_HELLO_SEND = 0x14,
S_ACK_WAIT = 0x15,
FAILED = 0x300
// Process the handshake at the client.
// The void*(void*) signature matches a thread entry function.
// NOTE(review): the meaning of arg and of the return value is defined in
// the implementation.
static void* ProcessHandshakeAtClient(void* arg);
// Process the handshake at the server (same signature as the client-side
// function above).
static void* ProcessHandshakeAtServer(void* arg);
// Allocate resources
// Return 0 on success, -1 on failure with errno set
int AllocateResources();
// Release resources
void DeallocateResources();
// Send Imm data to the remote side
// Arguments:
//     imm: imm data in the WR
// Return:
//     0: success
//     -1: failed, errno set
int SendImm(uint32_t imm);
// Try to send a pure ACK to the remote side
// Arguments:
//     num: the number of rq entries received
// Return:
//     0: success
//     -1: failed, errno set
int SendAck(int num);
// Handle a CQE
// If wc is not an RDMA RECV event:
//     return 0 on success, -1 on failure with errno set
// If wc is an RDMA RECV event:
//     return the number of bytes appended on success, -1 on failure with
//     errno set
ssize_t HandleCompletion(ibv_wc& wc);
// Post a given number of WRs to the Recv Queue
// If zerocopy is true, reallocate the block.
// Return 0 on success, -1 on failure with errno set
int PostRecv(uint32_t num, bool zerocopy);
// Post a WR pointing to the block to the local Recv Queue
// Arguments:
//     block: the address to receive data (ibv_sge.addr)
//     block_size: the maximum length that can be received (ibv_sge.length)
// Return:
//     0: success
//     -1: failed, errno set
int DoPostRecv(void* block, size_t block_size);
// Read at most len bytes from the fd in _socket into data
// Wait on _read_butex when EAGAIN is encountered
// Return -1 on any other errno (including EOF)
// NOTE(review): the success return value is not documented here -- confirm.
int ReadFromFd(void* data, size_t len);
// Write at most len bytes from data to the fd in _socket
// Wait on _epollout_butex when EAGAIN is encountered
// Return -1 on any other errno
// NOTE(review): the success return value is not documented here, and
// _epollout_butex is not declared in this excerpt -- confirm where it lives.
int WriteToFd(void* data, size_t len);
// Bring up the QP from RESET state to RTS state
// Arguments:
//     lid: remote LID
//     gid: remote GID
//     qp_num: remote QP number
// Return:
//     0: success
//     -1: failed, errno set
int BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num);
// Get events from the comp channel and ack them
// NOTE(review): the return convention is not documented here; presumably
// 0 / -1 like the sibling functions -- confirm.
int GetAndAckEvents();
// Poll the CQ and get the work completions
static void PollCq(Socket* m);
// Get the description of the current handshake state
std::string GetStateStr() const;
// Try to read data on the TCP fd in _socket
inline void TryReadOnTcp();
// The Socket this endpoint is attached to; not owned
Socket* _socket;
// State of the handshake
State _state;
// RDMA resource (see RdmaResource)
RdmaResource* _resource;
// The number of events requiring ack
int _cq_events;
// The SocketId which wraps the comp channel of the CQ
SocketId _cq_sid;
// Capacity of the local Send Queue and the local Recv Queue
uint16_t _sq_size;
uint16_t _rq_size;
// Act as sendbuf and recvbuf, but require no memcpy
std::vector<butil::IOBuf> _sbuf;
std::vector<butil::IOBuf> _rbuf;
// Data addresses of _rbuf
std::vector<void*> _rbuf_data;
// Remote block size for receiving
uint32_t _remote_recv_block_size;
// The number of new recv WRs acked to the remote side
uint16_t _accumulated_ack;
// The number of WRs sent without the solicited flag
uint16_t _unsolicited;
// The number of bytes sent without the solicited flag
uint32_t _unsolicited_bytes;
// The index that should be used for the next send
uint16_t _sq_current;
// The number of send WRs not signaled
uint16_t _sq_unsignaled;
// The index of the send WR that just completed
uint16_t _sq_sent;
// The index of the recv WR that just completed
uint16_t _rq_received;
// The capacity of the local window: min(local SQ, remote RQ)
uint16_t _local_window_capacity;
// The capacity of the remote window: min(local RQ, remote SQ)
uint16_t _remote_window_capacity;
// The number of WRs we can post to the local Send Queue
butil::atomic<uint16_t> _window_size;
// The number of new WRs posted in the local Recv Queue
butil::atomic<uint16_t> _new_rq_wrs;
// Butex used to signal read events on the TCP fd during the handshake
butil::atomic<int> *_read_butex;
} // namespace rdma
} // namespace brpc
#else // if BRPC_WITH_RDMA
// Empty placeholder type used on the #else branch, i.e. (per the directive
// comments) when BRPC_WITH_RDMA is not enabled, so that the name
// RdmaEndpoint is still declared.
// NOTE(review): the matching #if line is not visible in this excerpt.
class RdmaEndpoint { };
#endif // if BRPC_WITH_RDMA