| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #ifndef RDMA_WRAP_H |
| #define RDMA_WRAP_H |
| |
| #include <rdma/rdma_cma.h> |
| |
| #include "qpid/RefCounted.h" |
| #include "qpid/sys/IOHandle.h" |
| #include "qpid/sys/Mutex.h" |
| |
| #include <boost/shared_ptr.hpp> |
| #include <boost/intrusive_ptr.hpp> |
| #include <boost/ptr_container/ptr_deque.hpp> |
| |
| #include <vector> |
| |
| namespace qpid { |
| namespace sys { |
| class SocketAddress; |
| }} |
| |
| namespace Rdma { |
| const int DEFAULT_TIMEOUT = 2000; // 2 secs |
| const int DEFAULT_BACKLOG = 100; |
| const int DEFAULT_CQ_ENTRIES = 256; |
| const int DEFAULT_WR_ENTRIES = 64; |
| extern const ::rdma_conn_param DEFAULT_CONNECT_PARAM; |
| |
| int deviceCount(); |
| |
| struct Buffer { |
| friend class QueuePair; |
| friend class QueuePairEvent; |
| |
| char* bytes() const; |
| int32_t byteCount() const; |
| int32_t dataCount() const; |
| void dataCount(int32_t); |
| |
| private: |
| Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, const int32_t reserve=0); |
| int32_t bufferSize; |
| int32_t reserved; // for framing header |
| ::ibv_sge sge; |
| }; |
| |
| inline char* Buffer::bytes() const { |
| return (char*) sge.addr; |
| } |
| |
| /** return the number of bytes available for application data */ |
| inline int32_t Buffer::byteCount() const { |
| return bufferSize - reserved; |
| } |
| |
| inline int32_t Buffer::dataCount() const { |
| return sge.length; |
| } |
| |
| inline void Buffer::dataCount(int32_t s) { |
| // catch any attempt to overflow a buffer |
| assert(s <= bufferSize + reserved); |
| sge.length = s; |
| } |
| |
| class Connection; |
| |
| enum QueueDirection { |
| NONE, |
| SEND, |
| RECV |
| }; |
| |
| class QueuePairEvent { |
| boost::shared_ptr< ::ibv_cq > cq; |
| ::ibv_wc wc; |
| QueueDirection dir; |
| |
| friend class QueuePair; |
| |
| QueuePairEvent(); |
| QueuePairEvent( |
| const ::ibv_wc& w, |
| boost::shared_ptr< ::ibv_cq > c, |
| QueueDirection d); |
| |
| public: |
| operator bool() const; |
| bool immPresent() const; |
| uint32_t getImm() const; |
| QueueDirection getDirection() const; |
| ::ibv_wc_opcode getEventType() const; |
| ::ibv_wc_status getEventStatus() const; |
| Buffer* getBuffer() const; |
| }; |
| |
| // Wrapper for a queue pair - this has the functionality for |
| // putting buffers on the receive queue and for sending buffers |
| // to the other end of the connection. |
| class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted { |
| friend class Connection; |
| |
| boost::shared_ptr< ::ibv_pd > pd; |
| boost::shared_ptr< ::ibv_mr > smr; |
| boost::shared_ptr< ::ibv_mr > rmr; |
| boost::shared_ptr< ::ibv_comp_channel > cchannel; |
| boost::shared_ptr< ::ibv_cq > scq; |
| boost::shared_ptr< ::ibv_cq > rcq; |
| boost::shared_ptr< ::ibv_qp > qp; |
| int outstandingSendEvents; |
| int outstandingRecvEvents; |
| std::vector<Buffer> sendBuffers; |
| std::vector<Buffer> recvBuffers; |
| qpid::sys::Mutex bufferLock; |
| std::vector<int> freeBuffers; |
| |
| QueuePair(boost::shared_ptr< ::rdma_cm_id > id); |
| ~QueuePair(); |
| |
| public: |
| typedef boost::intrusive_ptr<QueuePair> intrusive_ptr; |
| |
| // Create a buffers to use for writing |
| void createSendBuffers(int sendBufferCount, int dataSize, int headerSize); |
| |
| // Get a send buffer |
| Buffer* getSendBuffer(); |
| |
| // Return buffer to pool after use |
| void returnSendBuffer(Buffer* b); |
| |
| // Create and post recv buffers |
| void allocateRecvBuffers(int recvBufferCount, int bufferSize); |
| |
| // Make channel non-blocking by making |
| // associated fd nonblocking |
| void nonblocking(); |
| |
| // If we get EAGAIN because the channel has been set non blocking |
| // and we'd have to wait then return an empty event |
| QueuePair::intrusive_ptr getNextChannelEvent(); |
| |
| QueuePairEvent getNextEvent(); |
| |
| void postRecv(Buffer* buf); |
| void postSend(Buffer* buf); |
| void postSend(uint32_t imm, Buffer* buf); |
| void notifyRecv(); |
| void notifySend(); |
| }; |
| |
| class ConnectionEvent { |
| friend class Connection; |
| |
| // The order of the members is important as we have to acknowledge |
| // the event before destroying the ids on destruction |
| boost::intrusive_ptr<Connection> id; |
| boost::intrusive_ptr<Connection> listen_id; |
| boost::shared_ptr< ::rdma_cm_event > event; |
| |
| ConnectionEvent() {} |
| ConnectionEvent(::rdma_cm_event* e); |
| |
| // Default copy, assignment and destructor ok |
| public: |
| operator bool() const; |
| ::rdma_cm_event_type getEventType() const; |
| ::rdma_conn_param getConnectionParam() const; |
| boost::intrusive_ptr<Connection> getConnection () const; |
| boost::intrusive_ptr<Connection> getListenId() const; |
| }; |
| |
| // For the moment this is a fairly simple wrapper for rdma_cm_id. |
| // |
| // NB: It allocates a protection domain (pd) per connection which means that |
| // registered buffers can't be shared between different connections |
| // (this can only happen between connections on the same controller in any case, |
| // so needs careful management if used) |
| class Connection : public qpid::sys::IOHandle, public qpid::RefCounted { |
| boost::shared_ptr< ::rdma_event_channel > channel; |
| boost::shared_ptr< ::rdma_cm_id > id; |
| QueuePair::intrusive_ptr qp; |
| |
| void* context; |
| |
| friend class ConnectionEvent; |
| friend class QueuePair; |
| |
| // Wrap the passed in rdma_cm_id with a Connection |
| // this basically happens only on connection request |
| Connection(::rdma_cm_id* i); |
| Connection(); |
| ~Connection(); |
| |
| void ensureQueuePair(); |
| |
| public: |
| typedef boost::intrusive_ptr<Connection> intrusive_ptr; |
| |
| static intrusive_ptr make(); |
| static intrusive_ptr find(::rdma_cm_id* i); |
| |
| template <typename T> |
| void addContext(T* c) { |
| // Don't allow replacing context |
| if (!context) |
| context = c; |
| } |
| |
| void removeContext() { |
| context = 0; |
| } |
| |
| template <typename T> |
| T* getContext() { |
| return static_cast<T*>(context); |
| } |
| |
| // Make channel non-blocking by making |
| // associated fd nonblocking |
| void nonblocking(); |
| |
| // If we get EAGAIN because the channel has been set non blocking |
| // and we'd have to wait then return an empty event |
| ConnectionEvent getNextEvent(); |
| |
| void bind(const qpid::sys::SocketAddress& src_addr) const; |
| void listen(int backlog = DEFAULT_BACKLOG) const; |
| void resolve_addr( |
| const qpid::sys::SocketAddress& dst_addr, |
| int timeout_ms = DEFAULT_TIMEOUT) const; |
| void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const; |
| void disconnect() const; |
| |
| // TODO: Currently you can only connect with the default connection parameters |
| void connect(const void* data, size_t len); |
| void connect(); |
| template <typename T> |
| void connect(const T* data) { |
| connect(data, sizeof(T)); |
| } |
| |
| // TODO: Not sure how to default accept params - they come from the connection request |
| // event |
| void accept(const ::rdma_conn_param& param, const void* data, size_t len); |
| void accept(const ::rdma_conn_param& param); |
| template <typename T> |
| void accept(const ::rdma_conn_param& param, const T* data) { |
| accept(param, data, sizeof(T)); |
| } |
| |
| void reject(const void* data, size_t len) const; |
| void reject() const; |
| template <typename T> |
| void reject(const T* data) const { |
| reject(data, sizeof(T)); |
| } |
| |
| QueuePair::intrusive_ptr getQueuePair(); |
| std::string getLocalName() const; |
| std::string getPeerName() const; |
| std::string getFullName() const { return getLocalName()+"-"+getPeerName(); } |
| }; |
| } |
| |
| std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t); |
| |
| #endif // RDMA_WRAP_H |