blob: 44298ac8ea9119f9d8436b614954ff88ec82cb58 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/sys/Poller.h"
#include "qpid/sys/Mutex.h"
#include "AsynchIoResult.h"
#include "IoHandlePrivate.h"
#include "check.h"
#include <winsock2.h>
#include <windows.h>
#include <assert.h>
#include <vector>
#include <exception>
namespace qpid {
namespace sys {
class PollerHandlePrivate {
friend class Poller;
friend class PollerHandle;
SOCKET fd;
AsynchIoResult::Completer cb;
PollerHandlePrivate(SOCKET f, AsynchIoResult::Completer cb0 = 0) :
fd(f), cb(cb0)
{
}
};
/**
 * Build a poller handle for the given IOHandle, capturing the descriptor
 * returned by toFd() together with the IOHandle's event member.
 */
PollerHandle::PollerHandle(const IOHandle& h)
    : impl(new PollerHandlePrivate(toFd(h.impl), h.impl->event))
{
}
/**
 * Release the private state allocated by the constructor.
 */
PollerHandle::~PollerHandle()
{
    delete impl;
}
/**
 * Private state for the Poller implementation built on the Windows
 * I/O Completion Port (IOCP) facility.
 *
 * Every member is private; only Poller (declared as a friend) can
 * construct or use an instance.
 */
class PollerPrivate {
    friend class Poller;

    // Completion port that Poller::wait() dequeues from.
    const HANDLE iocp;

    // Count of threads currently inside Poller::wait()'s dequeue call;
    // adjusted with the Interlocked* functions.
    volatile LONG threadsRunning;

    // Consulted by Poller::shutdown() so repeated shutdown requests can
    // be ignored. The shutdown itself is delivered to waiting threads as
    // a posted completion packet on the iocp.
    bool isShutdown;

    // Create a fresh completion port; nothing is associated with it yet.
    // NOTE(review): QPID_WINDOWS_CHECK_NULL presumably raises when the
    // port could not be created - confirm against check.h.
    PollerPrivate()
        : iocp(::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)),
          threadsRunning(0),
          isShutdown(false)
    {
        QPID_WINDOWS_CHECK_NULL(iocp);
    }

    // Close the completion port. The result of CloseHandle is deliberately
    // ignored: a failure here cannot lose data.
    ~PollerPrivate()
    {
        ::CloseHandle(iocp);
    }
};
/**
 * Register a handle with the poller.
 *
 * A handle with a valid descriptor is associated with the completion
 * port. A handle whose descriptor is invalid has no I/O to wait on, so
 * an AsynchWriteWanted result carrying the handle's callback is posted
 * to the port instead, to be picked up by a thread in wait().
 *
 * @param handle  the handle to register
 * @param dir     not used by this implementation
 */
void Poller::addFd(PollerHandle& handle, Direction dir) {
    HANDLE target = reinterpret_cast<HANDLE>(handle.impl->fd);

    if (target == INVALID_HANDLE_VALUE) {
        // NOTE(review): the return value of PostQueuedCompletionStatus is
        // not checked; if the post fails the result object is presumably
        // never released - confirm ownership rules in AsynchIoResult.h.
        AsynchWriteWanted *writeWanted = new AsynchWriteWanted(handle.impl->cb);
        PostQueuedCompletionStatus(impl->iocp, 0, 0, writeWanted->overlapped());
        return;
    }

    // Tie the descriptor to our completion port (completion key 0).
    HANDLE associated = ::CreateIoCompletionPort(target, impl->iocp, 0, 0);
    QPID_WINDOWS_CHECK_NULL(associated);
}
/**
 * Ask the threads in wait() to stop.
 *
 * The request is delivered by posting a completion packet with a null
 * OVERLAPPED pointer and completion key 1, which wait() recognises and
 * turns into a SHUTDOWN event. Calling shutdown() again after the first
 * request has been recorded does nothing.
 */
void Poller::shutdown() {
    // Allow sloppy code to shut us down more than once.
    if (impl->isShutdown)
        return;
    // Record the request; without this the guard above could never fire
    // and every call would queue another shutdown packet.
    // NOTE(review): isShutdown is a plain bool, so two threads racing
    // through here may both post - harmless, but not strictly once-only.
    impl->isShutdown = true;
    ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O
    PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
}
// All no-ops...
/**
 * Remove a handle from the poller. This implementation does nothing.
 */
void Poller::delFd(PollerHandle& handle)
{
    // Intentionally empty.
}
/**
 * Change the direction a handle is polled for. This implementation does
 * nothing.
 */
void Poller::modFd(PollerHandle& handle, Direction dir)
{
    // Intentionally empty.
}
/**
 * Re-arm a handle after an event. This implementation does nothing.
 */
void Poller::rearmFd(PollerHandle& handle)
{
    // Intentionally empty.
}
/**
 * Block on the completion port for at most `timeout` and handle whatever
 * is dequeued.
 *
 * Completed I/O is dispatched from inside this call: the OVERLAPPED
 * pointer is mapped back to its AsynchResult and either success() or
 * failure() is invoked on it. A packet posted by shutdown() (null
 * OVERLAPPED, completion key 1) is reported to the caller instead.
 *
 * @param timeout  how long to wait; TIME_INFINITE waits indefinitely,
 *                 any other value is divided by TIME_MSEC to obtain the
 *                 millisecond count passed to the dequeue call
 * @return Event(0, SHUTDOWN) when a shutdown packet was dequeued,
 *         Event(0, INVALID) in every other case (I/O dispatched, failed
 *         operation reported, or nothing dequeued)
 */
Poller::Event Poller::wait(Duration timeout) {
    const DWORD waitMs = (timeout == TIME_INFINITE)
        ? INFINITE
        : static_cast<DWORD>(timeout / TIME_MSEC);

    DWORD bytesMoved = 0;
    ULONG_PTR key = 0;
    OVERLAPPED *ov = 0;

    // threadsRunning brackets the blocking call, so the value read after
    // the decrement is the number of *other* threads still inside it.
    InterlockedIncrement(&impl->threadsRunning);
    bool dequeuedOk = ::GetQueuedCompletionStatus(impl->iocp,
                                                  &bytesMoved,
                                                  &key,
                                                  &ov,
                                                  waitMs);
    LONG othersWaiting = InterlockedDecrement(&impl->threadsRunning);

    if (!dequeuedOk) {
        // A non-null OVERLAPPED means a packet for a failed operation was
        // dequeued: hand the error code to its result object. With a null
        // pointer nothing was dequeued, so there is nothing to notify.
        if (ov != 0) {
            DWORD status = ::GetLastError();
            AsynchResult::from_overlapped(ov)->failure(static_cast<int>(status));
        }
        return Event(0, INVALID); // TODO - this may need to be changed.
    }

    if (ov == 0 && key == 1) {
        // Shutdown packet from shutdown(). Pass it along if other threads
        // are still blocked so that each of them sees it too.
        if (othersWaiting > 0)
            PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
        return Event(0, SHUTDOWN);
    }

    // Ordinary successful completion: run the completion handler.
    AsynchResult::from_overlapped(ov)->success(static_cast<size_t>(bytesMoved));
    return Event(0, INVALID); // TODO - this may need to be changed.
}
// Concrete constructors
/**
 * Create a poller backed by a newly allocated PollerPrivate.
 */
Poller::Poller()
    : impl(new PollerPrivate())
{
}
/**
 * Destroy the poller, releasing the PollerPrivate created by the
 * constructor.
 */
Poller::~Poller()
{
    delete impl;
}
}}