blob: 30378d4c5f97414810f9b4fa3ac6cf3d73484b02 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/sys/windows/AsynchIoResult.h"
#include "qpid/sys/windows/IoHandlePrivate.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Time.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/windows/check.h"
#include "qpid/sys/windows/mingw32_compat.h"
#include <boost/thread/once.hpp>
#include <queue>
#include <winsock2.h>
#include <mswsock.h>
#include <windows.h>
#include <boost/bind.hpp>
namespace {
typedef qpid::sys::ScopedLock<qpid::sys::Mutex> QLock;
/*
* The function pointers for AcceptEx and ConnectEx need to be looked up
* at run time.
*/
const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
SOCKET h = toSocketHandle(s);
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
LPFN_ACCEPTEX fnAcceptEx;
WSAIoctl(h,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx,
sizeof(guidAcceptEx),
&fnAcceptEx,
sizeof(fnAcceptEx),
&dwBytes,
NULL,
NULL);
if (fnAcceptEx == 0)
throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
return fnAcceptEx;
}
}
namespace qpid {
namespace sys {
namespace windows {
/*
* Asynch Acceptor
*
*/
class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
friend class AsynchAcceptResult;
public:
AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
~AsynchAcceptor();
void start(Poller::shared_ptr poller);
private:
void restart(void);
AsynchAcceptor::Callback acceptedCallback;
const Socket& socket;
const LPFN_ACCEPTEX fnAcceptEx;
};
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
socket(s),
fnAcceptEx(lookUpAcceptEx(s)) {
s.setNonblocking();
}
AsynchAcceptor::~AsynchAcceptor()
{
socket.close();
}
void AsynchAcceptor::start(Poller::shared_ptr poller) {
PollerHandle ph = PollerHandle(socket);
poller->monitorHandle(ph, Poller::INPUT);
restart ();
}
void AsynchAcceptor::restart(void) {
DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
this,
socket);
BOOL status;
status = fnAcceptEx(toSocketHandle(socket),
toSocketHandle(*result->newSocket),
result->addressBuffer,
0,
AsynchAcceptResult::SOCKADDRMAXLEN,
AsynchAcceptResult::SOCKADDRMAXLEN,
&bytesReceived,
result->overlapped());
QPID_WINDOWS_CHECK_ASYNC_START(status);
}
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
const Socket& listener)
: callback(cb), acceptor(acceptor),
listener(toSocketHandle(listener)),
newSocket(listener.createSameTypeSocket()) {
}
void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
::setsockopt (toSocketHandle(*newSocket),
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&listener,
sizeof (listener));
callback(*(newSocket.release()));
acceptor->restart ();
delete this;
}
void AsynchAcceptResult::failure(int /*status*/) {
//if (status != WSA_OPERATION_ABORTED)
// Can there be anything else? ;
delete this;
}
/*
* AsynchConnector does synchronous connects for now... to do asynch the
* IocpPoller will need some extension to register an event handle as a
* CONNECT-type "direction", the connect completion/result will need an
* event handle to associate with the connecting handle. But there's no
* time for that right now...
*/
class AsynchConnector : public qpid::sys::AsynchConnector {
private:
ConnectedCallback connCallback;
FailedCallback failCallback;
const Socket& socket;
const std::string hostname;
const std::string port;
public:
AsynchConnector(const Socket& socket,
const std::string& hostname,
const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
void start(Poller::shared_ptr poller);
};
AsynchConnector::AsynchConnector(const Socket& sock,
const std::string& hname,
const std::string& p,
ConnectedCallback connCb,
FailedCallback failCb) :
connCallback(connCb), failCallback(failCb), socket(sock),
hostname(hname), port(p)
{
}
void AsynchConnector::start(Poller::shared_ptr)
{
try {
socket.connect(hostname, port);
socket.setNonblocking();
connCallback(socket);
} catch(std::exception& e) {
if (failCallback)
failCallback(socket, -1, std::string(e.what()));
socket.close();
}
}
} // namespace windows
AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
Callback callback)
{
return new windows::AsynchAcceptor(s, callback);
}
AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
const std::string& hostname,
const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb)
{
return new windows::AsynchConnector(s,
hostname,
port,
connCb,
failCb);
}
/*
* Asynch reader/writer
*/
namespace windows {
class AsynchIO : public qpid::sys::AsynchIO {
public:
AsynchIO(const Socket& s,
ReadCallback rCb,
EofCallback eofCb,
DisconnectCallback disCb,
ClosedCallback cCb = 0,
BuffersEmptyCallback eCb = 0,
IdleCallback iCb = 0);
~AsynchIO();
// Methods inherited from qpid::sys::AsynchIO
/**
* Notify the object is should delete itself as soon as possible.
*/
virtual void queueForDeletion();
/// Take any actions needed to prepare for working with the poller.
virtual void start(Poller::shared_ptr poller);
virtual void queueReadBuffer(BufferBase* buff);
virtual void unread(BufferBase* buff);
virtual void queueWrite(BufferBase* buff);
virtual void notifyPendingWrite();
virtual void queueWriteClose();
virtual bool writeQueueEmpty();
virtual void startReading();
virtual void stopReading();
virtual void requestCallback(RequestCallback);
/**
* getQueuedBuffer returns a buffer from the buffer queue, if one is
* available.
*
* @retval Pointer to BufferBase buffer; 0 if none is available.
*/
virtual BufferBase* getQueuedBuffer();
private:
ReadCallback readCallback;
EofCallback eofCallback;
DisconnectCallback disCallback;
ClosedCallback closedCallback;
BuffersEmptyCallback emptyCallback;
IdleCallback idleCallback;
const Socket& socket;
Poller::shared_ptr poller;
std::deque<BufferBase*> bufferQueue;
std::deque<BufferBase*> writeQueue;
/* The MSVC-supplied deque is not thread-safe; keep locks to serialize
* access to the buffer queue and write queue.
*/
Mutex bufferQueueLock;
// Number of outstanding I/O operations.
volatile LONG opsInProgress;
// Is there a write in progress?
volatile bool writeInProgress;
// Deletion requested, but there are callbacks in progress.
volatile bool queuedDelete;
// Socket close requested, but there are operations in progress.
volatile bool queuedClose;
private:
// Dispatch events that have completed.
void notifyEof(void);
void notifyDisconnect(void);
void notifyClosed(void);
void notifyBuffersEmpty(void);
void notifyIdle(void);
/**
* Initiate a write of the specified buffer. There's no callback for
* write completion to the AsynchIO object.
*/
void startWrite(AsynchIO::BufferBase* buff);
void close(void);
/**
* readComplete is called when a read request is complete.
*
* @param result Results of the operation.
*/
void readComplete(AsynchReadResult *result);
/**
* writeComplete is called when a write request is complete.
*
* @param result Results of the operation.
*/
void writeComplete(AsynchWriteResult *result);
/**
* Queue of completions to run. This queue enforces the requirement
* from upper layers that only one thread at a time is allowed to act
* on any given connection. Once a thread is busy processing a completion
* on this object, other threads that dispatch completions queue the
* completions here for the in-progress thread to handle when done.
* Thus, any threads can dispatch a completion from the IocpPoller, but
* this class ensures that actual processing at the connection level is
* only on one thread at a time.
*/
std::queue<AsynchIoResult *> completionQueue;
volatile bool working;
Mutex completionLock;
/**
* Called when there's a completion to process.
*/
void completion(AsynchIoResult *result);
};
// This is used to encapsulate pure callbacks into a handle
class CallbackHandle : public IOHandle {
public:
CallbackHandle(AsynchIoResult::Completer completeCb,
AsynchIO::RequestCallback reqCb = 0) :
IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb))
{}
};
AsynchIO::AsynchIO(const Socket& s,
ReadCallback rCb,
EofCallback eofCb,
DisconnectCallback disCb,
ClosedCallback cCb,
BuffersEmptyCallback eCb,
IdleCallback iCb) :
readCallback(rCb),
eofCallback(eofCb),
disCallback(disCb),
closedCallback(cCb),
emptyCallback(eCb),
idleCallback(iCb),
socket(s),
opsInProgress(0),
writeInProgress(false),
queuedDelete(false),
queuedClose(false),
working(false) {
}
struct deleter
{
template <typename T>
void operator()(T *ptr){ delete ptr;}
};
AsynchIO::~AsynchIO() {
std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
}
void AsynchIO::queueForDeletion() {
queuedDelete = true;
if (opsInProgress > 0) {
QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
// AsynchIOHandler calls this then deletes itself; don't do any more
// callbacks.
readCallback = 0;
eofCallback = 0;
disCallback = 0;
closedCallback = 0;
emptyCallback = 0;
idleCallback = 0;
}
else {
delete this;
}
}
void AsynchIO::start(Poller::shared_ptr poller0) {
PollerHandle ph = PollerHandle(socket);
poller = poller0;
poller->monitorHandle(ph, Poller::INPUT);
if (writeQueue.size() > 0) // Already have data queued for write
notifyPendingWrite();
startReading();
}
void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
buff->dataCount = 0;
QLock l(bufferQueueLock);
bufferQueue.push_back(buff);
}
void AsynchIO::unread(AsynchIO::BufferBase* buff) {
assert(buff);
buff->squish();
QLock l(bufferQueueLock);
bufferQueue.push_front(buff);
}
void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) {
assert(buff);
QLock l(bufferQueueLock);
writeQueue.push_back(buff);
if (!writeInProgress)
notifyPendingWrite();
}
void AsynchIO::notifyPendingWrite() {
// This method is generally called from a processing thread; transfer
// work on this to an I/O thread. Much of the upper layer code assumes
// that all I/O-related things happen in an I/O thread.
if (poller == 0) // Not really going yet...
return;
InterlockedIncrement(&opsInProgress);
PollerHandle ph(CallbackHandle(boost::bind(&AsynchIO::completion, this, _1)));
poller->monitorHandle(ph, Poller::OUTPUT);
}
void AsynchIO::queueWriteClose() {
queuedClose = true;
if (!writeInProgress)
notifyPendingWrite();
}
bool AsynchIO::writeQueueEmpty() {
QLock l(bufferQueueLock);
return writeQueue.size() == 0;
}
/*
* Initiate a read operation. AsynchIO::readComplete() will be
* called when the read is complete and data is available.
*/
void AsynchIO::startReading() {
if (queuedDelete)
return;
// (Try to) get a buffer; look on the front since there may be an
// "unread" one there with data remaining from last time.
AsynchIO::BufferBase *buff = 0;
{
QLock l(bufferQueueLock);
if (!bufferQueue.empty()) {
buff = bufferQueue.front();
assert(buff);
bufferQueue.pop_front();
}
}
if (buff != 0) {
int readCount = buff->byteCount - buff->dataCount;
AsynchReadResult *result =
new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1),
buff,
readCount);
DWORD bytesReceived = 0, flags = 0;
InterlockedIncrement(&opsInProgress);
int status = WSARecv(toSocketHandle(socket),
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesReceived,
&flags,
result->overlapped(),
0);
if (status != 0) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
result->failure(error);
result = 0; // result is invalid here
return;
}
}
// On status 0 or WSA_IO_PENDING, completion will handle the rest.
}
else {
notifyBuffersEmpty();
}
return;
}
// stopReading was added to prevent a race condition with read-credit on Linux.
// It may or may not be required on windows.
//
// AsynchIOHandler::readbuff() calls stopReading() inside the same
// critical section that protects startReading() in
// AsynchIOHandler::giveReadCredit().
//
void AsynchIO::stopReading() {}
// Queue the specified callback for invocation from an I/O thread.
void AsynchIO::requestCallback(RequestCallback callback) {
// This method is generally called from a processing thread; transfer
// work on this to an I/O thread. Much of the upper layer code assumes
// that all I/O-related things happen in an I/O thread.
if (poller == 0) // Not really going yet...
return;
InterlockedIncrement(&opsInProgress);
PollerHandle ph(CallbackHandle(
boost::bind(&AsynchIO::completion, this, _1),
callback));
poller->monitorHandle(ph, Poller::INPUT);
}
/**
* Return a queued buffer if there are enough to spare.
*/
AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
QLock l(bufferQueueLock);
// Always keep at least one buffer (it might have data that was
// "unread" in it).
if (bufferQueue.size() <= 1)
return 0;
BufferBase* buff = bufferQueue.back();
assert(buff);
bufferQueue.pop_back();
return buff;
}
void AsynchIO::notifyEof(void) {
if (eofCallback)
eofCallback(*this);
}
void AsynchIO::notifyDisconnect(void) {
if (disCallback)
disCallback(*this);
}
void AsynchIO::notifyClosed(void) {
if (closedCallback)
closedCallback(*this, socket);
}
void AsynchIO::notifyBuffersEmpty(void) {
if (emptyCallback)
emptyCallback(*this);
}
void AsynchIO::notifyIdle(void) {
if (idleCallback)
idleCallback(*this);
}
/*
* Asynch reader/writer using overlapped I/O
*/
void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
writeInProgress = true;
InterlockedIncrement(&opsInProgress);
AsynchWriteResult *result =
new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
buff,
buff->dataCount);
DWORD bytesSent = 0;
int status = WSASend(toSocketHandle(socket),
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesSent,
0,
result->overlapped(),
0);
if (status != 0) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
result->failure(error); // Also decrements in-progress count
result = 0; // result is invalid here
return;
}
}
// On status 0 or WSA_IO_PENDING, completion will handle the rest.
return;
}
/*
* Close the socket and callback to say we've done it
*/
void AsynchIO::close(void) {
socket.close();
notifyClosed();
}
void AsynchIO::readComplete(AsynchReadResult *result) {
int status = result->getStatus();
size_t bytes = result->getTransferred();
if (status == 0 && bytes > 0) {
bool restartRead = true; // May not if receiver doesn't want more
if (readCallback)
readCallback(*this, result->getBuff());
if (restartRead)
startReading();
}
else {
// No data read, so put the buffer back. It may be partially filled,
// so "unread" it back to the front of the queue.
unread(result->getBuff());
notifyEof();
if (status != 0)
{
notifyDisconnect();
}
}
}
/*
* NOTE - this completion is called for completed writes and also when
* a write is desired. The difference is in the buff - if a write is desired
* the buff is 0.
*/
void AsynchIO::writeComplete(AsynchWriteResult *result) {
int status = result->getStatus();
size_t bytes = result->getTransferred();
AsynchIO::BufferBase *buff = result->getBuff();
if (buff != 0) {
writeInProgress = false;
if (status == 0 && bytes > 0) {
if (bytes < result->getRequested()) // Still more to go; resubmit
startWrite(buff);
else
queueReadBuffer(buff); // All done; back to the pool
}
else {
// An error... if it's a connection close, ignore it - it will be
// noticed and handled on a read completion any moment now.
// What to do with real error??? Save the Buffer?
}
}
// If there are no writes outstanding, check for more writes to initiate
// (either queued or via idle). The opsInProgress count is handled in
// completion()
if (!writeInProgress) {
bool writing = false;
{
QLock l(bufferQueueLock);
if (writeQueue.size() > 0) {
buff = writeQueue.front();
assert(buff);
writeQueue.pop_front();
startWrite(buff);
writing = true;
}
}
if (!writing && !queuedClose) {
notifyIdle();
}
}
return;
}
void AsynchIO::completion(AsynchIoResult *result) {
{
ScopedLock<Mutex> l(completionLock);
if (working) {
completionQueue.push(result);
return;
}
// First thread in with something to do; note we're working then keep
// handling completions.
working = true;
while (result != 0) {
// New scope to unlock temporarily.
{
ScopedUnlock<Mutex> ul(completionLock);
AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
if (r != 0)
readComplete(r);
else {
AsynchWriteResult *w =
dynamic_cast<AsynchWriteResult*>(result);
if (w != 0)
writeComplete(w);
else {
AsynchCallbackRequest *req =
dynamic_cast<AsynchCallbackRequest*>(result);
req->reqCallback(*this);
}
}
delete result;
result = 0;
InterlockedDecrement(&opsInProgress);
}
// Lock is held again.
if (completionQueue.empty())
continue;
result = completionQueue.front();
completionQueue.pop();
}
working = false;
}
// Lock released; ok to close if ops are done and close requested.
// Layer above will call back to queueForDeletion() if it hasn't
// already been done. If it already has, go ahead and delete.
if (opsInProgress == 0) {
if (queuedClose)
// close() may cause a delete; don't trust 'this' on return
close();
else if (queuedDelete)
delete this;
}
}
} // namespace windows
AsynchIO* qpid::sys::AsynchIO::create(const Socket& s,
AsynchIO::ReadCallback rCb,
AsynchIO::EofCallback eofCb,
AsynchIO::DisconnectCallback disCb,
AsynchIO::ClosedCallback cCb,
AsynchIO::BuffersEmptyCallback eCb,
AsynchIO::IdleCallback iCb)
{
return new qpid::sys::windows::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
}
}} // namespace qpid::sys