blob: 77e766dd79cf5244f2d4c329f4bca58dcd8aa590 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "RdmaIO.h"
#include "qpid/log/Statement.h"
#include <iostream>
#include <boost/bind.hpp>
using qpid::sys::DispatchHandle;
using qpid::sys::Poller;
namespace Rdma {
/**
 * Build the asynchronous IO driver for one RDMA queue pair.
 *
 * Switches the queue pair to non-blocking mode, arms receive and send
 * notification, then allocates the buffers: rCount receive buffers, each
 * posted to the queue pair immediately, and xCredit transmit buffers, which
 * are parked on bufferQueue until a writer asks for one. Every buffer is
 * also recorded in `buffers`.
 *
 * @param q       queue pair driven by this object
 * @param size    size handed to createBuffer() for every buffer
 * @param xCredit initial transmit credit; also the number of transmit buffers
 * @param rCount  number of receive buffers to pre-post
 * @param rc      stored as readCallback
 * @param ic      stored as idleCallback
 * @param fc      stored as fullCallback
 * @param ec      stored as errorCallback
 */
AsynchIO::AsynchIO(
    QueuePair::intrusive_ptr q,
    int size,
    int xCredit,
    int rCount,
    ReadCallback rc,
    IdleCallback ic,
    FullCallback fc,
    ErrorCallback ec
) :
    qp(q),
    dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
    bufferSize(size),
    recvCredit(0),
    xmitCredit(xCredit),
    recvBufferCount(rCount),
    xmitBufferCount(xCredit),
    outstandingWrites(0),
    closed(false),
    deleting(false),
    state(IDLE),
    readCallback(rc),
    idleCallback(ic),
    fullCallback(fc),
    errorCallback(ec)
{
    qp->nonblocking();
    qp->notifyRecv();
    qp->notifySend();

    // Receive side: get the buffers posted before anything else happens
    for (int remaining = recvBufferCount; remaining > 0; --remaining) {
        Buffer* recvBuf = qp->createBuffer(bufferSize);
        recvBuf->dataCount = recvBuf->byteCount;
        buffers.push_front(recvBuf);
        qp->postRecv(recvBuf);
    }

    // Transmit side: empty buffers waiting on the free queue
    for (int remaining = xmitBufferCount; remaining > 0; --remaining) {
        Buffer* xmitBuf = qp->createBuffer(bufferSize);
        xmitBuf->dataStart = 0;
        xmitBuf->dataCount = 0;
        buffers.push_front(xmitBuf);
        bufferQueue.push_front(xmitBuf);
    }
}
/**
 * Tear the IO object down.
 *
 * Logs an error when write buffers are still outstanding, then stops
 * watching dataHandle so no further callbacks arrive.
 */
AsynchIO::~AsynchIO() {
    // Going away while writes are still in flight is worth shouting about
    if (outstandingWrites > 0) {
        QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
    }

    // Callbacks must be switched off before any member is destroyed
    dataHandle.stopWatch();

    // Nothing to free by hand: the buffers ptr_deque deletes every buffer we allocated.
    // TODO: Under high connection load it might be more efficient to reuse the buffers
    // instead of re-registering them every time (straightforward if all connections
    // have the same buffer size, harder otherwise)
}
// Start watching dataHandle on the given poller. dataHandle was constructed
// with dataEvent() as its callback, so this is what lets dataEvent() be called.
void AsynchIO::start(Poller::shared_ptr poller) {
dataHandle.startWatch(poller);
}
// Mark for deletion/Delete this object when we have no outstanding writes
//
// If writes are still outstanding, or the state machine is not IDLE, only the
// `deleting` flag is set and the call returns; dataEvent() tests that flag when
// it finishes. Otherwise the state is moved to DELETED and the object deletes
// itself before this function returns.
void AsynchIO::deferDelete() {
State oldState;
State newState;
bool doReturn;
//qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
// It is safe to assign to deleting here as we either delete ourselves
// before leaving this function or deleting is set on exit
// Compare-and-swap retry loop: the decision is recomputed from a fresh read of
// the state every time the swap fails.
do {
newState = oldState = state.get();
doReturn = false;
if (outstandingWrites > 0 || oldState != IDLE) {
// Busy: leave the state alone (newState == oldState) and just flag the request
deleting = true;
doReturn = true;
} else{
newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
}
} while (!state.boolCompareAndSwap(oldState, newState));
if (doReturn) {
return;
}
// State is now DELETED and nothing was outstanding: destroy immediately
delete this;
}
/**
 * Post a buffer for sending, provided we are currently writable.
 *
 * When not writable the buffer goes to fullCallback if one is set; without
 * a callback the buffer is dropped back onto the free queue and an error is
 * logged. On the send path any receive credit we owe the peer is attached
 * to the message.
 *
 * @param buff buffer to send
 */
void AsynchIO::queueWrite(Buffer* buff) {
    // Never overrun our own buffers or the buffers we know the peer has free
    if (!writable()) {
        if (fullCallback) {
            fullCallback(*this, buff);
            return;
        }
        QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away");
        returnBuffer(buff);
        return;
    }

    // TODO: We might want to batch up sending credit
    if (recvCredit > 0) {
        // Piggy-back the credit we owe onto this message
        int piggybackCredit = recvCredit & ~FlagsMask;
        qp->postSend(piggybackCredit, buff);
        recvCredit -= piggybackCredit;
    } else {
        qp->postSend(buff);
    }

    // One more write in flight, one less credit to spend
    ++outstandingWrites;
    --xmitCredit;
}
// Mark now closed (so we don't accept any more writes or make any idle callbacks)
// This function only sets the `closed` flag; the flag is not read anywhere in the
// functions visible here.
// NOTE(review): presumably writable() consults `closed` - confirm in the header.
void AsynchIO::queueWriteClose() {
// Don't think we actually need to lock here as transition is 1 way only to closed
closed = true;
}
/**
 * Tell the IO object that the caller has data it would like to write.
 *
 * May be called on an arbitrary thread, so it has to cooperate with
 * dataEvent() through the lock-free state machine:
 *  - IDLE: we take ownership (NOTIFY_WRITE) and run the write callback here.
 *  - NOTIFY_WRITE / PENDING_NOTIFY: somebody is already notifying; record
 *    PENDING_NOTIFY so they go round once more, then return.
 *  - DATA / PENDING_DATA: data processing is always followed by the write
 *    callback, so there is nothing to do.
 *
 * While we own the state machine a dataEvent() may arrive and park itself
 * as PENDING_DATA; in that case we process the completions on its behalf
 * before going back to IDLE. Once completions have been processed here the
 * deferred-delete condition is checked on exit, exactly as dataEvent() does,
 * so this function may delete the object before returning.
 */
void AsynchIO::notifyPendingWrite() {
    // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
    // If we are then we just return as we know that we will eventually do the idle callback anyway.
    //
    // We can get here in any state (as the caller could be in any thread)
    State oldState;
    State newState;
    bool doReturn;
    do {
        newState = oldState = state.get();
        doReturn = false;
        switch (oldState) {
        case NOTIFY_WRITE:
        case PENDING_NOTIFY:
            // We only need to note a pending notify if we're already doing a notify as data processing
            // is always followed by write notification processing
            newState = PENDING_NOTIFY;
            doReturn = true;
            break;
        case PENDING_DATA:
        case DATA:
            // Only need to return here as data processing will do the idleCallback itself anyway
            doReturn = true;
            break;
        case IDLE:
            newState = NOTIFY_WRITE;
            break;
        case DELETED:
            // Must never be called once the object has been marked deleted
            assert(oldState!=DELETED);
            doReturn = true;
        }
    } while (!state.boolCompareAndSwap(oldState, newState));
    if (doReturn) {
        return;
    }

    doWriteCallback();

    // What to do once the state transition has been committed
    enum {COMPLETION, NOTIFY, RETURN, EXIT} action;

    // Remembers that completions were processed in an earlier pass. This must live
    // outside the compare-and-swap retry loop: `action` is recomputed from scratch on
    // every retry, so it cannot carry that information from one pass to the next.
    bool completionsProcessed = false;

    // If there was pending data whilst we were doing this, process it now
    //
    // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the
    // correct result if we reenter notifyPendingWrite(), in which case we want to
    // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks
    // not IDLE)
    do {
        do {
            newState = oldState = state.get();
            action = RETURN;
            switch (oldState) {
            case NOTIFY_WRITE:
                // Nothing arrived while we were busy: release the state machine
                newState = IDLE;
                action = completionsProcessed ? EXIT : RETURN;
                break;
            case PENDING_DATA:
                newState = NOTIFY_WRITE;
                action = COMPLETION;
                break;
            case PENDING_NOTIFY:
                newState = NOTIFY_WRITE;
                action = NOTIFY;
                break;
            default:
                assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED);
                action = RETURN;
            }
        } while (!state.boolCompareAndSwap(oldState, newState));

        switch (action) {
        case COMPLETION:
            processCompletions();
            completionsProcessed = true;
            // Fall through
        case NOTIFY:
            doWriteCallback();
            break;
        case RETURN:
            return;
        case EXIT:
            // We processed completions, so the last outstanding write may have finished:
            // honour a deferred delete request now
            if (deleting && outstandingWrites == 0) {
                delete this;
            }
            return;
        }
    } while (true);
}
// Callback handed to dataHandle in the constructor.
//
// Claims the state machine (IDLE -> DATA), drains completions, then runs the
// write callback until no further notification is pending and drops back to
// IDLE. If the state machine is not IDLE the event is only recorded as
// PENDING_DATA and this call returns at once; notifyPendingWrite() handles the
// PENDING_DATA state by calling processCompletions() itself.
// On the way out a deferred delete request is honoured, so this function may
// delete the object.
void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) {
// Keep track of writable notifications
// qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
State oldState;
State newState;
bool doReturn;
// Step 1: try to take ownership of the state machine (compare-and-swap retry loop)
do {
newState = oldState = state.get();
doReturn = false;
// We're already processing a notification
switch (oldState) {
case IDLE:
newState = DATA;
break;
default:
// Can't get here in DATA state as that would violate the serialisation rules
assert( oldState!=DATA );
newState = PENDING_DATA;
doReturn = true;
}
} while (!state.boolCompareAndSwap(oldState, newState));
if (doReturn) {
return;
}
// Step 2: we own the state machine in DATA state - drain the completion queue
processCompletions();
//qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
// Step 3: DATA -> NOTIFY_WRITE before running the write callback
do {
newState = oldState = state.get();
assert( oldState==DATA );
newState = NOTIFY_WRITE;
} while (!state.boolCompareAndSwap(oldState, newState));
// Step 4: run the write callback; repeat while notifyPendingWrite() has flagged
// PENDING_NOTIFY in the meantime, otherwise release the state machine (-> IDLE)
do {
doWriteCallback();
// qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
bool doBreak;
do {
newState = oldState = state.get();
doBreak = false;
if ( oldState==NOTIFY_WRITE ) {
newState = IDLE;
doBreak = true;
} else {
// Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
assert( oldState==PENDING_NOTIFY );
newState = NOTIFY_WRITE;
}
} while (!state.boolCompareAndSwap(oldState, newState));
if (doBreak) {
break;
}
} while (true);
// We might need to delete ourselves
// (deferDelete() sets `deleting` when it could not delete immediately)
if (deleting && outstandingWrites == 0) {
delete this;
}
}
// Drain every event currently available from the queue pair.
//
// For each receive completion: picks up any transmit credit carried in the
// immediate data, hands the buffer to readCallback unless the IgnoreData flag
// is set, re-posts the buffer, and sends a credit-only message once the
// accumulated receive credit exceeds half of recvBufferCount. For each send
// completion: puts the buffer back on bufferQueue and decrements
// outstandingWrites. An event whose status is not IBV_WC_SUCCESS triggers
// errorCallback and stops processing immediately.
void AsynchIO::processCompletions() {
QueuePair::intrusive_ptr q = qp->getNextChannelEvent();
// Re-enable notification for queue:
// This needs to happen before we could do anything that could generate more work completion
// events (ie the callbacks etc. in the following).
// This can't make us reenter this code as the handle attached to the completion queue will still be
// disabled by the poller until we leave this code
qp->notifyRecv();
qp->notifySend();
// Counters used only for the diagnostic log message at the end
int recvEvents = 0;
int sendEvents = 0;
// If no event do nothing
if (!q)
return;
assert(q == qp);
// Repeat until no more events
do {
QueuePairEvent e(qp->getNextEvent());
if (!e)
break;
::ibv_wc_status status = e.getEventStatus();
if (status != IBV_WC_SUCCESS) {
// Note: returns without re-queueing the event's buffer or adjusting outstandingWrites
errorCallback(*this);
// TODO: Probably need to flush queues at this point
return;
}
// Test if recv (or recv with imm)
//::ibv_wc_opcode eventType = e.getEventType();
Buffer* b = e.getBuffer();
QueueDirection dir = e.getDirection();
if (dir == RECV) {
++recvEvents;
// Get our xmitCredit if it was sent
bool dataPresent = true;
if (e.immPresent() ) {
// Immediate value layout: the bits outside FlagsMask are the credit count,
// the IgnoreData bit marks a credit-only message
xmitCredit += (e.getImm() & ~FlagsMask);
dataPresent = ((e.getImm() & IgnoreData) == 0);
}
// if there was no data sent then the message was only to update our credit
if ( dataPresent ) {
readCallback(*this, b);
}
// At this point the buffer has been consumed so put it back on the recv queue
b->dataStart = 0;
b->dataCount = 0;
qp->postRecv(b);
// Received another message
++recvCredit;
// Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently)
if (recvCredit > recvBufferCount/2) {
// TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message
// but this is a little unlikely, as to get in this state we have to have received messages without sending any
// for a while so its likely we've received an credit update from the far side.
if (writable()) {
Buffer* ob = getBuffer();
// Have to send something as adapters hate it when you try to transfer 0 bytes
*reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit);
ob->dataCount = sizeof(uint32_t);
int creditSent = recvCredit & ~FlagsMask;
// IgnoreData tells the receiver this message carries credit only (see dataPresent above)
qp->postSend(creditSent | IgnoreData, ob);
recvCredit -= creditSent;
// Same bookkeeping as queueWrite(): one more write in flight, one less credit
++outstandingWrites;
--xmitCredit;
} else {
QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit");
}
}
} else {
// Send completion: the transmit buffer is free again
++sendEvents;
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
bufferQueue.push_front(b);
}
--outstandingWrites;
}
} while (true);
// Not sure if this is expected or not
if (recvEvents == 0 && sendEvents == 0) {
QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions");
}
}
/**
 * Invoke idleCallback repeatedly for as long as we stay writable and each
 * call actually consumes transmit credit (i.e. something was written).
 */
void AsynchIO::doWriteCallback() {
    // TODO: maybe don't call idle unless we're low on write buffers
    for (;;) {
        if (!writable()) {
            return;
        }

        const int creditBefore = xmitCredit;
        idleCallback(*this);

        // Unchanged credit means the callback had nothing to write: stop asking
        if (xmitCredit == creditBefore) {
            QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit);
            return;
        }
    }
}
/**
 * Take a transmit buffer off the front of the free queue.
 *
 * The queue must not be empty (asserted). The buffer is handed out with
 * dataStart and dataCount reset to zero.
 *
 * @return the buffer removed from bufferQueue
 */
Buffer* AsynchIO::getBuffer() {
    qpid::sys::ScopedLock<qpid::sys::Mutex> guard(bufferQueueLock);
    assert(!bufferQueue.empty());

    Buffer* const next = bufferQueue.front();
    bufferQueue.pop_front();

    next->dataStart = 0;
    next->dataCount = 0;
    return next;
}
/**
 * Give an unused transmit buffer back to the free queue.
 *
 * The buffer's dataStart and dataCount are reset to zero; both the reset
 * and the push happen while bufferQueueLock is held.
 *
 * @param b buffer to put back on the front of bufferQueue
 */
void AsynchIO::returnBuffer(Buffer* b) {
    qpid::sys::ScopedLock<qpid::sys::Mutex> guard(bufferQueueLock);
    b->dataStart = 0;
    b->dataCount = 0;
    bufferQueue.push_front(b);
}
// Common base setup for Listener and Connector.
//
// Creates a new connection object via Connection::make(), puts it into
// non-blocking mode, and builds `handle` on it with event() as the callback.
//
// errc - stored as errorCallback (invoked from the connectionEvent() implementations)
// dc   - stored as disconnectedCallback (invoked on RDMA_CM_EVENT_DISCONNECTED)
ConnectionManager::ConnectionManager(
ErrorCallback errc,
DisconnectedCallback dc
) :
ci(Connection::make()),
handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0),
errorCallback(errc),
disconnectedCallback(dc)
{
ci->nonblocking();
}
// Kick off the connection work and begin watching for connection events.
// startConnection() is called first (Listener binds and listens, Connector
// starts address resolution), then `handle` is registered with the poller.
void ConnectionManager::start(Poller::shared_ptr poller) {
startConnection(ci);
handle.startWatch(poller);
}
// Callback bound to `handle` in the constructor: forwards to connectionEvent()
// with our connection object. The DispatchHandle argument is not used.
void ConnectionManager::event(DispatchHandle&) {
connectionEvent(ci);
}
// Set up a listening connection manager.
//
// src  - local address to bind to in startConnection()
// cp   - stored as checkConnectionParams: incoming requests asking for a larger
//        maxRecvBufferSize are rejected, and its initialXmitCredit is sent back
//        on accept (see connectionEvent())
// ec   - called when a connection reaches RDMA_CM_EVENT_ESTABLISHED
// errc - error callback, passed to the ConnectionManager base
// dc   - disconnect callback, passed to the ConnectionManager base
// crc  - optional callback consulted on each connect request; returning false
//        rejects the request
Listener::Listener(
const sockaddr& src,
const ConnectionParams& cp,
EstablishedCallback ec,
ErrorCallback errc,
DisconnectedCallback dc,
ConnectionRequestCallback crc
) :
ConnectionManager(errc, dc),
src_addr(src),
checkConnectionParams(cp),
connectionRequestCallback(crc),
establishedCallback(ec)
{
}
// Bind the connection to the configured source address and start listening on it.
void Listener::startConnection(Connection::intrusive_ptr ci) {
ci->bind(src_addr);
ci->listen();
}
/**
 * Handle the next connection-manager event on the listening connection.
 *
 * Connect requests are validated (private data present and large enough,
 * requested receive buffer size within our limit, optional user veto) and
 * then accepted or rejected. Established, disconnected and error events are
 * forwarded to the matching callback; anything else is reported through
 * errorCallback as UNKNOWN.
 *
 * @param ci the listening connection to fetch the event from
 */
void Listener::connectionEvent(Connection::intrusive_ptr ci) {
    ConnectionEvent event(ci->getNextEvent());

    // No event (for whatever reason): nothing to do
    if (!event) {
        return;
    }

    // Important documentation omission: the new rdma_cm_id you get from
    // CONNECT_REQUEST has the same context info as its parent listening rdma_cm_id
    ::rdma_cm_event_type type = event.getEventType();
    ::rdma_conn_param peerParam = event.getConnectionParam();
    Rdma::Connection::intrusive_ptr conn = event.getConnection();

    switch (type) {
    case RDMA_CM_EVENT_CONNECT_REQUEST: {
        // The peer must have sent parameters we can actually read
        const bool paramsUsable =
            peerParam.private_data && peerParam.private_data_len >= sizeof(ConnectionParams);
        if (!paramsUsable) {
            conn->reject();
            break;
        }

        ConnectionParams cp = *static_cast<const ConnectionParams*>(peerParam.private_data);

        // Asking for bigger messages than we allow: reject and tell the peer our limits
        if (cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize) {
            conn->reject(&checkConnectionParams);
            break;
        }

        // Give the user a chance to veto the connection
        bool accepted = true;
        if (connectionRequestCallback) {
            accepted = connectionRequestCallback(conn, cp);
        }
        if (!accepted) {
            conn->reject();
            break;
        }

        // Accept, replying with our own initial transmit credit
        cp.initialXmitCredit = checkConnectionParams.initialXmitCredit;
        conn->accept(peerParam, &cp);
        break;
    }
    case RDMA_CM_EVENT_ESTABLISHED:
        establishedCallback(conn);
        break;
    case RDMA_CM_EVENT_DISCONNECTED:
        disconnectedCallback(conn);
        break;
    case RDMA_CM_EVENT_CONNECT_ERROR:
        errorCallback(conn, CONNECT_ERROR);
        break;
    default:
        // Unexpected response to listen
        errorCallback(conn, UNKNOWN);
    }
}
// Set up an outgoing connection manager.
//
// dst  - remote address resolved in startConnection()
// cp   - stored as connectionParams and sent to the peer on connect
// cc   - called with the peer's parameters on RDMA_CM_EVENT_ESTABLISHED
// errc - error callback, passed to the ConnectionManager base
// dc   - disconnect callback, passed to the ConnectionManager base
// rc   - called with the peer's parameters on RDMA_CM_EVENT_REJECTED
Connector::Connector(
const sockaddr& dst,
const ConnectionParams& cp,
ConnectedCallback cc,
ErrorCallback errc,
DisconnectedCallback dc,
RejectedCallback rc
) :
ConnectionManager(errc, dc),
dst_addr(dst),
connectionParams(cp),
rejectedCallback(rc),
connectedCallback(cc)
{
}
// First step of connecting: start resolving the destination address. The
// following steps (route resolution, connect) are driven from connectionEvent().
void Connector::startConnection(Connection::intrusive_ptr ci) {
ci->resolve_addr(dst_addr);
}
/**
 * Handle the next connection-manager event on an outgoing connection.
 *
 * Drives the connect sequence (address resolved -> resolve route, route
 * resolved -> connect) and forwards the outcome to the user callbacks:
 * errorCallback for address/route/connect/unreachable failures,
 * rejectedCallback on rejection, connectedCallback once established and
 * disconnectedCallback on disconnect. Unexpected events are logged.
 *
 * A rejection does not have to carry private data (Listener::connectionEvent
 * in this file rejects without any in two of its three reject paths), so the
 * REJECTED case only reads the peer's parameters when they are present and
 * large enough; otherwise rejectedCallback receives the parameters we
 * requested ourselves.
 *
 * @param ci the connection to fetch the event from
 */
void Connector::connectionEvent(Connection::intrusive_ptr ci) {
    ConnectionEvent e(ci->getNextEvent());

    // If (for whatever reason) there was no event do nothing
    if (!e)
        return;

    ::rdma_cm_event_type eventType = e.getEventType();
    ::rdma_conn_param conn_param = e.getConnectionParam();
    Rdma::Connection::intrusive_ptr id = e.getConnection();

    switch (eventType) {
    case RDMA_CM_EVENT_ADDR_RESOLVED:
        // RESOLVE_ADDR
        ci->resolve_route();
        break;
    case RDMA_CM_EVENT_ADDR_ERROR:
        // RESOLVE_ADDR
        errorCallback(ci, ADDR_ERROR);
        break;
    case RDMA_CM_EVENT_ROUTE_RESOLVED:
        // RESOLVE_ROUTE:
        ci->connect(&connectionParams);
        break;
    case RDMA_CM_EVENT_ROUTE_ERROR:
        // RESOLVE_ROUTE:
        errorCallback(ci, ROUTE_ERROR);
        break;
    case RDMA_CM_EVENT_CONNECT_ERROR:
        // CONNECTING
        errorCallback(ci, CONNECT_ERROR);
        break;
    case RDMA_CM_EVENT_UNREACHABLE:
        // CONNECTING
        errorCallback(ci, UNREACHABLE);
        break;
    case RDMA_CM_EVENT_REJECTED: {
        // CONNECTING
        // The peer may reject without sending any parameters, so never dereference
        // the private data unless it is there and big enough.
        const bool peerSentParams =
            conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams);
        ConnectionParams cp = peerSentParams
            ? *static_cast<const ConnectionParams*>(conn_param.private_data)
            : connectionParams; // nothing usable from the peer: report what we asked for
        rejectedCallback(ci, cp);
        break;
    }
    case RDMA_CM_EVENT_ESTABLISHED: {
        // CONNECTING
        // Extract private data from event (the listener always supplies it on accept)
        assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams));
        ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data);
        connectedCallback(ci, cp);
        break;
    }
    case RDMA_CM_EVENT_DISCONNECTED:
        // ESTABLISHED
        disconnectedCallback(ci);
        break;
    default:
        QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType);
    }
}
}