blob: 5f84793a5b5142f0ce781ab2a164627976cf130c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#ifndef RDMA_WRAP_H
#define RDMA_WRAP_H
#include <rdma/rdma_cma.h>
#include "qpid/RefCounted.h"
#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/ptr_container/ptr_deque.hpp>
#include <vector>
namespace qpid {
namespace sys {
class SocketAddress;
}}
namespace Rdma {
const int DEFAULT_TIMEOUT = 2000; // 2 secs
const int DEFAULT_BACKLOG = 100;
const int DEFAULT_CQ_ENTRIES = 256;
const int DEFAULT_WR_ENTRIES = 64;
extern const ::rdma_conn_param DEFAULT_CONNECT_PARAM;
int deviceCount();
struct Buffer {
friend class QueuePair;
friend class QueuePairEvent;
char* bytes() const;
int32_t byteCount() const;
int32_t dataCount() const;
void dataCount(int32_t);
private:
Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, const int32_t reserve=0);
int32_t bufferSize;
int32_t reserved; // for framing header
::ibv_sge sge;
};
inline char* Buffer::bytes() const {
return (char*) sge.addr;
}
/** return the number of bytes available for application data */
inline int32_t Buffer::byteCount() const {
return bufferSize - reserved;
}
inline int32_t Buffer::dataCount() const {
return sge.length;
}
inline void Buffer::dataCount(int32_t s) {
// catch any attempt to overflow a buffer
assert(s <= bufferSize + reserved);
sge.length = s;
}
class Connection;
enum QueueDirection {
NONE,
SEND,
RECV
};
class QueuePairEvent {
boost::shared_ptr< ::ibv_cq > cq;
::ibv_wc wc;
QueueDirection dir;
friend class QueuePair;
QueuePairEvent();
QueuePairEvent(
const ::ibv_wc& w,
boost::shared_ptr< ::ibv_cq > c,
QueueDirection d);
public:
operator bool() const;
bool immPresent() const;
uint32_t getImm() const;
QueueDirection getDirection() const;
::ibv_wc_opcode getEventType() const;
::ibv_wc_status getEventStatus() const;
Buffer* getBuffer() const;
};
// Wrapper for a queue pair - this has the functionality for
// putting buffers on the receive queue and for sending buffers
// to the other end of the connection.
class QueuePair : public qpid::RefCounted {
friend class Connection;
boost::scoped_ptr< qpid::sys::IOHandle > handle;
boost::shared_ptr< ::ibv_pd > pd;
boost::shared_ptr< ::ibv_mr > smr;
boost::shared_ptr< ::ibv_mr > rmr;
boost::shared_ptr< ::ibv_comp_channel > cchannel;
boost::shared_ptr< ::ibv_cq > scq;
boost::shared_ptr< ::ibv_cq > rcq;
boost::shared_ptr< ::ibv_qp > qp;
int outstandingSendEvents;
int outstandingRecvEvents;
std::vector<Buffer> sendBuffers;
std::vector<Buffer> recvBuffers;
qpid::sys::Mutex bufferLock;
std::vector<int> freeBuffers;
QueuePair(boost::shared_ptr< ::rdma_cm_id > id);
~QueuePair();
public:
typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
operator qpid::sys::IOHandle&() const;
// Create a buffers to use for writing
void createSendBuffers(int sendBufferCount, int dataSize, int headerSize);
// Get a send buffer
Buffer* getSendBuffer();
// Return buffer to pool after use
void returnSendBuffer(Buffer* b);
// Create and post recv buffers
void allocateRecvBuffers(int recvBufferCount, int bufferSize);
// Make channel non-blocking by making
// associated fd nonblocking
void nonblocking();
// If we get EAGAIN because the channel has been set non blocking
// and we'd have to wait then return an empty event
QueuePair::intrusive_ptr getNextChannelEvent();
QueuePairEvent getNextEvent();
void postRecv(Buffer* buf);
void postSend(Buffer* buf);
void postSend(uint32_t imm, Buffer* buf);
void notifyRecv();
void notifySend();
};
class ConnectionEvent {
friend class Connection;
// The order of the members is important as we have to acknowledge
// the event before destroying the ids on destruction
boost::intrusive_ptr<Connection> id;
boost::intrusive_ptr<Connection> listen_id;
boost::shared_ptr< ::rdma_cm_event > event;
ConnectionEvent() {}
ConnectionEvent(::rdma_cm_event* e);
// Default copy, assignment and destructor ok
public:
operator bool() const;
::rdma_cm_event_type getEventType() const;
::rdma_conn_param getConnectionParam() const;
boost::intrusive_ptr<Connection> getConnection () const;
boost::intrusive_ptr<Connection> getListenId() const;
};
// For the moment this is a fairly simple wrapper for rdma_cm_id.
//
// NB: It allocates a protection domain (pd) per connection which means that
// registered buffers can't be shared between different connections
// (this can only happen between connections on the same controller in any case,
// so needs careful management if used)
class Connection : public qpid::RefCounted {
boost::scoped_ptr< qpid::sys::IOHandle > handle;
boost::shared_ptr< ::rdma_event_channel > channel;
boost::shared_ptr< ::rdma_cm_id > id;
QueuePair::intrusive_ptr qp;
void* context;
friend class ConnectionEvent;
friend class QueuePair;
// Wrap the passed in rdma_cm_id with a Connection
// this basically happens only on connection request
Connection(::rdma_cm_id* i);
Connection();
~Connection();
void ensureQueuePair();
public:
typedef boost::intrusive_ptr<Connection> intrusive_ptr;
operator qpid::sys::IOHandle&() const;
static intrusive_ptr make();
static intrusive_ptr find(::rdma_cm_id* i);
template <typename T>
void addContext(T* c) {
// Don't allow replacing context
if (!context)
context = c;
}
void removeContext() {
context = 0;
}
template <typename T>
T* getContext() {
return static_cast<T*>(context);
}
// Make channel non-blocking by making
// associated fd nonblocking
void nonblocking();
// If we get EAGAIN because the channel has been set non blocking
// and we'd have to wait then return an empty event
ConnectionEvent getNextEvent();
void bind(const qpid::sys::SocketAddress& src_addr) const;
void listen(int backlog = DEFAULT_BACKLOG) const;
void resolve_addr(
const qpid::sys::SocketAddress& dst_addr,
int timeout_ms = DEFAULT_TIMEOUT) const;
void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const;
void disconnect() const;
// TODO: Currently you can only connect with the default connection parameters
void connect(const void* data, size_t len);
void connect();
template <typename T>
void connect(const T* data) {
connect(data, sizeof(T));
}
// TODO: Not sure how to default accept params - they come from the connection request
// event
void accept(const ::rdma_conn_param& param, const void* data, size_t len);
void accept(const ::rdma_conn_param& param);
template <typename T>
void accept(const ::rdma_conn_param& param, const T* data) {
accept(param, data, sizeof(T));
}
void reject(const void* data, size_t len) const;
void reject() const;
template <typename T>
void reject(const T* data) const {
reject(data, sizeof(T));
}
QueuePair::intrusive_ptr getQueuePair();
std::string getLocalName() const;
std::string getPeerName() const;
std::string getFullName() const { return getLocalName()+"-"+getPeerName(); }
};
}
std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t);
#endif // RDMA_WRAP_H