blob: 453b9df573afe4486946a7c15fdd22d8e0d7c876 [file] [log] [blame]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include <memory>
#include <stout/lambda.hpp>
#include <stout/try.hpp>
#include <stout/windows.hpp>
#include <stout/os/int_fd.hpp>
#include <stout/os/read.hpp>
#include <stout/os/sendfile.hpp>
#include <stout/os/write.hpp>
#include <stout/windows/error.hpp>
#include <process/address.hpp>
#include <process/network.hpp>
#include <process/process.hpp> // For process::initialize.
#include "windows/libwinio.hpp"
namespace process {
namespace windows {
// Completion keys for packets dequeued from the IOCP with
// `GetQueuedCompletionStatusEx`. `check_and_handle_completion` switches on
// these values to decide how to treat each entry.
// Posted by `EventLoop::stop` to make the event loop exit.
constexpr DWORD KEY_QUIT = 0;
// Key that `EventLoop::registerHandle` associates with registered handles.
constexpr DWORD KEY_IO = 1;
// Posted by `EventLoop::launchTimer` to ask the loop thread to arm a timer.
constexpr DWORD KEY_TIMER = 2;
// Definitions for handling event loop timers.
//
// Heap-allocated state for a single timer request. `EventLoop::launchTimer`
// creates it and posts its address to the IOCP under `KEY_TIMER`; the event
// loop thread then arms the timer, and `timer_apc` runs the callback and
// releases both the handle and this struct.
struct TimerOverlapped
{
// Waitable timer handle created in `launchTimer`; closed in `timer_apc`.
HANDLE timer;
// Due time passed to `SetWaitableTimer`, in units of 100ns. A negative value
// is relative to the current time.
LARGE_INTEGER time;
// Callback invoked by `timer_apc` when the timer fires.
lambda::function<void()> callback;
};
// `TimerAPCProc` completion routine invoked when a timer armed through
// `SetWaitableTimer` fires [1]. `arg` is the `TimerOverlapped*` that was
// passed to `SetWaitableTimer`; the two `DWORD` arguments carry the time at
// which the timer was signaled and are not used here.
//
// The routine runs the user callback and then releases the timer handle and
// the `TimerOverlapped` allocation, since nothing else owns them at this
// point.
//
// Note that `CALLBACK` is defined to be `__stdcall` in the Windows SDK, since
// the Win32 APIs use the stdcall calling convention.
//
// [1]: https://msdn.microsoft.com/en-us/library/windows/desktop/ms686786(v=vs.85).aspx // NOLINT(whitespace/line_length)
static void CALLBACK timer_apc(void* arg, DWORD timer_low, DWORD timer_high)
{
  TimerOverlapped* const fired = static_cast<TimerOverlapped*>(arg);

  // Run the user's callback first; the cleanup below invalidates `fired`.
  fired->callback();

  ::CloseHandle(fired->timer);
  delete fired;
}
// Definitions for handling event loop IO.
//
// Tag stored in every `IOOverlappedBase` recording which kind of operation
// was started. `handle_io` switches on it to recover the enclosing overlapped
// struct and the matching promise type.
enum class IOType
{
READ, // Started by `read`; completes a `Promise<size_t>`.
WRITE, // Started by `write`; completes a `Promise<size_t>`.
RECV, // Started by `recv`; handled like `READ`.
SEND, // Started by `send`; handled like `WRITE`.
ACCEPT, // Started by `accept`; completes a `Promise<Nothing>`.
CONNECT, // Started by `connect`; completes a `Promise<Nothing>`.
SENDFILE // Started by `sendfile`; handled like `WRITE`.
};
// Base overlapped struct that contains the required Win32 overlapped object,
// the `HANDLE` where the IO was performed and the type of IO performed.
//
// `handle_io` receives only the `OVERLAPPED*` from the completion packet and
// uses `CONTAINING_RECORD` to get back to this struct, so the `OVERLAPPED`
// handed to the Win32 call must always be the `overlapped` member below.
struct IOOverlappedBase
{
// The Win32 overlapped object whose address is passed to the async API.
OVERLAPPED overlapped;
// Handle the operation was issued on; used for `GetOverlappedResult`.
HANDLE handle;
// Kind of operation, used by `handle_io` to pick the enclosing struct.
IOType type;
};
// Overlapped object for operations whose result is a `T`, pairing the
// `IOOverlappedBase` with the promise that is completed when the IO finishes.
//
// We keep `Promise<T>*` instead of `Promise<T>` so that we decouple the
// Promise from the overlapped object. This is because to support cancellation,
// we keep `std::shared_ptr` and `std::weak_ptr` copies of the overlapped
// object pointers in the `Promise` callbacks, so when the callbacks are
// cleaned up, we would also end up cleaning up the Promise itself, which can
// cause memory corruption.
template <typename T>
struct IOOverlapped
{
IOOverlapped(const IOOverlappedBase& _base, Promise<T>* _promise)
: base(_base), promise(_promise)
{}
// Win32 overlapped object plus handle and IO type.
IOOverlappedBase base;
// Raw pointer to the promise; it is deleted by `handle_io` on completion or
// by the initiating function when the operation finishes immediately.
Promise<T>* promise;
};
// Used for READ, WRITE, RECV, SEND and SENDFILE: the promise carries the
// number of bytes transferred.
using IOOverlappedReadWrite = IOOverlapped<size_t>;
// Used for CONNECT: the promise only signals completion.
using IOOverlappedConnect = IOOverlapped<Nothing>;
// The async `Accept` is like `Connect` but with some additional buffers.
// We can't use inheritance here because `IOOverlappedAccept` will not
// be a C++ standard layout type, meaning that treating it like a C
// struct in the IOCP code can lead to undefined behavior.
struct IOOverlappedAccept
{
IOOverlappedAccept(const IOOverlappedBase& _base, Promise<Nothing>* _promise)
: base(_base), promise(_promise)
{}
// Win32 overlapped object plus handle and IO type.
IOOverlappedBase base;
// Raw pointer to the promise completed when the accept finishes.
Promise<Nothing>* promise;
// `AcceptEx` needs a buffer size of at least "16 bytes more than the size of
// the sockaddr structure for the transport protocol" [1] for the remote and
// local addresses. The buffer holds both addresses, hence two
// `SOCKADDR_STORAGE` plus 2 * 16 bytes.
// [1]: https://msdn.microsoft.com/en-us/library/windows/desktop/ms737524(v=vs.85).aspx // NOLINT(whitespace/line_length)
unsigned char buf[2 * sizeof(SOCKADDR_STORAGE) + 32];
};
// The IOCP Win32 APIs use C structs and involve a lot of type unsafe casting,
// so we ensure that these C++ structs can be safely used as C structs. In
// particular, `handle_io` walks from an `OVERLAPPED*` back to these structs
// with `CONTAINING_RECORD`.
static_assert(
std::is_standard_layout<IOOverlappedReadWrite>::value,
"IOOverlappedReadWrite must be a standard layout type");
static_assert(
std::is_standard_layout<IOOverlappedConnect>::value,
"IOOverlappedConnect must be a standard layout type");
static_assert(
std::is_standard_layout<IOOverlappedAccept>::value,
"IOOverlappedAccept must be a standard layout type");
// Transitions `promise` according to the outcome of an IO operation:
//   - discarded, if a discard was requested and the operation was aborted;
//   - set to `data`, if `error` is `ERROR_SUCCESS`;
//   - failed with the Windows error message otherwise.
template <typename T>
static void set_io_promise(Promise<T>* promise, const T& data, DWORD error)
{
  // A discard triggers `CancelIoEx`. If that cancellation won, the operation
  // finishes with `ERROR_OPERATION_ABORTED`; if it lost the race against the
  // operation completing, we fall through and surface the real result.
  if (promise->future().hasDiscard() && error == ERROR_OPERATION_ABORTED) {
    promise->discard();
    return;
  }

  if (error != ERROR_SUCCESS) {
    promise->fail("IO failed with error code: " + WindowsError(error).message);
    return;
  }

  promise->set(data);
}
// Completes the promise belonging to a finished overlapped IO operation.
//
// `overlapped_` is the pointer delivered in the completion packet and
// `bytes_transferred` is the byte count reported with it. The heap-allocated
// promise stored in the overlapped struct is owned by this function from here
// on and is deleted once it has been completed.
static void handle_io(OVERLAPPED* overlapped_, DWORD bytes_transferred)
{
  // Every `OVERLAPPED` handed to this function lives inside an
  // `IOOverlappedBase`, so `CONTAINING_RECORD` can walk back from the member
  // to the struct that contains it.
  IOOverlappedBase* const io_base =
    CONTAINING_RECORD(overlapped_, IOOverlappedBase, overlapped);

  // Ask for the Win32 error code of the operation. The status stored in
  // `OVERLAPPED::Internal` is the NT status code, not the Win32 error.
  DWORD status = ERROR_SUCCESS;
  DWORD reported_bytes;
  if (::GetOverlappedResult(
          io_base->handle, &io_base->overlapped, &reported_bytes, FALSE)) {
    // On success both sources must agree on the byte count.
    CHECK_EQ(reported_bytes, bytes_transferred);
  } else {
    status = ::GetLastError();
  }

  const size_t transferred = static_cast<size_t>(bytes_transferred);

  switch (io_base->type) {
    case IOType::READ:
    case IOType::RECV: {
      IOOverlappedReadWrite* const op =
        CONTAINING_RECORD(io_base, IOOverlappedReadWrite, base);
      std::unique_ptr<Promise<size_t>> owned(op->promise);

      // End-of-file conditions are not failures for reads: report them as a
      // successful read of zero bytes.
      const bool eof =
        status == ERROR_BROKEN_PIPE || status == ERROR_HANDLE_EOF;
      if (eof) {
        set_io_promise(owned.get(), static_cast<size_t>(0), ERROR_SUCCESS);
      } else {
        set_io_promise(owned.get(), transferred, status);
      }
      return;
    }
    case IOType::WRITE:
    case IOType::SEND:
    case IOType::SENDFILE: {
      IOOverlappedReadWrite* const op =
        CONTAINING_RECORD(io_base, IOOverlappedReadWrite, base);
      std::unique_ptr<Promise<size_t>> owned(op->promise);
      set_io_promise(owned.get(), transferred, status);
      return;
    }
    case IOType::CONNECT: {
      IOOverlappedConnect* const op =
        CONTAINING_RECORD(io_base, IOOverlappedConnect, base);
      std::unique_ptr<Promise<Nothing>> owned(op->promise);
      set_io_promise(owned.get(), Nothing(), status);
      return;
    }
    case IOType::ACCEPT: {
      IOOverlappedAccept* const op =
        CONTAINING_RECORD(io_base, IOOverlappedAccept, base);
      std::unique_ptr<Promise<Nothing>> owned(op->promise);
      set_io_promise(owned.get(), Nothing(), status);
      return;
    }
  }

  UNREACHABLE();
}
// Function to handle all IOCP notifications. Returns true if the IOCP loop
// should continue and false if it should stop.
//
// `entry` is one completion packet dequeued by `EventLoop::run`. Its
// completion key selects the action:
//   - `KEY_QUIT`:  request the loop to stop.
//   - `KEY_TIMER`: arm the waitable timer described by the `TimerOverlapped`
//                  that was posted in place of an `OVERLAPPED`.
//   - `KEY_IO`:    complete the promise of the finished IO operation.
static bool check_and_handle_completion(const OVERLAPPED_ENTRY& entry)
{
  switch (entry.lpCompletionKey) {
    case KEY_QUIT: {
      return false;
    }
    case KEY_TIMER: {
      // In the IOCP, we just set the timer. When the timer completes, it will
      // queue up an APC that will interrupt the IOCP loop in order to execute
      // the APC callback.
      TimerOverlapped* timer =
        reinterpret_cast<TimerOverlapped*>(entry.lpOverlapped);

      const BOOL success = ::SetWaitableTimer(
          timer->timer, &timer->time, 0, &timer_apc, timer, TRUE);

      if (!success) {
        // Nothing we can really do here aside from log the event. The error
        // message is captured before logging so that the last-error value is
        // read right after the failing call.
        std::string errorMsg = WindowsError().message;
        LOG(FATAL)
          << "process::windows::handle_completion failed to set timer: "
          << errorMsg;
      }
      return true;
    }
    case KEY_IO: {
      if (entry.lpOverlapped == nullptr) {
        // Don't know if this is possible, but just log it in case. Note the
        // trailing space: the two literals are streamed back to back.
        LOG(FATAL) << "process::windows::handle_completion returned with a null "
                   << "overlapped object";
      } else {
        handle_io(entry.lpOverlapped, entry.dwNumberOfBytesTransferred);
      }
      return true;
    }
  }

  UNREACHABLE();
}
// Windows Event loop implementation.

// Creates an event loop backed by a fresh IO completion port. Returns the
// heap-allocated loop on success, or the Windows error if the port could not
// be created.
Try<EventLoop*> EventLoop::create()
{
  // Passing `INVALID_HANDLE_VALUE` with no existing port asks for a new
  // completion port that is not yet associated with any file handle.
  const HANDLE port =
    ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 1);

  if (port != nullptr) {
    return new EventLoop(port);
  }

  return WindowsError();
}
// Wraps `iocp_handle` in a `std::unique_ptr` with `HandleDeleter` and stores
// it as the loop's completion port.
// NOTE(review): `HandleDeleter` is declared outside this file; presumably it
// closes the handle on destruction -- confirm in `windows/libwinio.hpp`.
EventLoop::EventLoop(HANDLE iocp_handle)
: iocp_handle_(std::unique_ptr<HANDLE, HandleDeleter>(iocp_handle))
{}
// Runs the event loop on the calling thread until a quit packet is dequeued.
// Returns `Nothing` after a clean stop, or an error if the CPU count cannot
// be determined or dequeuing from the completion port fails.
Try<Nothing> EventLoop::run()
{
  // At most one completion entry per CPU is dequeued in each iteration.
  const Try<long> cpus = os::cpus();
  if (cpus.isError()) {
    return Error(cpus.error());
  }

  std::vector<OVERLAPPED_ENTRY> entries(cpus.get());
  bool keep_running = true;

  do {
    ULONG count;

    // The alertable wait below ends in one of three ways:
    //   1) Completion packets were dequeued; the call returns true.
    //   2) A timer APC interrupted the wait and was executed on this thread;
    //      the call returns false with error `WAIT_IO_COMPLETION`.
    //   3) A genuine error occurred.
    const BOOL dequeued = ::GetQueuedCompletionStatusEx(
        iocp_handle_.get(),
        entries.data(),
        cpus.get(),
        &count,
        INFINITE,
        TRUE);

    if (!dequeued) {
      if (::GetLastError() != WAIT_IO_COMPLETION) {
        // Case 3: a serious error, so give up.
        return WindowsError();
      }

      // Case 2: the APC already ran, just wait again.
      continue;
    }

    // Case 1: process every dequeued packet. A quit notification does not cut
    // the batch short; the remaining packets are handled before exiting.
    for (ULONG index = 0; index < count; ++index) {
      if (!check_and_handle_completion(entries[index])) {
        keep_running = false;
      }
    }
  } while (keep_running);

  return Nothing();
}
// Asks the event loop to exit by posting a `KEY_QUIT` packet to the
// completion port. Returns the Windows error if the packet cannot be posted.
Try<Nothing> EventLoop::stop()
{
  const BOOL posted =
    ::PostQueuedCompletionStatus(iocp_handle_.get(), 0, KEY_QUIT, nullptr);

  if (!posted) {
    return WindowsError();
  }

  return Nothing();
}
// Schedules `callback` to run on the event loop thread after `duration`.
// Returns the Windows error if the timer cannot be created or the request
// cannot be queued to the completion port.
Try<Nothing> EventLoop::launchTimer(
    const Duration& duration, const lambda::function<void()>& callback)
{
  // Create a non-inheritable, manual reset, unnamed timer.
  HANDLE timer = ::CreateWaitableTimerW(nullptr, true, nullptr);
  if (timer == nullptr) {
    return WindowsError();
  }

  // The due time is expressed in units of 100ns. A positive value is
  // interpreted as an absolute time, a negative value as a time relative to
  // now, and 0 fires immediately. We want a relative time, hence the negation.
  LARGE_INTEGER due_time;
  due_time.QuadPart = -duration.ns() / 100;

  TimerOverlapped* request = new TimerOverlapped{timer, due_time, callback};

  // The timer is not armed here, because APCs only execute on the thread that
  // called the async function. Instead the request is queued to the IOCP so
  // that the event loop thread arms the timer and receives the APC.
  //
  // NOTE: `::PostQueuedCompletionStatus` does not process the second to fourth
  // arguments, so you can give anything for them. Specifically, the overlapped
  // parameter doesn't need to point to a `OVERLAPPED` structure. See
  // https://msdn.microsoft.com/en-us/library/windows/desktop/aa365458(v=vs.85).aspx // NOLINT(whitespace/line_length)
  const BOOL posted = ::PostQueuedCompletionStatus(
      iocp_handle_.get(),
      0,
      KEY_TIMER,
      reinterpret_cast<OVERLAPPED*>(request));

  if (posted) {
    // The APC callback will clean up the memory and handle.
    return Nothing();
  }

  // The APC will never run, so the cleanup has to happen here. The error is
  // captured first, before the cleanup calls run.
  WindowsError error;
  delete request;
  ::CloseHandle(timer);
  return error;
}
// NOTE: The following functions use `int_fd` instead of the native Win32
// `HANDLE`, because they are the more "public" libwinio APIs that interface
// directly with the libprocess IO, socket and event loop code.

// Associates `fd` with this event loop's completion port under `KEY_IO`.
// Registering the same `fd` again with this loop is a no-op; registering an
// `fd` that already belongs to a different loop is an error.
Try<Nothing> EventLoop::registerHandle(const int_fd& fd)
{
  Try<HANDLE> previous = fd.assign_iocp(iocp_handle_.get(), KEY_IO);
  if (previous.isError()) {
    return Error(previous.error());
  }

  if (previous.get() == nullptr) {
    // This is the first time the handle was assigned, so finish the
    // initialization. These modes suppress the IOCP notification when an
    // operation succeeds immediately as well as the event notification, which
    // saves context switches.
    const BOOL configured = ::SetFileCompletionNotificationModes(
        fd,
        FILE_SKIP_COMPLETION_PORT_ON_SUCCESS | FILE_SKIP_SET_EVENT_ON_HANDLE);

    if (!configured) {
      return WindowsError();
    }

    return Nothing();
  }

  // A completion port was already assigned; make sure it is ours.
  if (previous.get() != iocp_handle_.get()) {
    return Error(
        "fd is already registered to a different Windows Event Loop");
  }

  return Nothing();
}
template <typename T, typename O>
static void enable_cancellation(
const int_fd& fd,
const Future<T>& future,
const std::shared_ptr<O>& overlapped)
{
// We capture a `std::weak_ptr` in the `.onDiscard` callback, so that
// we can cancel the IO operation with the overlapped object if it still
// exists. The captured `shared_ptr` in the `.onAny` callback ensures that
// we keep the overlapped object alive until the IOCP callbacks are done.
auto overlapped_weak = std::weak_ptr<O>(overlapped);
future.onDiscard([fd, overlapped_weak]() {
std::shared_ptr<O> cancel = overlapped_weak.lock();
if (static_cast<bool>(cancel)) {
// We were able to get a reference to the overlapped object, so let's
// try to cancel the IO operation. Note that there is technically a
// race here. We could be between the IO completion event and deleting
// the overlapped object. In that case, this function will just no-op.
::CancelIoEx(fd, &cancel->base.overlapped);
}
});
future.onAny([overlapped]() {});
}
// Starts an overlapped read of up to `size` bytes from `fd` into `buf` and
// returns a future for the number of bytes read. `type` tags the operation
// (`READ` or `RECV`) for `handle_io`.
static Future<size_t> read_internal(
    const int_fd& fd, void* buf, size_t size, IOType type)
{
  process::initialize();

  Promise<size_t>* promise = new Promise<size_t>();
  Future<size_t> future = promise->future();

  // The overlapped object is shared so that cancellation can be supported
  // safely.
  auto overlapped = std::make_shared<IOOverlappedReadWrite>(
      IOOverlappedBase{OVERLAPPED{}, fd, type}, promise);

  enable_cancellation(fd, future, overlapped);

  // Start the asynchronous operation.
  const Result<size_t> outcome =
    os::read_async(fd, buf, size, &overlapped->base.overlapped);

  // `None` means the request is pending: the completion callback will set
  // and free the promise. Anything else finished right away, so the promise
  // has to be completed and freed here.
  if (!outcome.isNone()) {
    if (outcome.isError()) {
      promise->fail("os::read_async failed: " + outcome.error());
    } else {
      promise->set(outcome.get());
    }

    delete promise;
  }

  return future;
}
// Starts an overlapped write of `size` bytes from `buf` to `fd` and returns a
// future for the number of bytes written. `type` tags the operation (`WRITE`
// or `SEND`) for `handle_io`.
static Future<size_t> write_internal(
    const int_fd& fd, const void* buf, size_t size, IOType type)
{
  process::initialize();

  Promise<size_t>* promise = new Promise<size_t>();
  Future<size_t> future = promise->future();

  // The overlapped object is shared so that cancellation can be supported
  // safely.
  auto overlapped = std::make_shared<IOOverlappedReadWrite>(
      IOOverlappedBase{OVERLAPPED{}, fd, type}, promise);

  enable_cancellation(fd, future, overlapped);

  // Start the asynchronous operation.
  const Result<size_t> outcome =
    os::write_async(fd, buf, size, &overlapped->base.overlapped);

  // `None` means the request is pending: the completion callback will set
  // and free the promise. Anything else finished right away, so the promise
  // has to be completed and freed here.
  if (!outcome.isNone()) {
    if (outcome.isError()) {
      promise->fail("os::write_async failed: " + outcome.error());
    } else {
      promise->set(outcome.get());
    }

    delete promise;
  }

  return future;
}
// Asynchronously reads up to `size` bytes from `fd` into `buf`, tagging the
// operation as `IOType::READ`. The future yields the number of bytes read;
// when the read completes through the IOCP, end-of-file is reported as 0.
// NOTE(review): `buf` presumably has to stay valid until the future
// completes -- confirm against `os::read_async`.
Future<size_t> read(const int_fd& fd, void* buf, size_t size)
{
return read_internal(fd, buf, size, IOType::READ);
}
// Asynchronously writes `size` bytes from `buf` to `fd`, tagging the
// operation as `IOType::WRITE`. The future yields the number of bytes
// written.
// NOTE(review): `buf` presumably has to stay valid until the future
// completes -- confirm against `os::write_async`.
Future<size_t> write(const int_fd& fd, const void* buf, size_t size)
{
return write_internal(fd, buf, size, IOType::WRITE);
}
// Socket flavor of `read`: same implementation, but the operation is tagged
// as `IOType::RECV`.
Future<size_t> recv(const int_fd& fd, void* buf, size_t size)
{
return read_internal(fd, buf, size, IOType::RECV);
}
// Socket flavor of `write`: same implementation, but the operation is tagged
// as `IOType::SEND`.
Future<size_t> send(const int_fd& fd, const void* buf, size_t size)
{
return write_internal(fd, buf, size, IOType::SEND);
}
// Asynchronously accepts a connection on the listening socket `fd` into the
// already created socket `accepted_socket`, using `::AcceptEx`. The returned
// future completes once the connection has been accepted.
Future<Nothing> accept(const int_fd& fd, const int_fd& accepted_socket)
{
  process::initialize();

  Promise<Nothing>* promise = new Promise<Nothing>();
  Future<Nothing> future = promise->future();

  // The overlapped object is shared so that cancellation can be supported
  // safely.
  auto overlapped = std::make_shared<IOOverlappedAccept>(
      IOOverlappedBase{OVERLAPPED{}, fd, IOType::ACCEPT}, promise);

  enable_cancellation(fd, future, overlapped);

  // `::AcceptEx` fills `overlapped->buf` with the first data block sent, the
  // local address of the server and the remote address of the client. We
  // ignore the first block (receive length 0) and give each address half of
  // the buffer. For more details, see
  // https://msdn.microsoft.com/en-us/library/windows/desktop/ms737524(v=vs.85).aspx // NOLINT(whitespace/line_length)
  DWORD received;
  const BOOL accepted = ::AcceptEx(
      fd,
      accepted_socket,
      overlapped->buf,
      0,
      sizeof(overlapped->buf) / 2,
      sizeof(overlapped->buf) / 2,
      &received,
      &overlapped->base.overlapped);

  const DWORD error = ::WSAGetLastError();

  // A pending request is finished by the completion callback, which also
  // frees the promise. Immediate success or failure has to be handled here.
  if (accepted || error != WSA_IO_PENDING) {
    if (accepted) {
      promise->set(Nothing());
    } else {
      promise->fail("AcceptEx failed: " + WindowsError(error).message);
    }

    delete promise;
  }

  return future;
}
// Looks up the `::ConnectEx` extension function for the socket `fd`. The MSDN
// docs state that `::ConnectEx` must be retrieved through `::WSAIoctl`. See
// the remarks section of the docs:
// https://msdn.microsoft.com/en-us/library/windows/desktop/ms737606(v=vs.85).aspx // NOLINT(whitespace/line_length)
//
// Aborts the process if the lookup fails.
static LPFN_CONNECTEX init_connect_ex(const int_fd& fd)
{
  GUID extension_id = WSAID_CONNECTEX;
  LPFN_CONNECTEX function;
  DWORD returned_bytes;

  const int status = ::WSAIoctl(
      fd,
      SIO_GET_EXTENSION_FUNCTION_POINTER,
      &extension_id,
      sizeof(extension_id),
      &function,
      sizeof(function),
      &returned_bytes,
      nullptr,
      nullptr);

  // Without the function pointer there is no way to connect, so force an
  // abort.
  CHECK_EQ(status, 0);

  return function;
}
// Returns a reference to the cached `::ConnectEx` function pointer. The
// pointer is looked up only once, using the `fd` of the first call; later
// calls ignore their `fd` and return the same cached value.
static LPFN_CONNECTEX& get_connect_ex_ptr(const int_fd& fd)
{
// C++11 magic static initialization.
static LPFN_CONNECTEX ptr = init_connect_ex(fd);
return ptr;
}
// Asynchronously connects the socket `fd` to `address` using `::ConnectEx`.
//
// The socket is first bound to the wildcard address of the matching family,
// because `::ConnectEx` requires a bound socket. Only IPv4 and IPv6 addresses
// are supported; any other family yields a failed future. The returned future
// completes once the connection is established, or fails with the Windows
// error message of the `::ConnectEx` call.
Future<Nothing> connect(const int_fd& fd, const network::Address& address)
{
  process::initialize();

  // `::ConnectEx` needs the socket to be bound first.
  Try<Nothing> bind_result = Nothing();
  if (address.family() == network::Address::Family::INET4) {
    const network::inet4::Address addr = network::inet4::Address::ANY_ANY();
    bind_result = network::bind(fd, addr);
  } else if (address.family() == network::Address::Family::INET6) {
    const network::inet6::Address addr = network::inet6::Address::ANY_ANY();
    bind_result = network::bind(fd, addr);
  } else {
    return Failure("Async connect only supports IPv6 and IPv4");
  }

  if (bind_result.isError()) {
    // `WSAEINVAL` means socket is already bound, so we can continue. If it was
    // bound incorrectly, then we can get an error later on.
    if (::WSAGetLastError() != WSAEINVAL) {
      return Failure("Failed to bind connect socket: " + bind_result.error());
    }
  }

  const sockaddr_storage storage = address;
  const int address_size = static_cast<int>(address.size());

  // Load `::ConnectEx` function pointer, since it's not normally available.
  LPFN_CONNECTEX connect_ex = get_connect_ex_ptr(fd);

  Promise<Nothing>* promise = new Promise<Nothing>();
  Future<Nothing> future = promise->future();

  // We use a `std::shared_ptr`, so we can safely support canceling.
  auto overlapped = std::make_shared<IOOverlappedConnect>(
      IOOverlappedBase{OVERLAPPED{}, fd, IOType::CONNECT}, promise);

  enable_cancellation(fd, future, overlapped);

  // No data is sent along with the connection request, hence the null send
  // buffer and zero length.
  const BOOL success = connect_ex(
      fd,
      reinterpret_cast<const sockaddr*>(&storage),
      address_size,
      nullptr,
      0,
      nullptr,
      &overlapped->base.overlapped);

  const DWORD error = ::WSAGetLastError();

  // If the request is pending, then we return immediately and have the
  // callback free the promise and overlapped.
  if (!success && error == WSA_IO_PENDING) {
    return future;
  }

  // In an error or immediate success, we have to manually set the promise
  // and free it.
  if (success) {
    promise->set(Nothing());
  } else {
    // Name the API that actually failed and report the error the same way
    // `accept` does.
    promise->fail("ConnectEx failed: " + WindowsError(error).message);
  }

  delete promise;
  return future;
}
// Asynchronously sends `size` bytes of `file`, starting at byte `offset`, over
// `socket`. The future yields the number of bytes sent. A negative `offset`
// results in a failed future without starting any IO.
Future<size_t> sendfile(
    const int_fd& socket, const int_fd& file, off_t offset, size_t size)
{
  process::initialize();

  if (offset < 0) {
    return Failure("process::windows::sendfile got negative offset");
  }

  Promise<size_t>* promise = new Promise<size_t>();
  Future<size_t> future = promise->future();

  auto overlapped = std::make_shared<IOOverlappedReadWrite>(
      IOOverlappedBase{OVERLAPPED{}, socket, IOType::SENDFILE}, promise);

  // The starting position in the file travels in the `OVERLAPPED` structure,
  // split into its low and high 32 bits.
  const uint64_t start = static_cast<uint64_t>(offset);
  OVERLAPPED& native = overlapped->base.overlapped;
  native.Offset = static_cast<DWORD>(start);
  native.OffsetHigh = static_cast<DWORD>(start >> 32);

  enable_cancellation(socket, future, overlapped);

  const Result<size_t> outcome =
    os::sendfile_async(socket, file, size, &native);

  // `None` means the request is pending: the completion callback will set
  // and free the promise. Anything else finished right away, so the promise
  // has to be completed and freed here.
  if (!outcome.isNone()) {
    if (outcome.isError()) {
      promise->fail("os::sendfile_async failed: " + outcome.error());
    } else {
      promise->set(outcome.get());
    }

    delete promise;
  }

  return future;
}
} // namespace windows {
} // namespace process {