| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "qpid/sys/rdma/RdmaIO.h" |
| |
| #include "qpid/log/Statement.h" |
| |
| #include <string> |
| #include <boost/bind.hpp> |
| |
| using qpid::sys::SocketAddress; |
| using qpid::sys::DispatchHandle; |
| using qpid::sys::Poller; |
| using qpid::sys::ScopedLock; |
| using qpid::sys::Mutex; |
| |
| namespace Rdma { |
| // Set packing as these are 'on the wire' structures |
| # pragma pack(push, 1) |
| |
| // Header structure for each transmitted frame |
| struct FrameHeader { |
| const static uint32_t FlagsMask = 0xf0000000; |
| uint32_t data; // written in network order |
| |
| FrameHeader() {} |
| FrameHeader(uint32_t credit, uint32_t flags = 0) { |
| data = htonl((credit & ~FlagsMask) | (flags & FlagsMask)); |
| } |
| |
| uint32_t credit() const { |
| return ntohl(data) & ~FlagsMask; |
| } |
| |
| uint32_t flags() const { |
| return ntohl(data) & FlagsMask; |
| } |
| }; |
| |
| const size_t FrameHeaderSize = sizeof(FrameHeader); |
| |
| // Structure for Connection Parameters on the network |
| // |
| // The original version (now called 0) of these parameters had a couple of mistakes: |
| // * No way to version the protocol (need to introduce a new protocol for iWarp) |
| // * Used host order int32 (but only deployed on LE archs as far as we know) |
| // so effectively was LE on the wire which is the opposite of network order. |
| // |
| // Fortunately the values sent were sufficiently restricted that a 16 bit short could |
| // be carved out to indicate the protocol version as these bits were always sent as 0. |
| // |
| // So the current version of parameters uses the last 2 bytes to indicate the protocol |
| // version, if this is 0 then we interpret the rest of the struct without byte swapping |
| // to remain compatible with the previous protocol. |
| struct NConnectionParams { |
| uint32_t maxRecvBufferSize; |
| uint16_t initialXmitCredit; |
| uint16_t rdmaProtocolVersion; |
| |
| NConnectionParams(const ConnectionParams& c) : |
| maxRecvBufferSize(c.rdmaProtocolVersion ? htonl(c.maxRecvBufferSize) : c.maxRecvBufferSize), |
| initialXmitCredit(c.rdmaProtocolVersion ? htons(c.initialXmitCredit) : c.initialXmitCredit), |
| // 0 is the same with/without byteswapping! |
| rdmaProtocolVersion(htons(c.rdmaProtocolVersion)) |
| {} |
| |
| operator ConnectionParams() const { |
| return |
| ConnectionParams( |
| rdmaProtocolVersion ? ntohl(maxRecvBufferSize) : maxRecvBufferSize, |
| rdmaProtocolVersion ? ntohs(initialXmitCredit) : initialXmitCredit, |
| ntohs(rdmaProtocolVersion)); |
| } |
| }; |
| # pragma pack(pop) |
| |
| class IOException : public std::exception { |
| std::string s; |
| |
| public: |
| IOException(std::string s0): s(s0) {} |
| ~IOException() throw() {} |
| |
| const char* what() const throw() { |
| return s.c_str(); |
| } |
| }; |
| |
| AsynchIO::AsynchIO( |
| QueuePair::intrusive_ptr q, |
| int version, |
| int size, |
| int xCredit, |
| int rCount, |
| ReadCallback rc, |
| IdleCallback ic, |
| FullCallback fc, |
| ErrorCallback ec |
| ) : |
| protocolVersion(version), |
| bufferSize(size), |
| recvCredit(0), |
| xmitCredit(xCredit), |
| recvBufferCount(rCount), |
| xmitBufferCount(xCredit), |
| outstandingWrites(0), |
| draining(false), |
| state(IDLE), |
| qp(q), |
| dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this), 0, 0), |
| readCallback(rc), |
| idleCallback(ic), |
| fullCallback(fc), |
| errorCallback(ec), |
| pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this)) |
| { |
| if (protocolVersion > maxSupportedProtocolVersion) |
| throw IOException("Unsupported Rdma Protocol"); |
| qp->nonblocking(); |
| qp->notifyRecv(); |
| qp->notifySend(); |
| |
| // Prepost recv buffers before we go any further |
| qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize); |
| |
| // Create xmit buffers, reserve space for frame header. |
| qp->createSendBuffers(xmitBufferCount, bufferSize, FrameHeaderSize); |
| } |
| |
| AsynchIO::~AsynchIO() { |
| // Warn if we are deleting whilst there are still unreclaimed write buffers |
| if ( outstandingWrites>0 ) |
| QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); |
| |
| // Turn off callbacks if necessary (before doing the deletes) |
| if (state != STOPPED) { |
| QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); |
| dataHandle.stopWatch(); |
| } |
| // TODO: It might turn out to be more efficient in high connection loads to reuse the |
| // buffers rather than having to reregister them all the time (this would be straightforward if all |
| // connections haver the same buffer size and harder otherwise) |
| } |
| |
| void AsynchIO::start(Poller::shared_ptr poller) { |
| dataHandle.startWatch(poller); |
| } |
| |
| // State constraints |
| // On entry: None |
| // On exit: STOPPED |
| // Mark for deletion/Delete this object when we have no outstanding writes |
| void AsynchIO::stop(NotifyCallback nc) { |
| ScopedLock<Mutex> l(stateLock); |
| state = STOPPED; |
| notifyCallback = nc; |
| dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this)); |
| } |
| |
| namespace { |
| void requestedCall(AsynchIO* aio, AsynchIO::RequestCallback callback) { |
| assert(callback); |
| callback(*aio); |
| } |
| } |
| |
| void AsynchIO::requestCallback(RequestCallback callback) { |
| // TODO creating a function object every time isn't all that |
| // efficient - if this becomes heavily used do something better (what?) |
| assert(callback); |
| dataHandle.call(boost::bind(&requestedCall, this, callback)); |
| } |
| |
| // Mark writing closed (so we don't accept any more writes or make any idle callbacks) |
| void AsynchIO::drainWriteQueue(NotifyCallback nc) { |
| draining = true; |
| notifyCallback = nc; |
| } |
| |
| void AsynchIO::queueBuffer(Buffer* buff, int credit) { |
| switch (protocolVersion) { |
| case 0: |
| if (!buff) { |
| Buffer* ob = getSendBuffer(); |
| // Have to send something as adapters hate it when you try to transfer 0 bytes |
| *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit); |
| ob->dataCount(sizeof(uint32_t)); |
| qp->postSend(credit | IgnoreData, ob); |
| } else if (credit > 0) { |
| qp->postSend(credit, buff); |
| } else { |
| qp->postSend(buff); |
| } |
| break; |
| case 1: |
| if (!buff) |
| buff = getSendBuffer(); |
| // Add FrameHeader after frame data |
| FrameHeader header(credit); |
| assert(buff->dataCount() <= buff->byteCount()); // ensure app data doesn't impinge on reserved space. |
| ::memcpy(buff->bytes()+buff->dataCount(), &header, FrameHeaderSize); |
| buff->dataCount(buff->dataCount()+FrameHeaderSize); |
| qp->postSend(buff); |
| break; |
| } |
| } |
| |
| Buffer* AsynchIO::extractBuffer(const QueuePairEvent& e) { |
| Buffer* b = e.getBuffer(); |
| switch (protocolVersion) { |
| case 0: { |
| bool dataPresent = true; |
| // Get our xmitCredit if it was sent |
| if (e.immPresent() ) { |
| assert(xmitCredit>=0); |
| xmitCredit += (e.getImm() & ~FlagsMask); |
| dataPresent = ((e.getImm() & IgnoreData) == 0); |
| assert(xmitCredit>0); |
| } |
| if (!dataPresent) { |
| b->dataCount(0); |
| } |
| break; |
| } |
| case 1: |
| b->dataCount(b->dataCount()-FrameHeaderSize); |
| FrameHeader header; |
| ::memcpy(&header, b->bytes()+b->dataCount(), FrameHeaderSize); |
| assert(xmitCredit>=0); |
| xmitCredit += header.credit(); |
| assert(xmitCredit>=0); |
| break; |
| } |
| |
| return b; |
| } |
| |
| void AsynchIO::queueWrite(Buffer* buff) { |
| // Make sure we don't overrun our available buffers |
| // either at our end or the known available at the peers end |
| if (writable()) { |
| // TODO: We might want to batch up sending credit |
| int creditSent = recvCredit & ~FlagsMask; |
| queueBuffer(buff, creditSent); |
| recvCredit -= creditSent; |
| ++outstandingWrites; |
| --xmitCredit; |
| assert(xmitCredit>=0); |
| } else { |
| if (fullCallback) { |
| fullCallback(*this, buff); |
| } else { |
| QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away"); |
| returnSendBuffer(buff); |
| } |
| } |
| } |
| |
| // State constraints |
| // On entry: None |
| // On exit: NOTIFY_PENDING || STOPPED |
| void AsynchIO::notifyPendingWrite() { |
| ScopedLock<Mutex> l(stateLock); |
| switch (state) { |
| case IDLE: |
| dataHandle.call(pendingWriteAction); |
| // Fall Thru |
| case NOTIFY: |
| state = NOTIFY_PENDING; |
| break; |
| case NOTIFY_PENDING: |
| case STOPPED: |
| break; |
| } |
| } |
| |
| // State constraints |
| // On entry: IDLE || STOPPED |
| // On exit: IDLE || STOPPED |
| void AsynchIO::dataEvent() { |
| { |
| ScopedLock<Mutex> l(stateLock); |
| |
| if (state == STOPPED) return; |
| |
| state = NOTIFY_PENDING; |
| } |
| processCompletions(); |
| |
| writeEvent(); |
| } |
| |
| // State constraints |
| // On entry: NOTIFY_PENDING || STOPPED |
| // On exit: IDLE || STOPPED |
| void AsynchIO::writeEvent() { |
| State newState; |
| do { |
| { |
| ScopedLock<Mutex> l(stateLock); |
| |
| switch (state) { |
| case STOPPED: |
| return; |
| default: |
| state = NOTIFY; |
| } |
| } |
| |
| doWriteCallback(); |
| |
| { |
| ScopedLock<Mutex> l(stateLock); |
| |
| newState = state; |
| switch (newState) { |
| case NOTIFY_PENDING: |
| case STOPPED: |
| break; |
| default: |
| state = IDLE; |
| } |
| } |
| } while (newState == NOTIFY_PENDING); |
| } |
| |
| void AsynchIO::processCompletions() { |
| QueuePair::intrusive_ptr q = qp->getNextChannelEvent(); |
| |
| // Re-enable notification for queue: |
| // This needs to happen before we could do anything that could generate more work completion |
| // events (ie the callbacks etc. in the following). |
| // This can't make us reenter this code as the handle attached to the completion queue will still be |
| // disabled by the poller until we leave this code |
| qp->notifyRecv(); |
| qp->notifySend(); |
| |
| int recvEvents = 0; |
| int sendEvents = 0; |
| |
| // If no event do nothing |
| if (!q) |
| return; |
| |
| assert(q == qp); |
| |
| // Repeat until no more events |
| do { |
| QueuePairEvent e(qp->getNextEvent()); |
| if (!e) |
| break; |
| |
| ::ibv_wc_status status = e.getEventStatus(); |
| if (status != IBV_WC_SUCCESS) { |
| // Need special check for IBV_WC_WR_FLUSH_ERR here |
| // we will get this for every send/recv queue entry that was pending |
| // when disconnected, these aren't real errors and mostly need to be ignored |
| if (status == IBV_WC_WR_FLUSH_ERR) { |
| QueueDirection dir = e.getDirection(); |
| if (dir == SEND) { |
| Buffer* b = e.getBuffer(); |
| ++sendEvents; |
| returnSendBuffer(b); |
| --outstandingWrites; |
| } else { |
| ++recvEvents; |
| } |
| continue; |
| } |
| errorCallback(*this); |
| // TODO: Probably need to flush queues at this point |
| return; |
| } |
| |
| // Test if recv (or recv with imm) |
| //::ibv_wc_opcode eventType = e.getEventType(); |
| QueueDirection dir = e.getDirection(); |
| if (dir == RECV) { |
| ++recvEvents; |
| |
| Buffer* b = extractBuffer(e); |
| |
| // if there was no data sent then the message was only to update our credit |
| if ( b->dataCount() > 0 ) { |
| readCallback(*this, b); |
| } |
| |
| // At this point the buffer has been consumed so put it back on the recv queue |
| // TODO: Is this safe to do if the connection is disconnected already? |
| qp->postRecv(b); |
| |
| // Received another message |
| ++recvCredit; |
| |
| // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently) |
| if (recvCredit > recvBufferCount/2) { |
| // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message |
| // but this is a little unlikely, as to get in this state we have to have received messages without sending any |
| // for a while so its likely we've received an credit update from the far side. |
| if (writable()) { |
| int creditSent = recvCredit & ~FlagsMask; |
| queueBuffer(0, creditSent); |
| recvCredit -= creditSent; |
| ++outstandingWrites; |
| --xmitCredit; |
| assert(xmitCredit>=0); |
| } else { |
| QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit"); |
| } |
| } |
| } else { |
| Buffer* b = e.getBuffer(); |
| ++sendEvents; |
| returnSendBuffer(b); |
| --outstandingWrites; |
| } |
| } while (true); |
| |
| // Not sure if this is expected or not |
| if (recvEvents == 0 && sendEvents == 0) { |
| QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions"); |
| } |
| } |
| |
| void AsynchIO::doWriteCallback() { |
| // TODO: maybe don't call idle unless we're low on write buffers |
| // Keep on calling the idle routine as long as we are writable and we got something to write last call |
| |
| // Do callback even if there are no available free buffers as the application itself might be |
| // holding onto buffers |
| while (writable()) { |
| int xc = xmitCredit; |
| idleCallback(*this); |
| // Check whether we actually wrote anything |
| if (xmitCredit == xc) { |
| QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit); |
| return; |
| } |
| } |
| |
| checkDrained(); |
| } |
| |
| void AsynchIO::checkDrained() { |
| // If we've got all the write confirmations and we're draining |
| // We might get deleted in the drained callback so return immediately |
| if (draining) { |
| if (outstandingWrites == 0) { |
| draining = false; |
| NotifyCallback nc; |
| nc.swap(notifyCallback); |
| nc(*this); |
| } |
| return; |
| } |
| } |
| |
| void AsynchIO::doStoppedCallback() { |
| // Ensure we can't get any more callbacks (except for the stopped callback) |
| dataHandle.stopWatch(); |
| |
| NotifyCallback nc; |
| nc.swap(notifyCallback); |
| nc(*this); |
| } |
| |
| ConnectionManager::ConnectionManager( |
| ErrorCallback errc, |
| DisconnectedCallback dc |
| ) : |
| state(IDLE), |
| ci(Connection::make()), |
| handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0), |
| errorCallback(errc), |
| disconnectedCallback(dc) |
| { |
| QPID_LOG(debug, "RDMA: ci=" << ci << ": Creating ConnectionManager"); |
| ci->nonblocking(); |
| } |
| |
| ConnectionManager::~ConnectionManager() |
| { |
| QPID_LOG(debug, "RDMA: ci=" << ci << ": Deleting ConnectionManager"); |
| } |
| |
| void ConnectionManager::start(Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr) { |
| startConnection(ci, addr); |
| handle.startWatch(poller); |
| } |
| |
| void ConnectionManager::doStoppedCallback() { |
| // Ensure we can't get any more callbacks (except for the stopped callback) |
| handle.stopWatch(); |
| |
| NotifyCallback nc; |
| nc.swap(notifyCallback); |
| nc(*this); |
| } |
| |
| void ConnectionManager::stop(NotifyCallback nc) { |
| state = STOPPED; |
| notifyCallback = nc; |
| handle.call(boost::bind(&ConnectionManager::doStoppedCallback, this)); |
| } |
| |
| void ConnectionManager::event(DispatchHandle&) { |
| if (state.get() == STOPPED) return; |
| connectionEvent(ci); |
| } |
| |
| Listener::Listener( |
| const ConnectionParams& cp, |
| EstablishedCallback ec, |
| ErrorCallback errc, |
| DisconnectedCallback dc, |
| ConnectionRequestCallback crc |
| ) : |
| ConnectionManager(errc, dc), |
| checkConnectionParams(cp), |
| connectionRequestCallback(crc), |
| establishedCallback(ec) |
| { |
| } |
| |
| void Listener::startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) { |
| ci->bind(addr); |
| ci->listen(); |
| } |
| |
| namespace { |
| const int64_t PoisonContext = -1; |
| } |
| |
| void Listener::connectionEvent(Connection::intrusive_ptr ci) { |
| ConnectionEvent e(ci->getNextEvent()); |
| |
| // If (for whatever reason) there was no event do nothing |
| if (!e) |
| return; |
| |
| // Important documentation ommision the new rdma_cm_id |
| // you get from CONNECT_REQUEST has the same context info |
| // as its parent listening rdma_cm_id |
| ::rdma_cm_event_type eventType = e.getEventType(); |
| ::rdma_conn_param conn_param = e.getConnectionParam(); |
| Rdma::Connection::intrusive_ptr id = e.getConnection(); |
| |
| // Check for previous disconnection (it appears that you actually can get connection |
| // request events after a disconnect event in rare circumstances) |
| if (reinterpret_cast<int64_t>(id->getContext<void*>())==PoisonContext) |
| return; |
| |
| switch (eventType) { |
| case RDMA_CM_EVENT_CONNECT_REQUEST: { |
| // Make sure peer has sent params we can use |
| if (!conn_param.private_data || conn_param.private_data_len < sizeof(NConnectionParams)) { |
| QPID_LOG(warning, "Rdma: rejecting connection attempt: unusable connection parameters"); |
| id->reject(); |
| break; |
| } |
| |
| const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data); |
| ConnectionParams cp = *rcp; |
| |
| // Reject if requested msg size is bigger than we allow |
| if ( |
| cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize || |
| cp.initialXmitCredit > checkConnectionParams.initialXmitCredit |
| ) { |
| QPID_LOG(warning, "Rdma: rejecting connection attempt: connection parameters out of range: (" |
| << cp.maxRecvBufferSize << ">" << checkConnectionParams.maxRecvBufferSize << " || " |
| << cp.initialXmitCredit << ">" << checkConnectionParams.initialXmitCredit |
| << ")"); |
| id->reject(&checkConnectionParams); |
| break; |
| } |
| |
| bool accept = true; |
| if (connectionRequestCallback) |
| accept = connectionRequestCallback(id, cp); |
| |
| if (accept) { |
| // Accept connection |
| cp.initialXmitCredit = checkConnectionParams.initialXmitCredit; |
| id->accept(conn_param, rcp); |
| } else { |
| // Reject connection |
| QPID_LOG(warning, "Rdma: rejecting connection attempt: application policy"); |
| id->reject(); |
| } |
| break; |
| } |
| case RDMA_CM_EVENT_ESTABLISHED: |
| establishedCallback(id); |
| break; |
| case RDMA_CM_EVENT_DISCONNECTED: |
| disconnectedCallback(id); |
| // Poison the id context so that we do no more callbacks on it |
| id->removeContext(); |
| id->addContext(reinterpret_cast<void*>(PoisonContext)); |
| break; |
| case RDMA_CM_EVENT_CONNECT_ERROR: |
| errorCallback(id, CONNECT_ERROR); |
| break; |
| default: |
| // Unexpected response |
| errorCallback(id, UNKNOWN); |
| //std::cerr << "Warning: unexpected response to listen - " << eventType << "\n"; |
| } |
| } |
| |
| Connector::Connector( |
| const ConnectionParams& cp, |
| ConnectedCallback cc, |
| ErrorCallback errc, |
| DisconnectedCallback dc, |
| RejectedCallback rc |
| ) : |
| ConnectionManager(errc, dc), |
| connectionParams(cp), |
| rejectedCallback(rc), |
| connectedCallback(cc) |
| { |
| } |
| |
| void Connector::startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) { |
| ci->resolve_addr(addr); |
| } |
| |
| void Connector::connectionEvent(Connection::intrusive_ptr ci) { |
| ConnectionEvent e(ci->getNextEvent()); |
| |
| // If (for whatever reason) there was no event do nothing |
| if (!e) |
| return; |
| |
| ::rdma_cm_event_type eventType = e.getEventType(); |
| ::rdma_conn_param conn_param = e.getConnectionParam(); |
| Rdma::Connection::intrusive_ptr id = e.getConnection(); |
| switch (eventType) { |
| case RDMA_CM_EVENT_ADDR_RESOLVED: |
| // RESOLVE_ADDR |
| ci->resolve_route(); |
| break; |
| case RDMA_CM_EVENT_ADDR_ERROR: |
| // RESOLVE_ADDR |
| errorCallback(ci, ADDR_ERROR); |
| break; |
| case RDMA_CM_EVENT_ROUTE_RESOLVED: { |
| // RESOLVE_ROUTE: |
| NConnectionParams rcp(connectionParams); |
| ci->connect(&rcp); |
| break; |
| } |
| case RDMA_CM_EVENT_ROUTE_ERROR: |
| // RESOLVE_ROUTE: |
| errorCallback(ci, ROUTE_ERROR); |
| break; |
| case RDMA_CM_EVENT_CONNECT_ERROR: |
| // CONNECTING |
| errorCallback(ci, CONNECT_ERROR); |
| break; |
| case RDMA_CM_EVENT_UNREACHABLE: |
| // CONNECTING |
| errorCallback(ci, UNREACHABLE); |
| break; |
| case RDMA_CM_EVENT_REJECTED: { |
| // CONNECTING |
| |
| // We can get this event if our peer is not running on the other side |
| // in this case we could get nearly anything in the private data: |
| // From private_data == 0 && private_data_len == 0 (Chelsio iWarp) |
| // to 148 bytes of zeros (Mellanox IB) |
| // |
| // So assume that if the the private data is absent or not the size of |
| // the connection parameters it isn't valid |
| ConnectionParams cp(0, 0, 0); |
| if (conn_param.private_data && conn_param.private_data_len == sizeof(NConnectionParams)) { |
| // Extract private data from event |
| const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data); |
| cp = *rcp; |
| } |
| rejectedCallback(ci, cp); |
| break; |
| } |
| case RDMA_CM_EVENT_ESTABLISHED: { |
| // CONNECTING |
| // Extract private data from event |
| assert(conn_param.private_data && conn_param.private_data_len >= sizeof(NConnectionParams)); |
| const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data); |
| ConnectionParams cp = *rcp; |
| connectedCallback(ci, cp); |
| break; |
| } |
| case RDMA_CM_EVENT_DISCONNECTED: |
| // ESTABLISHED |
| disconnectedCallback(ci); |
| break; |
| default: |
| QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType); |
| } |
| } |
| } |