blob: 51512bd7bba8185bfdec9edd157378bced47f658 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <proton/condition.h>
#include <proton/connection_driver.h>
#include <proton/engine.h>
#include <proton/message.h>
#include <proton/netaddr.h>
#include <proton/object.h>
#include <proton/proactor.h>
#include <proton/transport.h>
#include <proton/listener.h>
#include <proton/proactor.h>
#include <proton/raw_connection.h>
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <list>
#include <queue>
#include <sstream>
#include <winsock2.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include "netaddr-internal.h" /* Include after socket/inet headers */
#include "core/logger_private.h"
/*
* Proactor for Windows using IO completion ports.
*
* There is no specific threading code other than locking and indirect (and
* brief) calls to the default threadpool via timerqueue timers.
*
* The IOCP system remembers what threads have called pn_proactor_wait and
* assumes they are doing proactor work until they call wait again. Using a
* dedicated threadpool to drive the proactor, as in the examples, is
* recommended. The use of other long-lived threads to occasionally drive the
* proactor will hurt the completion port scheduler and cause fewer threads to
* run.
*
* Each proactor connection maintains its own queue of pending completions and
* its work is serialized on that. The proactor listener runs parallel where
* possible, but its event batch is serialized.
*/
// TODO: make all code C++ or all C90-ish
// change INACTIVE to be from begin_close instead of zombie reap, to be more like Posix
// make the global write lock window much smaller
// 2 exclusive write buffers per connection
// make the zombie processing thread safe
// configurable completion port concurrency
// proton objects and ref counting: just say no.
// From selector.h
#define PN_READABLE (1)
#define PN_WRITABLE (2)
#define PN_EXPIRED (4)
#define PN_ERROR (8)
typedef SOCKET pn_socket_t;
namespace pn_experimental {
// Code borrowed from old messenger/reactor io.c iocp.c write_pipeline.c select.c
// Mostly unchanged except fro some thread safety (i.e. dropping the old descriptor map)
typedef struct pni_acceptor_t pni_acceptor_t;
typedef struct write_result_t write_result_t;
typedef struct read_result_t read_result_t;
typedef struct write_pipeline_t write_pipeline_t;
typedef struct iocpdesc_t iocpdesc_t;
typedef struct iocp_t iocp_t;
// One per completion port.
struct iocp_t {
HANDLE completion_port;
pn_list_t *zombie_list;
unsigned shared_pool_size;
char *shared_pool_memory;
write_result_t **shared_results;
write_result_t **available_results;
size_t shared_available_count;
size_t writer_count;
int loopback_bufsize;
bool iocp_trace;
};
// One for each socket.
// It should remain ref counted in the
// zombie_list until ops_in_progress == 0 or the completion port is closed.
struct iocpdesc_t {
pn_socket_t socket;
iocp_t *iocp;
void *active_completer;
pni_acceptor_t *acceptor;
pn_error_t *error;
int ops_in_progress;
bool read_in_progress;
write_pipeline_t *pipeline;
read_result_t *read_result;
bool bound; // associated with the completion port
bool closing; // close called by application
bool read_closed; // EOF or read error
bool write_closed; // shutdown sent or write error
bool poll_error; // flag posix-like POLLERR/POLLHUP/POLLNVAL
bool is_mp; // Special multi thread care required
bool write_blocked;
int events;
pn_timestamp_t reap_time;
};
// Max number of overlapped accepts per listener. TODO: configurable.
#define IOCP_MAX_ACCEPTS 4
// AcceptEx squishes the local and remote addresses and optional data
// all together when accepting the connection. Reserve enough for
// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding
// per address is required by AcceptEx.
#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16)
#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN)
typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t;
typedef struct {
OVERLAPPED overlapped;
iocp_type_t type;
iocpdesc_t *iocpd;
HRESULT status;
DWORD num_transferred;
} iocp_result_t;
// Completion keys to distinguish IO from some other user port completions
enum {
proactor_io = 0,
proactor_wake_key,
psocket_wakeup_key,
recycle_accept_key,
};
struct read_result_t {
iocp_result_t base;
size_t drain_count;
char unused_buf[1];
};
struct write_result_t {
iocp_result_t base;
size_t requested;
bool in_use;
pn_bytes_t buffer;
};
typedef struct {
iocp_result_t base;
char address_buffer[IOCP_SOCKADDRBUFLEN];
struct addrinfo *addrinfo;
} connect_result_t;
typedef struct {
iocp_result_t base;
iocpdesc_t *new_sock;
char address_buffer[IOCP_SOCKADDRBUFLEN];
DWORD unused;
} accept_result_t;
void complete_connect(connect_result_t *result, HRESULT status);
void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status);
void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status);
void start_reading(iocpdesc_t *iocpd);
void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result);
void reset_accept_result(accept_result_t *result);
iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd);
void pni_iocp_reap_check(iocpdesc_t *iocpd);
connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr);
iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s);
iocpdesc_t *pni_deadline_desc(iocp_t *);
void pni_iocpdesc_start(iocpdesc_t *iocpd);
void pni_iocp_drain_completions(iocp_t *);
int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *);
void pni_iocp_start_accepting(iocpdesc_t *iocpd);
pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error);
pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error);
ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *);
ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error);
void pni_iocp_begin_close(iocpdesc_t *iocpd);
iocp_t *pni_iocp();
void pni_events_update(iocpdesc_t *iocpd, int events);
write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen);
void pni_zombie_check(iocp_t *, pn_timestamp_t);
pn_timestamp_t pni_zombie_deadline(iocp_t *);
int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code);
}
// ======================================================================
// Write pipeline.
// ======================================================================
/*
* A simple write buffer pool. Each socket has a dedicated "primary"
* buffer and can borrow from a shared pool with limited size tuning.
* Could enhance e.g. with separate pools per network interface and fancier
* memory tuning based on interface speed, system resources, and
* number of connections, etc.
*/
namespace pn_experimental {
// Max overlapped writes per socket
#define IOCP_MAX_OWRITES 16
// Write buffer size
#define IOCP_WBUFSIZE 16384
static void pipeline_log(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fflush(stderr);
}
void pni_shared_pool_create(iocp_t *iocp)
{
// TODO: more pools (or larger one) when using multiple non-loopback interfaces
iocp->shared_pool_size = 16;
char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging
if (env) {
int sz = atoi(env);
if (sz >= 0 && sz < 256) {
iocp->shared_pool_size = sz;
}
}
iocp->loopback_bufsize = 0;
env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging
if (env) {
int sz = atoi(env);
if (sz >= 0 && sz <= 128 * 1024) {
iocp->loopback_bufsize = sz;
}
}
if (iocp->shared_pool_size) {
iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
HRESULT status = GetLastError();
if (!iocp->shared_pool_memory) {
perror("Proton write buffer pool allocation failure\n");
iocp->shared_pool_size = 0;
iocp->shared_available_count = 0;
return;
}
iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
iocp->shared_available_count = iocp->shared_pool_size;
char *mem = iocp->shared_pool_memory;
for (unsigned i = 0; i < iocp->shared_pool_size; i++) {
iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE);
mem += IOCP_WBUFSIZE;
}
}
}
void pni_shared_pool_free(iocp_t *iocp)
{
for (unsigned i = 0; i < iocp->shared_pool_size; i++) {
write_result_t *result = iocp->shared_results[i];
if (result->in_use)
pipeline_log("Proton buffer pool leak\n");
else
free(result);
}
if (iocp->shared_pool_size) {
free(iocp->shared_results);
free(iocp->available_results);
if (iocp->shared_pool_memory) {
if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) {
perror("write buffers release failed");
}
iocp->shared_pool_memory = NULL;
}
}
}
static void shared_pool_push(write_result_t *result)
{
iocp_t *iocp = result->base.iocpd->iocp;
assert(iocp->shared_available_count < iocp->shared_pool_size);
iocp->available_results[iocp->shared_available_count++] = result;
}
static write_result_t *shared_pool_pop(iocp_t *iocp)
{
return iocp->shared_available_count ? iocp->available_results[--iocp->shared_available_count] : NULL;
}
struct write_pipeline_t {
iocpdesc_t *iocpd;
size_t pending_count;
write_result_t *primary;
size_t reserved_count;
size_t next_primary_index;
size_t depth;
bool is_writer;
};
#define write_pipeline_compare NULL
#define write_pipeline_inspect NULL
#define write_pipeline_hashcode NULL
static void write_pipeline_initialize(void *object)
{
write_pipeline_t *pl = (write_pipeline_t *) object;
pl->pending_count = 0;
const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE);
pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE);
pl->depth = 0;
pl->is_writer = false;
}
static void write_pipeline_finalize(void *object)
{
write_pipeline_t *pl = (write_pipeline_t *) object;
free((void *)pl->primary->buffer.start);
free(pl->primary);
}
write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd)
{
static const pn_cid_t CID_write_pipeline = CID_pn_void;
static const pn_class_t clazz = PN_CLASS(write_pipeline);
write_pipeline_t *pipeline = (write_pipeline_t *) pn_class_new(&clazz, sizeof(write_pipeline_t));
pipeline->iocpd = iocpd;
pipeline->primary->base.iocpd = iocpd;
return pipeline;
}
static void confirm_as_writer(write_pipeline_t *pl)
{
if (!pl->is_writer) {
iocp_t *iocp = pl->iocpd->iocp;
iocp->writer_count++;
pl->is_writer = true;
}
}
static void remove_as_writer(write_pipeline_t *pl)
{
if (!pl->is_writer)
return;
iocp_t *iocp = pl->iocpd->iocp;
assert(iocp->writer_count);
pl->is_writer = false;
iocp->writer_count--;
}
/*
* Optimal depth will depend on properties of the NIC, server, and driver. For now,
* just distinguish between loopback interfaces and the rest. Optimizations in the
* loopback stack allow decent performance with depth 1 and actually cause major
* performance hiccups if set to large values.
*/
static void set_depth(write_pipeline_t *pl)
{
pl->depth = 1;
sockaddr_storage sa;
socklen_t salen = sizeof(sa);
char buf[INET6_ADDRSTRLEN];
DWORD buflen = sizeof(buf);
if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 &&
getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) {
if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) ||
(sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) {
// not loopback
pl->depth = IOCP_MAX_OWRITES;
} else {
iocp_t *iocp = pl->iocpd->iocp;
if (iocp->loopback_bufsize) {
const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize);
if (p) {
pl->primary->buffer.start = p;
pl->primary->buffer.size = iocp->loopback_bufsize;
}
}
}
}
}
static size_t pni_min(size_t a, size_t b) { return a < b ? a : b; }
// Reserve as many buffers as possible for count bytes.
size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count)
{
if (pl->primary->in_use)
return 0; // I.e. io->wouldblock
if (!pl->depth)
set_depth(pl);
if (pl->depth == 1) {
// always use the primary
pl->reserved_count = 1;
pl->next_primary_index = 0;
return 1;
}
iocp_t *iocp = pl->iocpd->iocp;
confirm_as_writer(pl);
size_t wanted = (count / IOCP_WBUFSIZE);
if (count % IOCP_WBUFSIZE)
wanted++;
size_t pending = pl->pending_count;
assert(pending < pl->depth);
size_t bufs = pni_min(wanted, pl->depth - pending);
// Can draw from shared pool or the primary... but share with others.
size_t writers = iocp->writer_count;
size_t shared_count = (iocp->shared_available_count + writers - 1) / writers;
bufs = pni_min(bufs, shared_count + 1);
pl->reserved_count = pending + bufs;
if (bufs == wanted &&
pl->reserved_count < (pl->depth / 2) &&
iocp->shared_available_count > (2 * writers + bufs)) {
// No shortage: keep the primary as spare for future use
pl->next_primary_index = pl->reserved_count;
} else if (bufs == 1) {
pl->next_primary_index = pending;
} else {
// let approx 1/3 drain before replenishing
pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1;
if (pl->next_primary_index < pending)
pl->next_primary_index = pending;
}
return bufs;
}
write_result_t *pni_write_pipeline_next(write_pipeline_t *pl)
{
size_t sz = pl->pending_count;
if (sz >= pl->reserved_count)
return NULL;
write_result_t *result;
if (sz == pl->next_primary_index) {
result = pl->primary;
} else {
assert(pl->iocpd->iocp->shared_available_count > 0);
result = shared_pool_pop(pl->iocpd->iocp);
}
result->in_use = true;
pl->pending_count++;
return result;
}
void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result)
{
result->in_use = false;
pl->pending_count--;
pl->reserved_count = 0;
if (result != pl->primary)
shared_pool_push(result);
if (pl->pending_count == 0)
remove_as_writer(pl);
}
bool pni_write_pipeline_writable(write_pipeline_t *pl)
{
// Only writable if not full and we can guarantee a buffer:
return pl->pending_count < pl->depth && !pl->primary->in_use;
}
size_t pni_write_pipeline_size(write_pipeline_t *pl)
{
return pl->pending_count;
}
}
// ======================================================================
// IOCP infrastructure code
// ======================================================================
/*
* Socket IO including graceful or forced zombie teardown.
*
* Overlapped writes are used to avoid lengthy stalls between write
* completion and starting a new write. Non-overlapped reads are used
* since Windows accumulates inbound traffic without stalling and
* managing read buffers would not avoid a memory copy at the pn_read
* boundary.
*
* A socket must not get a Windows closesocket() unless the
* application has called pn_connection_close on the connection or a
* global single threaded shutdown is in progress. On error, the
* internal accounting for write_closed or read_closed may be updated
* along with the external event notification. A socket may be
* directly closed if it is never gets started (accept/listener_close
* collision) or otherwise turned into a zombie.
*/
namespace pn_experimental {
static void iocp_log(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fflush(stderr);
}
static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status)
{
char buf[512];
if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM,
0, status, 0, buf, sizeof(buf), 0))
pn_error_set(error, code, buf);
else {
fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError());
}
}
static void reap_check(iocpdesc_t *);
static void bind_to_completion_port(iocpdesc_t *iocpd);
static void iocp_shutdown(iocpdesc_t *iocpd);
static bool is_listener(iocpdesc_t *iocpd);
static void release_sys_sendbuf(SOCKET s);
int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
{
// Error code can be from GetLastError or WSAGetLastError,
char err[1024] = {0};
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
}
static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text)
{
pni_win32_error(iocpd->error, text, status);
if (iocpd->iocp->iocp_trace) {
iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error));
}
iocpd->write_closed = true;
iocpd->read_closed = true;
iocpd->poll_error = true;
pni_events_update(iocpd, iocpd->events & ~(PN_READABLE | PN_WRITABLE));
}
static int pni_strcasecmp(const char *a, const char *b)
{
int diff;
while (*b) {
char aa = *a++, bb = *b++;
diff = tolower(aa)-tolower(bb);
if ( diff!=0 ) return diff;
}
return *a;
}
static void pni_events_update(iocpdesc_t *iocpd, int events)
{
// If set, a poll error is permanent
if (iocpd->poll_error)
events |= PN_ERROR;
if (iocpd->events == events)
return;
// ditch old code to update list of ready selectables
iocpd->events = events;
}
// Helper functions to use specialized IOCP AcceptEx() and ConnectEx()
static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s)
{
GUID guid = WSAID_ACCEPTEX;
DWORD bytes = 0;
LPFN_ACCEPTEX fn;
WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&fn, sizeof(fn), &bytes, NULL, NULL);
assert(fn);
return fn;
}
static LPFN_CONNECTEX lookup_connect_ex(SOCKET s)
{
GUID guid = WSAID_CONNECTEX;
DWORD bytes = 0;
LPFN_CONNECTEX fn;
WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&fn, sizeof(fn), &bytes, NULL, NULL);
assert(fn);
return fn;
}
static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s)
{
GUID guid = WSAID_GETACCEPTEXSOCKADDRS;
DWORD bytes = 0;
LPFN_GETACCEPTEXSOCKADDRS fn;
WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&fn, sizeof(fn), &bytes, NULL, NULL);
assert(fn);
return fn;
}
// match accept socket to listener socket
iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd)
{
sockaddr_storage sa;
socklen_t salen = sizeof(sa);
if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1)
return NULL;
SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
if (s == INVALID_SOCKET)
return NULL;
return pni_iocpdesc_create(iocpd->iocp, s);
}
static bool is_listener(iocpdesc_t *iocpd)
{
return iocpd && iocpd->acceptor;
}
// === Async accept processing
static accept_result_t *accept_result(iocpdesc_t *listen_sock) {
accept_result_t *result = (accept_result_t *)calloc(1, sizeof(accept_result_t));
if (result) {
result->base.type = IOCP_ACCEPT;
result->base.iocpd = listen_sock;
}
return result;
}
void reset_accept_result(accept_result_t *result) {
memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
memset(&result->address_buffer, 0, IOCP_SOCKADDRBUFLEN);
result->new_sock = NULL;
}
struct pni_acceptor_t {
int accept_queue_size;
pn_list_t *accepts;
iocpdesc_t *listen_sock;
bool signalled;
LPFN_ACCEPTEX fn_accept_ex;
LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs;
};
#define pni_acceptor_compare NULL
#define pni_acceptor_inspect NULL
#define pni_acceptor_hashcode NULL
static void pni_acceptor_initialize(void *object)
{
pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
acceptor->accepts = pn_list(PN_VOID, IOCP_MAX_ACCEPTS);
}
static void pni_acceptor_finalize(void *object)
{
pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
size_t len = pn_list_size(acceptor->accepts);
for (size_t i = 0; i < len; i++)
free(pn_list_get(acceptor->accepts, i));
pn_free(acceptor->accepts);
}
static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd)
{
static const pn_cid_t CID_pni_acceptor = CID_pn_void;
static const pn_class_t clazz = PN_CLASS(pni_acceptor);
pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_class_new(&clazz, sizeof(pni_acceptor_t));
acceptor->listen_sock = iocpd;
acceptor->accept_queue_size = 0;
acceptor->signalled = false;
pn_socket_t sock = acceptor->listen_sock->socket;
acceptor->fn_accept_ex = lookup_accept_ex(sock);
acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock);
return acceptor;
}
void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
{
// flag to divide this routine's logic into locked/unlocked mp portions
bool mp = acceptor->listen_sock->is_mp;
bool created = false;
if (acceptor->listen_sock->closing) {
if (result) {
if (mp && result->new_sock && result->new_sock->socket != INVALID_SOCKET)
closesocket(result->new_sock->socket);
free(result);
acceptor->accept_queue_size--;
}
if (acceptor->accept_queue_size == 0)
acceptor->signalled = true;
return;
}
if (result) {
if (!mp)
reset_accept_result(result);
} else {
if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && (mp ||
pn_list_size(acceptor->accepts) == acceptor->accept_queue_size )) {
result = accept_result(acceptor->listen_sock);
acceptor->accept_queue_size++;
created = true;
} else {
// an async accept is still pending or max concurrent accepts already hit
return;
}
}
if (created || !mp)
result->new_sock = create_same_type_socket(acceptor->listen_sock);
if (result->new_sock) {
// Not yet connected.
result->new_sock->read_closed = true;
result->new_sock->write_closed = true;
bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket,
result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
&result->unused, (LPOVERLAPPED) result);
if (!success) {
DWORD err = WSAGetLastError();
if (err != ERROR_IO_PENDING) {
if (err == WSAECONNRESET) {
// other side gave up. Ignore and try again.
begin_accept(acceptor, result);
return;
}
else {
iocpdesc_fail(acceptor->listen_sock, err, "AcceptEX call failure");
return;
}
}
acceptor->listen_sock->ops_in_progress++;
// This socket is equally involved in the async operation.
result->new_sock->ops_in_progress++;
}
} else {
iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket");
}
}
static void complete_accept(accept_result_t *result, HRESULT status)
{
result->new_sock->ops_in_progress--;
iocpdesc_t *ld = result->base.iocpd;
if (ld->read_closed) {
if (!result->new_sock->closing)
pni_iocp_begin_close(result->new_sock);
pn_decref(result->new_sock);
free(result); // discard
reap_check(ld);
} else {
assert(!ld->is_mp); // Non mp only
result->base.status = status;
pn_list_add(ld->acceptor->accepts, result);
pni_events_update(ld, ld->events | PN_READABLE);
}
}
// === Async connect processing
#define connect_result_initialize NULL
#define connect_result_compare NULL
#define connect_result_inspect NULL
#define connect_result_hashcode NULL
static void connect_result_finalize(void *object)
{
connect_result_t *result = (connect_result_t *) object;
// Do not release addrinfo until ConnectEx completes
if (result->addrinfo)
freeaddrinfo(result->addrinfo);
}
connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) {
static const pn_cid_t CID_connect_result = CID_pn_void;
static const pn_class_t clazz = PN_CLASS(connect_result);
connect_result_t *result = (connect_result_t *) pn_class_new(&clazz, sizeof(connect_result_t));
if (result) {
memset(result, 0, sizeof(connect_result_t));
result->base.type = IOCP_CONNECT;
result->base.iocpd = iocpd;
result->addrinfo = addr;
}
return result;
}
pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error)
{
// addr lives for the duration of the async connect. Caller has passed ownership here.
// See connect_result_finalize().
// Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound:
sockaddr_storage sa;
memset(&sa, 0, sizeof(sa));
sa.ss_family = addr->ai_family;
if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) {
pni_win32_error(error, "begin async connection", WSAGetLastError());
if (iocp->iocp_trace)
iocp_log("%s\n", pn_error_text(error));
closesocket(sock);
freeaddrinfo(addr);
return INVALID_SOCKET;
}
iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock);
bind_to_completion_port(iocpd);
LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket);
connect_result_t *result = connect_result(iocpd, addr);
DWORD unused;
bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen,
NULL, 0, &unused, (LPOVERLAPPED) result);
if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
pni_win32_error(error, "ConnectEx failure", WSAGetLastError());
pn_free(result);
iocpd->write_closed = true;
iocpd->read_closed = true;
if (iocp->iocp_trace)
iocp_log("%s\n", pn_error_text(error));
} else {
iocpd->ops_in_progress++;
}
return sock;
}
void complete_connect(connect_result_t *result, HRESULT status)
{
iocpdesc_t *iocpd = result->base.iocpd;
if (iocpd->closing) {
pn_free(result);
reap_check(iocpd);
return;
}
if (status) {
iocpdesc_fail(iocpd, status, "Connect failure");
// Posix sets selectable events as follows:
pni_events_update(iocpd, PN_READABLE | PN_EXPIRED);
} else {
release_sys_sendbuf(iocpd->socket);
if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) {
iocpdesc_fail(iocpd, WSAGetLastError(), "Internal connect failure (update context)");
} else {
pni_events_update(iocpd, PN_WRITABLE);
start_reading(iocpd);
}
}
pn_free(result);
return;
}
// === Async writes
static bool write_in_progress(iocpdesc_t *iocpd)
{
return pni_write_pipeline_size(iocpd->pipeline) != 0;
}
write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen)
{
write_result_t *result = (write_result_t *) calloc(sizeof(write_result_t), 1);
if (result) {
result->base.type = IOCP_WRITE;
result->base.iocpd = iocpd;
result->buffer.start = buf;
result->buffer.size = buflen;
}
return result;
}
static int submit_write(write_result_t *result, const void *buf, size_t len)
{
WSABUF wsabuf;
wsabuf.buf = (char *) buf;
wsabuf.len = len;
memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
return WSASend(result->base.iocpd->socket, &wsabuf, 1, NULL, 0,
(LPOVERLAPPED) result, 0);
}
ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error)
{
if (len == 0) return 0;
*would_block = false;
if (is_listener(iocpd)) {
set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
return INVALID_SOCKET;
}
if (iocpd->closing) {
set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
return SOCKET_ERROR;
}
if (iocpd->write_closed) {
assert(pn_error_code(iocpd->error));
pn_error_copy(error, iocpd->error);
if (iocpd->iocp->iocp_trace)
iocp_log("write error: %s\n", pn_error_text(error));
return SOCKET_ERROR;
}
if (len == 0) return 0;
if (!(iocpd->events & PN_WRITABLE)) {
*would_block = true;
return SOCKET_ERROR;
}
size_t written = 0;
size_t requested = len;
const char *outgoing = (const char *) buf;
size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len);
if (!available) {
*would_block = true;
return SOCKET_ERROR;
}
for (size_t wr_count = 0; wr_count < available; wr_count++) {
write_result_t *result = pni_write_pipeline_next(iocpd->pipeline);
assert(result);
result->base.iocpd = iocpd;
ssize_t actual_len = len;
if (len > result->buffer.size) actual_len = result->buffer.size;
result->requested = actual_len;
memmove((void *)result->buffer.start, outgoing, actual_len);
outgoing += actual_len;
written += actual_len;
len -= actual_len;
int werror = submit_write(result, result->buffer.start, actual_len);
if (werror && WSAGetLastError() != ERROR_IO_PENDING) {
pni_write_pipeline_return(iocpd->pipeline, result);
iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send");
return SOCKET_ERROR;
}
iocpd->ops_in_progress++;
}
if (!pni_write_pipeline_writable(iocpd->pipeline))
pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
return written;
}
/*
* Note: iocp write completion is not "bytes on the wire", it is "peer
* acked the sent bytes". Completion can be seconds on a slow
* consuming peer.
*/
void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status)
{
iocpdesc_t *iocpd = result->base.iocpd;
if (iocpd->closing) {
pni_write_pipeline_return(iocpd->pipeline, result);
if (!iocpd->write_closed && !write_in_progress(iocpd))
iocp_shutdown(iocpd);
reap_check(iocpd);
return;
}
if (status == 0 && xfer_count > 0) {
if (xfer_count != result->requested) {
// Is this recoverable? How to preserve order if multiple overlapped writes?
pni_write_pipeline_return(iocpd->pipeline, result);
iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket");
return;
} else {
// Success.
pni_write_pipeline_return(iocpd->pipeline, result);
if (pni_write_pipeline_writable(iocpd->pipeline))
pni_events_update(iocpd, iocpd->events | PN_WRITABLE);
return;
}
}
// Other error
pni_write_pipeline_return(iocpd->pipeline, result);
if (status == WSAECONNABORTED || status == WSAECONNRESET || status == WSAENOTCONN
|| status == ERROR_NETNAME_DELETED) {
iocpd->write_closed = true;
iocpd->poll_error = true;
pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
pni_win32_error(iocpd->error, "Remote close or timeout", status);
} else {
iocpdesc_fail(iocpd, status, "IOCP async write error");
}
}
// === Async reads
static read_result_t *read_result(iocpdesc_t *iocpd)
{
read_result_t *result = (read_result_t *) calloc(sizeof(read_result_t), 1);
if (result) {
result->base.type = IOCP_READ;
result->base.iocpd = iocpd;
}
return result;
}
static void begin_zero_byte_read(iocpdesc_t *iocpd)
{
if (iocpd->read_in_progress) return;
if (iocpd->read_closed) {
pni_events_update(iocpd, iocpd->events | PN_READABLE);
return;
}
read_result_t *result = iocpd->read_result;
memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
DWORD flags = 0;
WSABUF wsabuf;
wsabuf.buf = result->unused_buf;
wsabuf.len = 0;
int rc = WSARecv(iocpd->socket, &wsabuf, 1, NULL, &flags,
&result->base.overlapped, 0);
if (rc && WSAGetLastError() != ERROR_IO_PENDING) {
iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error");
return;
}
iocpd->ops_in_progress++;
iocpd->read_in_progress = true;
}
static void drain_until_closed(iocpdesc_t *iocpd) {
size_t max_drain = 16 * 1024;
char buf[512];
read_result_t *result = iocpd->read_result;
while (result->drain_count < max_drain) {
int rv = recv(iocpd->socket, buf, 512, 0);
if (rv > 0)
result->drain_count += rv;
else if (rv == 0) {
iocpd->read_closed = true;
return;
} else if (WSAGetLastError() == WSAEWOULDBLOCK) {
// wait a little longer
start_reading(iocpd);
return;
}
else
break;
}
// Graceful close indication unlikely, force the issue
if (iocpd->iocp->iocp_trace)
if (result->drain_count >= max_drain)
iocp_log("graceful close on reader abandoned (too many chars)\n");
else
iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError());
iocpd->read_closed = true;
}
void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status)
{
iocpdesc_t *iocpd = result->base.iocpd;
iocpd->read_in_progress = false;
if (iocpd->closing) {
// Application no longer reading, but we are looking for a zero length read
if (!iocpd->read_closed)
drain_until_closed(iocpd);
reap_check(iocpd);
return;
}
if (status == 0 && xfer_count == 0) {
// Success.
pni_events_update(iocpd, iocpd->events | PN_READABLE);
} else {
iocpdesc_fail(iocpd, status, "IOCP read complete error");
}
}
ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error)
{
if (size == 0) return 0;
*would_block = false;
if (is_listener(iocpd)) {
set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
return SOCKET_ERROR;
}
if (iocpd->closing) {
// Previous call to pn_close()
set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
return SOCKET_ERROR;
}
if (iocpd->read_closed) {
if (pn_error_code(iocpd->error))
pn_error_copy(error, iocpd->error);
else
set_iocp_error_status(error, PN_ERR, WSAENOTCONN);
return SOCKET_ERROR;
}
int count = recv(iocpd->socket, (char *) buf, size, 0);
if (count > 0) {
pni_events_update(iocpd, iocpd->events & ~PN_READABLE);
// caller must initiate begin_zero_byte_read(iocpd);
return (ssize_t) count;
} else if (count == 0) {
iocpd->read_closed = true;
return 0;
}
if (WSAGetLastError() == WSAEWOULDBLOCK)
*would_block = true;
else {
set_iocp_error_status(error, PN_ERR, WSAGetLastError());
iocpd->read_closed = true;
}
return SOCKET_ERROR;
}
void start_reading(iocpdesc_t *iocpd)
{
begin_zero_byte_read(iocpd);
}
// === The iocp descriptor
static void pni_iocpdesc_initialize(void *object)
{
iocpdesc_t *iocpd = (iocpdesc_t *) object;
memset(iocpd, 0, sizeof(iocpdesc_t));
iocpd->socket = INVALID_SOCKET;
}
static void pni_iocpdesc_finalize(void *object)
{
iocpdesc_t *iocpd = (iocpdesc_t *) object;
pn_free(iocpd->acceptor);
pn_error_free(iocpd->error);
if (iocpd->pipeline)
if (write_in_progress(iocpd))
iocp_log("iocp descriptor write leak\n");
else
pn_free(iocpd->pipeline);
if (iocpd->read_in_progress)
iocp_log("iocp descriptor read leak\n");
else
free(iocpd->read_result);
}
static uintptr_t pni_iocpdesc_hashcode(void *object)
{
iocpdesc_t *iocpd = (iocpdesc_t *) object;
return iocpd->socket;
}
#define pni_iocpdesc_compare NULL
#define pni_iocpdesc_inspect NULL
// Reference counted in the iocpdesc map, zombie_list, selector.
static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
{
static const pn_cid_t CID_pni_iocpdesc = CID_pn_void;
static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t));
assert(iocpd);
iocpd->socket = s;
return iocpd;
}
static bool is_listener_socket(pn_socket_t s)
{
BOOL tval = false;
int tvalsz = sizeof(tval);
int code = getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *)&tval, &tvalsz);
return code == 0 && tval;
}
iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s) {
assert (s != INVALID_SOCKET);
bool listening = is_listener_socket(s);
iocpdesc_t *iocpd = pni_iocpdesc(s);
iocpd->iocp = iocp;
if (iocpd) {
iocpd->error = pn_error();
if (listening) {
iocpd->acceptor = pni_acceptor(iocpd);
} else {
iocpd->pipeline = pni_write_pipeline(iocpd);
iocpd->read_result = read_result(iocpd);
}
}
return iocpd;
}
static void bind_to_completion_port(iocpdesc_t *iocpd)
{
if (iocpd->bound) return;
if (!iocpd->iocp->completion_port) {
iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port.");
return;
}
if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, proactor_io, 0))
iocpd->bound = true;
else {
iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup.");
}
}
static void release_sys_sendbuf(SOCKET s)
{
// Set the socket's send buffer size to zero.
int sz = 0;
int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&sz, sizeof(int));
assert(status == 0);
}
void pni_iocpdesc_start(iocpdesc_t *iocpd)
{
if (iocpd->bound) return;
bind_to_completion_port(iocpd);
if (is_listener(iocpd)) {
begin_accept(iocpd->acceptor, NULL);
}
else {
release_sys_sendbuf(iocpd->socket);
pni_events_update(iocpd, PN_WRITABLE);
start_reading(iocpd);
}
}
static void complete(iocp_result_t *result, bool success, DWORD num_transferred) {
result->iocpd->ops_in_progress--;
DWORD status = success ? 0 : GetLastError();
switch (result->type) {
case IOCP_ACCEPT:
complete_accept((accept_result_t *) result, status);
break;
case IOCP_CONNECT:
complete_connect((connect_result_t *) result, status);
break;
case IOCP_WRITE:
complete_write((write_result_t *) result, num_transferred, status);
break;
case IOCP_READ:
complete_read((read_result_t *) result, num_transferred, status);
break;
default:
assert(false);
}
}
void pni_iocp_drain_completions(iocp_t *iocp)
{
while (true) {
DWORD timeout_ms = 0;
DWORD num_transferred = 0;
ULONG_PTR completion_key = 0;
OVERLAPPED *overlapped = 0;
bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
&completion_key, &overlapped, timeout_ms);
if (!overlapped)
return; // timed out
iocp_result_t *result = (iocp_result_t *) overlapped;
if (!completion_key)
complete(result, good_op, num_transferred);
}
}
// returns: -1 on error, 0 on timeout, 1 successful completion
// proactor layer uses completion_key, but we don't, so ignore those (shutdown in progress)
int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) {
DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout;
DWORD num_transferred = 0;
ULONG_PTR completion_key = 0;
OVERLAPPED *overlapped = 0;
bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
&completion_key, &overlapped, win_timeout);
if (!overlapped)
if (GetLastError() == WAIT_TIMEOUT)
return 0;
else {
if (error)
pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError());
return -1;
}
if (completion_key)
return pni_iocp_wait_one(iocp, timeout, error);
iocp_result_t *result = (iocp_result_t *) overlapped;
complete(result, good_op, num_transferred);
return 1;
}
// === Close (graceful and otherwise)
// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress
// and fully closed.
static void zombie_list_add(iocpdesc_t *iocpd)
{
assert(iocpd->closing);
if (!iocpd->ops_in_progress) {
// No need to make a zombie.
if (iocpd->socket != INVALID_SOCKET) {
closesocket(iocpd->socket);
iocpd->socket = INVALID_SOCKET;
iocpd->read_closed = true;
}
return;
}
// Allow 2 seconds for graceful shutdown before releasing socket resource.
iocpd->reap_time = pn_proactor_now_64() + 2000;
pn_list_add(iocpd->iocp->zombie_list, iocpd);
}
static void reap_check(iocpdesc_t *iocpd)
{
if (iocpd->closing && !iocpd->ops_in_progress) {
if (iocpd->socket != INVALID_SOCKET) {
closesocket(iocpd->socket);
iocpd->socket = INVALID_SOCKET;
}
pn_list_remove(iocpd->iocp->zombie_list, iocpd);
// iocpd is decref'ed and possibly released
}
}
void pni_iocp_reap_check(iocpdesc_t *iocpd) {
reap_check(iocpd);
}
pn_timestamp_t pni_zombie_deadline(iocp_t *iocp)
{
if (pn_list_size(iocp->zombie_list)) {
iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0);
return iocpd->reap_time;
}
return 0;
}
void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now)
{
pn_list_t *zl = iocp->zombie_list;
// Look for stale zombies that should have been reaped by "now"
for (size_t idx = 0; idx < pn_list_size(zl); idx++) {
iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zl, idx);
if (iocpd->reap_time > now)
return;
if (iocpd->socket == INVALID_SOCKET)
continue;
if (iocp->iocp_trace)
iocp_log("async close: graceful close timeout exceeded\n");
closesocket(iocpd->socket);
iocpd->socket = INVALID_SOCKET;
iocpd->read_closed = true;
// outstanding ops should complete immediately now
}
}
static void drain_zombie_completions(iocp_t *iocp)
{
// No more pn_selector_select() from App, but zombies still need care and feeding
// until their outstanding async actions complete.
pni_iocp_drain_completions(iocp);
// Discard any that have no pending async IO
size_t sz = pn_list_size(iocp->zombie_list);
for (size_t idx = 0; idx < sz;) {
iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx);
if (!iocpd->ops_in_progress) {
pn_list_del(iocp->zombie_list, idx, 1);
sz--;
} else {
idx++;
}
}
unsigned shutdown_grace = 2000;
char *override = getenv("PN_SHUTDOWN_GRACE");
if (override) {
int grace = atoi(override);
if (grace > 0 && grace < 60000)
shutdown_grace = (unsigned) grace;
}
pn_timestamp_t now = pn_proactor_now_64();
pn_timestamp_t deadline = now + shutdown_grace;
while (pn_list_size(iocp->zombie_list)) {
if (now >= deadline)
break;
int rv = pni_iocp_wait_one(iocp, deadline - now, NULL);
if (rv < 0) {
iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError());
break;
}
now = pn_proactor_now_64();
}
if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace)
// Should only happen if really slow TCP handshakes, i.e. total network failure
iocp_log("network failure on Proton shutdown\n");
}
static void zombie_list_hard_close_all(iocp_t *iocp)
{
pni_iocp_drain_completions(iocp);
size_t zs = pn_list_size(iocp->zombie_list);
for (size_t i = 0; i < zs; i++) {
iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
if (iocpd->socket != INVALID_SOCKET) {
closesocket(iocpd->socket);
iocpd->socket = INVALID_SOCKET;
iocpd->read_closed = true;
iocpd->write_closed = true;
}
}
pni_iocp_drain_completions(iocp);
// Zombies should be all gone. Do a sanity check.
zs = pn_list_size(iocp->zombie_list);
int remaining = 0;
int ops = 0;
for (size_t i = 0; i < zs; i++) {
iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
remaining++;
ops += iocpd->ops_in_progress;
}
if (remaining)
iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops);
}
static void iocp_shutdown(iocpdesc_t *iocpd)
{
if (iocpd->socket == INVALID_SOCKET)
return; // Hard close in progress
if (shutdown(iocpd->socket, SD_SEND)) {
int err = WSAGetLastError();
if (err != WSAECONNABORTED && err != WSAECONNRESET && err != WSAENOTCONN)
if (iocpd->iocp->iocp_trace)
iocp_log("socket shutdown failed %d\n", err);
}
iocpd->write_closed = true;
}
void pni_iocp_begin_close(iocpdesc_t *iocpd)
{
assert (!iocpd->closing);
if (is_listener(iocpd)) {
// Listening socket is easy. Close the socket which will cancel async ops.
pn_socket_t old_sock = iocpd->socket;
iocpd->socket = INVALID_SOCKET;
iocpd->closing = true;
iocpd->read_closed = true;
iocpd->write_closed = true;
closesocket(old_sock);
// Pending accepts will now complete. Zombie can die when all consumed.
zombie_list_add(iocpd);
} else {
// Continue async operation looking for graceful close confirmation or timeout.
pn_socket_t old_sock = iocpd->socket;
iocpd->closing = true;
if (!iocpd->write_closed && !write_in_progress(iocpd))
iocp_shutdown(iocpd);
zombie_list_add(iocpd);
}
}
// === iocp_t
#define pni_iocp_hashcode NULL
#define pni_iocp_compare NULL
#define pni_iocp_inspect NULL
void pni_iocp_initialize(void *obj)
{
iocp_t *iocp = (iocp_t *) obj;
memset(iocp, 0, sizeof(iocp_t));
pni_shared_pool_create(iocp);
iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
assert(iocp->completion_port != NULL);
iocp->zombie_list = pn_list(PN_OBJECT, 0);
iocp->iocp_trace = false;
}
void pni_iocp_finalize(void *obj)
{
iocp_t *iocp = (iocp_t *) obj;
// Move sockets to closed state
drain_zombie_completions(iocp); // Last chance for graceful close
zombie_list_hard_close_all(iocp);
CloseHandle(iocp->completion_port); // This cancels all our async ops
iocp->completion_port = NULL;
// Now safe to free everything that might be touched by a former async operation.
pn_free(iocp->zombie_list);
pni_shared_pool_free(iocp);
}
iocp_t *pni_iocp()
{
static const pn_cid_t CID_pni_iocp = CID_pn_void;
static const pn_class_t clazz = PN_CLASS(pni_iocp);
iocp_t *iocp = (iocp_t *) pn_class_new(&clazz, sizeof(iocp_t));
return iocp;
}
}
// ======================================================================
// Proton Proactor support
// ======================================================================
#include "core/logger_private.h"
#include "proactor-internal.h"
class csguard {
public:
csguard(CRITICAL_SECTION *cs) : cs_(cs), set_(true) { EnterCriticalSection(cs_); }
~csguard() { if (set_) LeaveCriticalSection(cs_); }
void release() {
if (set_) {
set_ = false;
LeaveCriticalSection(cs_);
}
}
private:
LPCRITICAL_SECTION cs_;
bool set_;
};
// Get string from error status
std::string errno_str2(DWORD status) {
char buf[512];
if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM,
0, status, 0, buf, sizeof(buf), 0))
return std::string(buf);
return std::string("internal proactor error");
}
std::string errno_str(const std::string& msg, bool is_wsa) {
DWORD e = is_wsa ? WSAGetLastError() : GetLastError();
return msg + ": " + errno_str2(e);
}
using namespace pn_experimental;
static int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
{
struct addrinfo hints = { 0 };
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
return getaddrinfo(host, port, &hints, res) ? WSAGetLastError() : 0;
}
const char *COND_NAME = "proactor";
PN_HANDLE(PN_PROACTOR)
// The number of times a connection event batch may be replenished for
// a thread between calls to wait().
// TODO: consider some instrumentation to determine an optimal number
// or switch to cpu time based limit.
#define HOG_MAX 3
/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
Class definitions are for identification as pn_event_t context only.
*/
PN_STRUCT_CLASSDEF(pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener)
/* Completion serialization context common to connection and listener. */
/* And also the reaper singleton (which has no socket */
typedef enum {
PROACTOR,
PCONNECTION,
LISTENER,
WAKEABLE } pcontext_type_t;
typedef struct pcontext_t {
CRITICAL_SECTION cslock;
pn_proactor_t *proactor; /* Immutable */
void *owner; /* Instance governed by the context */
pcontext_type_t type;
bool working;
bool wake_pending;
int completion_ops; // uncompleted ops that are not socket IO related
struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
bool closing;
// Next 4 are protected by the proactor mutex
struct pcontext_t* next; /* Protected by proactor.mutex */
struct pcontext_t* prev; /* Protected by proactor.mutex */
int disconnect_ops; /* ops remaining before disconnect complete */
bool disconnecting; /* pn_proactor_disconnect */
} pcontext_t;
static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p, void *o) {
memset(ctx, 0, sizeof(*ctx));
InitializeCriticalSectionAndSpinCount(&ctx->cslock, 4000);
ctx->proactor = p;
ctx->owner = o;
ctx->type = t;
}
static void pcontext_finalize(pcontext_t* ctx) {
DeleteCriticalSection(&ctx->cslock);
}
typedef struct psocket_t {
iocpdesc_t *iocpd; /* NULL if reaper, or socket open failure. */
pn_listener_t *listener; /* NULL for a connection socket */
pn_netaddr_t listen_addr; /* Not filled in for connection sockets */
char addr_buf[PN_MAX_ADDR];
const char *host, *port;
bool is_reaper;
} psocket_t;
static void psocket_init(psocket_t* ps, pn_listener_t *listener, bool is_reaper, const char *addr) {
ps->is_reaper = is_reaper;
if (is_reaper) return;
ps->listener = listener;
pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port);
}
struct pn_proactor_t {
pcontext_t context;
CRITICAL_SECTION write_lock;
CRITICAL_SECTION timer_lock;
CRITICAL_SECTION bind_lock;
HANDLE timer_queue;
HANDLE timeout_timer;
iocp_t *iocp;
class reaper *reaper;
pn_collector_t *collector;
pcontext_t *contexts; /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
pn_event_batch_t batch;
size_t disconnects_pending; /* unfinished proactor disconnects*/
// need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
bool need_interrupt;
bool need_inactive;
bool need_timeout;
bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
bool timeout_processed; /* timout event dispatched in the most recent event batch */
bool delayed_interrupt;
bool shutting_down;
};
typedef struct pconnection_t {
psocket_t psocket;
pcontext_t context;
pn_connection_driver_t driver;
std::queue<iocp_result_t *> *completion_queue;
std::queue<iocp_result_t *> *work_queue;
pn_condition_t *disconnect_condition;
pn_event_batch_t batch;
int wake_count;
int hog_count; // thread hogging limiter
bool server; /* accept, not connect */
bool started;
bool connecting;
bool tick_pending;
bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
bool bound;
bool stop_timer_required;
bool can_wake;
HANDLE tick_timer;
struct pn_netaddr_t local, remote; /* Actual addresses */
struct addrinfo *addrinfo; /* Resolved address list */
struct addrinfo *ai; /* Current connect address */
} pconnection_t;
struct pn_listener_t {
psocket_t *psockets; /* Array of listening sockets */
size_t psockets_size;
pcontext_t context;
std::queue<accept_result_t *> *pending_accepts; // sockets awaiting a pn_listener_accept
int pending_events; // number of PN_LISTENER_ACCEPT events to be delivered
pn_condition_t *condition;
pn_collector_t *collector;
pn_event_batch_t batch;
pn_record_t *attachments;
void *listener_context;
size_t backlog;
bool close_dispatched;
};
static bool proactor_remove(pcontext_t *ctx);
static void listener_done(pn_listener_t *l);
static bool proactor_update_batch(pn_proactor_t *p);
static pn_event_batch_t *listener_process(pn_listener_t *l, iocp_result_t *result);
static void recycle_result(accept_result_t *accept_result);
static void proactor_add(pcontext_t *p);
static void release_pending_accepts(pn_listener_t *l);
static void proactor_shutdown(pn_proactor_t *p);
static void post_completion(iocp_t *iocp, ULONG_PTR arg1, void *arg2) {
// Despite the vagueness of the official documentation, the
// following args to Post are passed undisturbed and unvalidated
// to GetQueuedCompletionStatus(). In particular, arg2 does not
// have to be an OVERLAPPED struct and may be NULL.
DWORD nxfer = 0; // We could pass this through too if needed.
PostQueuedCompletionStatus(iocp->completion_port, nxfer, arg1, (LPOVERLAPPED) arg2);
}
static inline bool is_write_result(iocp_result_t *r) {
return r && r->type == IOCP_WRITE;
}
static inline bool is_read_result(iocp_result_t *r) {
return r && r->type == IOCP_READ;
}
static inline bool is_connect_result(iocp_result_t *r) {
return r && r->type == IOCP_CONNECT;
}
// From old io.c
static void pni_configure_sock_2(pn_socket_t sock) {
//
// Disable the Nagle algorithm on TCP connections.
//
int flag = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
//TODO: log/err
}
u_long nonblock = 1;
if (ioctlsocket(sock, FIONBIO, &nonblock)) {
// TODO: log/err
}
}
static LPFN_CONNECTEX lookup_connect_ex2(SOCKET s)
{
GUID guid = WSAID_CONNECTEX;
DWORD bytes = 0;
LPFN_CONNECTEX fn;
WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&fn, sizeof(fn), &bytes, NULL, NULL);
assert(fn);
return fn;
}
// File descriptor wrapper that calls ::close in destructor.
class unique_socket {
public:
unique_socket(pn_socket_t socket) : socket_(socket) {}
~unique_socket() { if (socket_ != INVALID_SOCKET) ::closesocket(socket_); }
operator pn_socket_t() const { return socket_; }
pn_socket_t release() { pn_socket_t ret = socket_; socket_ = INVALID_SOCKET; return ret; }
protected:
pn_socket_t socket_;
};
void do_complete(iocp_result_t *result) {
iocpdesc_t *iocpd = result->iocpd; // connect result gets deleted
switch (result->type) {
case IOCP_ACCEPT:
/* accept is now processed inline to do in parallel, except on teardown */
assert(iocpd->closing);
complete_accept((accept_result_t *) result, result->status); // free's result and retires new_sock
break;
case IOCP_CONNECT:
complete_connect((connect_result_t *) result, result->status);
break;
case IOCP_WRITE:
complete_write((write_result_t *) result, result->num_transferred, result->status);
break;
case IOCP_READ:
complete_read((read_result_t *) result, result->num_transferred, result->status);
break;
default:
assert(false);
}
iocpd->ops_in_progress--; // Set in each begin_xxx call
}
static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
return !(ps->is_reaper || ps->listener) ? (pconnection_t*)ps : NULL;
}
static inline pn_listener_t *as_listener(psocket_t* ps) {
return ps->listener;
}
static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
return c->type == PCONNECTION ?
containerof(c, pconnection_t, context) : NULL;
}
static inline pn_listener_t *pcontext_listener(pcontext_t *c) {
return c->type == LISTENER ?
containerof(c, pn_listener_t, context) : NULL;
}
static pcontext_t *psocket_context(psocket_t *ps) {
if (ps->listener)
return &ps->listener->context;
pconnection_t *pc = as_pconnection_t(ps);
return &pc->context;
}
// Call wih lock held
static inline void wake_complete(pcontext_t *ctx) {
ctx->wake_pending = false;
ctx->completion_ops--;
}
// Call wih lock held
static void wakeup(psocket_t *ps) {
pcontext_t *ctx = psocket_context(ps);
if (!ctx->working && !ctx->wake_pending) {
ctx->wake_pending = true;
ctx->completion_ops++;
post_completion(ctx->proactor->iocp, psocket_wakeup_key, ps);
}
}
// Call wih lock held
static inline void proactor_wake_complete(pn_proactor_t *p) {
wake_complete(&p->context);
}
// Call wih lock held
static void proactor_wake(pn_proactor_t *p) {
if (!p->context.working && !p->context.wake_pending) {
p->context.wake_pending = true;
p->context.completion_ops++;
post_completion(p->iocp, proactor_wake_key, p);
}
}
VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ );
// Serialize handling listener and connection closing IO completions
// after the engine has lost interest. A temporary convenience while
// using the old single threaded iocp/io/select driver code.
class reaper {
public:
reaper(pn_proactor_t *p, CRITICAL_SECTION *wlock, iocp_t *iocp)
: iocp_(iocp), global_wlock_(wlock), timer_(NULL), running(true) {
InitializeCriticalSectionAndSpinCount(&lock_, 4000);
timer_queue_ = CreateTimerQueue();
if (!timer_queue_) {
perror("CreateTimerQueue");
abort();
}
}
// Connection or listener lock must also be held by caller.
bool add(iocpdesc_t *iocpd) {
if (!iocpd) return false;
csguard g(&lock_);
if (iocpd->closing) return false;
bool rval = !iocpd->ops_in_progress;
pni_iocp_begin_close(iocpd); // sets iocpd->closing
pn_decref(iocpd); // may still be ref counted on zombie list
reap_timer();
return rval;
}
// For cases where the close will be immediate. I.E. after a failed
// connection attempt where there is no follow-on IO.
void fast_reap(iocpdesc_t *iocpd) {
assert(iocpd && iocpd->ops_in_progress == 0 && !iocpd->closing);
csguard g(&lock_);
pni_iocp_begin_close(iocpd);
pn_decref(iocpd);
}
bool process(iocp_result_t *result) {
// No queue of completions for the reaper. Just process
// serialized by the lock assuming all actions are "short".
// That may be wrong, and if so the real fix is not a
// consumer/producer setup but just replace the reaper with a
// multi threaded alternative.
csguard g(&lock_);
iocpdesc_t *iocpd = result->iocpd;
if (is_write_result(result)) {
csguard wg(global_wlock_);
do_complete(result);
}
else do_complete(result);
// result may now be NULL
bool rval = (iocpd->ops_in_progress == 0);
pni_iocp_reap_check(iocpd);
return rval;
}
// Called when all competing threads have terminated except our own reap_check timer.
void final_shutdown() {
running = false;
DeleteTimerQueueEx(timer_queue_, INVALID_HANDLE_VALUE);
// No pending or active timers from thread pool remain. Truly single threaded now.
pn_free((void *) iocp_); // calls pni_iocp_finalize(); cleans up all sockets, completions, completion port.
DeleteCriticalSection(&lock_);
}
void reap_check() {
csguard g(&lock_);
DeleteTimerQueueTimer(timer_queue_, timer_, NULL);
timer_ = NULL;
reap_timer();
}
private:
void reap_timer() {
// Call with lock
if (timer_ || !running)
return;
int64_t now = pn_proactor_now_64();
pni_zombie_check(iocp_, now);
int64_t zd = pni_zombie_deadline(iocp_);
if (zd) {
DWORD tm = (zd > now) ? zd - now : 1;
if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, tm,
0, WT_EXECUTEONLYONCE)) {
perror("CreateTimerQueueTimer");
abort();
}
}
}
iocp_t *iocp_;
CRITICAL_SECTION lock_;
CRITICAL_SECTION *global_wlock_;
HANDLE timer_queue_;
HANDLE timer_;
bool running;
};
VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
// queue timer callback
reaper *r = static_cast<reaper *>(arg);
r->reap_check();
}
static void listener_begin_close(pn_listener_t* l);
static void connect_step_done(pconnection_t *pc, connect_result_t *result);
static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch);
static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
return (batch->next_event == proactor_batch_next) ?
containerof(batch, pn_proactor_t, batch) : NULL;
}
static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
return (batch->next_event == listener_batch_next) ?
containerof(batch, pn_listener_t, batch) : NULL;
}
static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
return (batch->next_event == pconnection_batch_next) ?
containerof(batch, pconnection_t, batch) : NULL;
}
static inline bool pconnection_has_event(pconnection_t *pc) {
return pn_connection_driver_has_event(&pc->driver);
}
static inline bool listener_has_event(pn_listener_t *l) {
return pn_collector_peek(l->collector);
}
static inline bool proactor_has_event(pn_proactor_t *p) {
return pn_collector_peek(p->collector);
}
static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
if (ps->is_reaper)
return;
if (!ps->listener) {
pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
pn_connection_driver_bind(driver); /* Bind so errors will be reported */
pni_proactor_set_cond(pn_transport_condition(driver->transport), what, ps->host, ps->port, msg);
pn_connection_driver_close(driver);
} else {
pn_listener_t *l = as_listener(ps);
pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg);
listener_begin_close(l);
}
}
static void psocket_error(psocket_t *ps, int err, const char* what) {
psocket_error_str(ps, errno_str2(err).c_str(), what);
}
// ========================================================================
// pconnection
// ========================================================================
/* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
#define CID_pconnection CID_pn_object
#define pconnection_inspect NULL
#define pconnection_initialize NULL
#define pconnection_hashcode NULL
#define pconnection_compare NULL
static void pconnection_finalize(void *vp_pconnection) {
pconnection_t *pc = (pconnection_t*)vp_pconnection;
pcontext_finalize(&pc->context);
}
static const pn_class_t pconnection_class = PN_CLASS(pconnection);
static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server, const char *addr)
{
if (pn_connection_driver_init(&pc->driver, c, t) != 0) {
free(pc);
return "pn_connection_driver_init failure";
}
{
csguard g(&p->bind_lock);
pn_record_t *r = pn_connection_attachments(pc->driver.connection);
if (pn_record_get(r, PN_PROACTOR)) {
pn_connection_driver_destroy(&pc->driver);
free(pc);
return "pn_connection_t already in use";
}
pn_record_def(r, PN_PROACTOR, &pconnection_class);
pn_record_set(r, PN_PROACTOR, pc);
pc->bound = true;
pc->can_wake = true;
}
pc->completion_queue = new std::queue<iocp_result_t *>();
pc->work_queue = new std::queue<iocp_result_t *>();
pcontext_init(&pc->context, PCONNECTION, p, pc);
psocket_init(&pc->psocket, NULL, false, addr);
pc->batch.next_event = pconnection_batch_next;
if (server) {
pn_transport_set_server(pc->driver.transport);
}
pn_decref(pc); /* Will be deleted when the connection is */
return NULL;
}
// Either stops a timer before firing or returns after the callback has
// completed (in the threadpool thread). Never "in doubt".
static bool stop_timer(HANDLE tqueue, HANDLE *timer) {
if (!*timer) return true;
if (DeleteTimerQueueTimer(tqueue, *timer, INVALID_HANDLE_VALUE)) {
*timer = NULL;
return true;
}
return false; // error
}
static bool start_timer(HANDLE tqueue, HANDLE *timer, WAITORTIMERCALLBACK cb, void *cb_arg, DWORD time) {
if (*timer) {
// TODO: log err
return false;
}
return CreateTimerQueueTimer(timer, tqueue, cb, cb_arg, time, 0, WT_EXECUTEONLYONCE);
}
VOID CALLBACK tick_timer_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
pconnection_t *pc = (pconnection_t *) arg;
csguard g(&pc->context.cslock);
if (pc->psocket.iocpd && !pc->psocket.iocpd->closing) {
pc->tick_pending = true;
wakeup(&pc->psocket);
}
}
// Call with no lock held or stop_timer and callback may deadlock
static void pconnection_tick(pconnection_t *pc) {
pn_transport_t *t = pc->driver.transport;
if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
if(!stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer)) {
// TODO: handle error
}
uint64_t now = pn_proactor_now_64();
uint64_t next = pn_transport_tick(t, now);
if (next) {
if (!start_timer(pc->context.proactor->timer_queue, &pc->tick_timer, tick_timer_cb, pc, next - now)) {
// TODO: handle error
}
}
}
}
static pconnection_t *get_pconnection(pn_connection_t* c) {
if (!c) return NULL;
pn_record_t *r = pn_connection_attachments(c);
return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
}
pn_listener_t *pn_event_listener(pn_event_t *e) {
return (pn_event_class(e) == PN_CLASSCLASS(pn_listener)) ? (pn_listener_t*)pn_event_context(e) : NULL;
}
pn_proactor_t *pn_event_proactor(pn_event_t *e) {
if (pn_event_class(e) == PN_CLASSCLASS(pn_proactor)) return (pn_proactor_t*)pn_event_context(e);
pn_listener_t *l = pn_event_listener(e);
if (l) return l->context.proactor;
pn_connection_t *c = pn_event_connection(e);
if (c) return pn_connection_proactor(pn_event_connection(e));
return NULL;
}
// Call after successful accept
static void set_sock_names(pconnection_t *pc) {
// This works. Note possible use of GetAcceptExSockaddrs()
pn_socket_t sock = pc->psocket.iocpd->socket;
socklen_t len = sizeof(pc->local.ss);
getsockname(sock, (struct sockaddr*)&pc->local.ss, &len);
len = sizeof(pc->remote.ss);
getpeername(sock, (struct sockaddr*)&pc->remote.ss, &len);
}
// Call with lock held when closing and transitioning away from working context
static inline bool pconnection_can_free(pconnection_t *pc) {
return pc->psocket.iocpd == NULL && pc->context.completion_ops == 0
&& !pc->stop_timer_required && !pconnection_has_event(pc) && !pc->queued_disconnect;
}
static void pconnection_final_free(pconnection_t *pc) {
if (pc->addrinfo) {
freeaddrinfo(pc->addrinfo);
}
pn_condition_free(pc->disconnect_condition);
pn_incref(pc); /* Make sure we don't do a circular free */
pn_connection_driver_destroy(&pc->driver);
pn_decref(pc);
/* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
}
// Call with lock held or from forced shutdown
static void pconnection_begin_close(pconnection_t *pc) {
if (!pc->context.closing) {
pc->context.closing = true;
pn_connection_driver_close(&pc->driver);
pc->stop_timer_required = true;
if (pc->context.proactor->reaper->add(pc->psocket.iocpd))
pc->psocket.iocpd = NULL;
wakeup(&pc->psocket);
}
}
// call with lock held. return true if caller must call pconnection_final_free()
static bool pconnection_cleanup(pconnection_t *pc) {
delete pc->completion_queue;
delete pc->work_queue;
return proactor_remove(&pc->context);
}
static inline bool pconnection_work_pending(pconnection_t *pc) {
if (pc->completion_queue->size() || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
return true;
if (!pc->started)
return false;
pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
return (wbuf.size > 0 && (pc->psocket.iocpd->events & PN_WRITABLE));
}
// Return true unless reaped
static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
ssize_t n;
bool wouldblock;
{
csguard g(&pc->context.proactor->write_lock);
n = pni_iocp_begin_write(pc->psocket.iocpd, wbuf.start, wbuf.size, &wouldblock, pc->psocket.iocpd->error);
}
if (n > 0) {
pn_connection_driver_write_done(&pc->driver, n);
} else if (n < 0 && !wouldblock) {
psocket_error(&pc->psocket, WSAGetLastError(), "on write to");
}
else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
if (pc->context.proactor->reaper->add(pc->psocket.iocpd)) {
pc->psocket.iocpd = NULL;
return false;
}
}
return true;
}
// Queue the result (if any) for the one worker thread. Become the worker if possible.
// NULL result is a wakeup or a topup.
// topup signifies that the working thread is the caller looking for additional events.
static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *result, bool topup) {
bool first = true;
bool waking = false;
bool tick_required = false;
bool open = false;
while (true) {
{
csguard g(&pc->context.cslock);
if (first) {
first = false;
if (result)
pc->completion_queue->push(result);
else if (!topup)
wake_complete(&pc->context);
if (!topup) {
if (pc->context.working)
return NULL;
pc->context.working = true;
}
open = pc->started && !pc->connecting && !pc->context.closing;
}
else {
// Just re-acquired lock after processing IO and engine work
if (pconnection_has_event(pc))
return &pc->batch;
if (!pconnection_work_pending(pc)) {
pc->context.working = false;
if (pn_connection_driver_finished(&pc->driver)) {
pconnection_begin_close(pc);
}
if (pc->context.closing && pconnection_can_free(pc)) {
if (pconnection_cleanup(pc)) {
g.release();
pconnection_final_free(pc);
return NULL;
} // else disconnect logic has the free obligation
}
return NULL;
}
}
if (pc->queued_disconnect) { // From pn_proactor_disconnect()
pc->queued_disconnect = false;
if (!pc->context.closing) {
if (pc->disconnect_condition) {
pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition);
}
pn_connection_driver_close(&pc->driver);
}
}
assert(pc->work_queue->empty());
if (pc->completion_queue->size())
std::swap(pc->work_queue, pc->completion_queue);
if (pc->wake_count) {
waking = open && pc->can_wake && !pn_connection_driver_finished(&pc->driver);
pc->wake_count = 0;
}
if (pc->tick_pending) {
pc->tick_pending = false;
if (open)
tick_required = true;
}
}
// No lock
// Drain completions prior to engine work
while (pc->work_queue->size()) {
result = (iocp_result_t *) pc->work_queue->front();
pc->work_queue->pop();
if (result->iocpd->closing) {
if (pc->context.proactor->reaper->process(result)) {
pc->psocket.iocpd = NULL; // reaped
open = false;
}
}
else {
if (is_write_result(result)) {
csguard g(&pc->context.proactor->write_lock);
do_complete(result);
}
else if (is_connect_result(result)) {
connect_step_done(pc, (connect_result_t *) result);
if (pc->psocket.iocpd && (pc->psocket.iocpd->events & PN_WRITABLE)) {
pc->connecting = false;
if (pc->started)
open = true;
}
}
else do_complete(result);
}
}
if (!open) {
if (pc->stop_timer_required) {
pc->stop_timer_required = false;
// Do without context lock to avoid possible deadlock
stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer);
}
} else {
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
/* Ticks and checking buffers can generate events, process before proceeding */
bool ready = pconnection_has_event(pc);
if (waking) {
pn_connection_t *c = pc->driver.connection;
pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
waking = false;
}
if (ready) {
continue;
}
if (wbuf.size >= 16384 && (pc->psocket.iocpd->events & PN_WRITABLE)) {
if (!pconnection_write(pc, wbuf))
continue;
}
if (rbuf.size > 0 && !pc->psocket.iocpd->read_in_progress) {
bool wouldblock;
ssize_t n = pni_iocp_recv(pc->psocket.iocpd, rbuf.start, rbuf.size, &wouldblock, pc->psocket.iocpd->error);
if (n > 0) {
pn_connection_driver_read_done(&pc->driver, n);
pconnection_tick(pc); /* check for tick changes. */
tick_required = false;
}
else if (n == 0)
pn_connection_driver_read_close(&pc->driver);
else if (!wouldblock)
psocket_error(&pc->psocket, WSAGetLastError(), "on read from");
}
if (!pc->psocket.iocpd->read_in_progress)
start_reading(pc->psocket.iocpd);
if (tick_required) {
pconnection_tick(pc); /* check for tick changes. */
tick_required = false;
}
wbuf = pn_connection_driver_write_buffer(&pc->driver);
if (wbuf.size > 0 && (pc->psocket.iocpd->events & PN_WRITABLE)) {
if (!pconnection_write(pc, wbuf))
open = false;
}
}
if (topup)
return NULL; // regardless if new events made available
}
}
static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
pconnection_t *pc = batch_pconnection(batch);
pn_event_t *e = pn_connection_driver_next_event(&pc->driver);
if (!e && ++pc->hog_count < HOG_MAX) {
pconnection_process(pc, NULL, true); // top up
e = pn_connection_driver_next_event(&pc->driver);
}
if (e && !pc->started && pn_event_type(e) == PN_CONNECTION_BOUND)
pc->started = true; // SSL will be set up on return and safe to do IO with correct transport layers
return e;
}
static void pconnection_done(pconnection_t *pc) {
{
csguard g(&pc->context.cslock);
pc->context.working = false;
pc->hog_count = 0;
if (pconnection_has_event(pc) || pconnection_work_pending(pc)) {
wakeup(&pc->psocket);
} else if (pn_connection_driver_finished(&pc->driver)) {
pconnection_begin_close(pc);
wakeup(&pc->psocket);
}
}
}
static inline bool is_inactive(pn_proactor_t *p) {
return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->need_timeout && !p->shutting_down);
}
// Call whenever transitioning from "definitely active" to "maybe inactive"
static void wake_if_inactive(pn_proactor_t *p) {
if (is_inactive(p)) {
p->need_inactive = true;
proactor_wake(p);
}
}
void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
pconnection_t *pc = batch_pconnection(batch);
if (pc) {
pconnection_done(pc);
return;
}
pn_listener_t *l = batch_listener(batch);
if (l) {
listener_done(l);
return;
}
pn_proactor_t *bp = batch_proactor(batch);
if (bp == p) {
csguard g(&p->context.cslock);
p->context.working = false;
if (p->delayed_interrupt) {
p->delayed_interrupt = false;
p->need_interrupt = true;
}
if (p->timeout_processed) {
p->timeout_processed = false;
wake_if_inactive(p);
}
if (proactor_update_batch(p))
proactor_wake(p);
return;
}
}
static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t) {
pn_collector_put(p->collector, PN_CLASSCLASS(pn_proactor), p, t);
}
static pn_event_batch_t *proactor_process(pn_proactor_t *p) {
csguard g(&p->context.cslock);
proactor_wake_complete(p);
if (!p->context.working) { /* Can generate proactor events */
if (proactor_update_batch(p)) {
p->context.working = true;
return &p->batch;
}
}
return NULL;
}
static pn_event_batch_t *psocket_process(psocket_t *ps, iocp_result_t *result, reaper *rpr) {
if (ps) {
pconnection_t *pc = as_pconnection_t(ps);
if (pc) {
return pconnection_process(pc, result, false);
} else {
pn_listener_t *l = as_listener(ps);
if (l)
return listener_process(l, result);
}
}
rpr->process(result);
return NULL;
}
static pn_event_batch_t *proactor_completion_loop(struct pn_proactor_t* p, bool can_block) {
// Proact! Process inbound completions of async activity until one
// of them provides a batch of events.
while(true) {
pn_event_batch_t *batch = NULL;
DWORD win_timeout = can_block ? INFINITE : 0;
DWORD num_xfer = 0;
ULONG_PTR completion_key = 0;
OVERLAPPED *overlapped = 0;
bool good_op = GetQueuedCompletionStatus (p->iocp->completion_port, &num_xfer,
&completion_key, &overlapped, win_timeout);
if (!overlapped && !can_block && GetLastError() == WAIT_TIMEOUT)
return NULL; // valid timeout
if (!good_op && !overlapped) {
// Should never happen. shutdown?
// We aren't expecting a timeout, closed completion port, or other error here.
PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_CRITICAL, "%s", errno_str("Windows Proton proactor internal failure\n", false).c_str());
abort();
}
switch (completion_key) {
case proactor_io: {
// Normal IO case for connections and listeners
iocp_result_t *result = (iocp_result_t *) overlapped;
result->status = good_op ? 0 : GetLastError();
result->num_transferred = num_xfer;
psocket_t *ps = (psocket_t *) result->iocpd->active_completer;
batch = psocket_process(ps, result, p->reaper);
break;
}
// completion_key on our completion port is always null unless set by us
// in PostQueuedCompletionStatus. In which case, we hijack the overlapped
// data structure for our own use.
case psocket_wakeup_key:
batch = psocket_process((psocket_t *) overlapped, NULL, p->reaper);
break;
case proactor_wake_key:
batch = proactor_process((pn_proactor_t *) overlapped);
break;
case recycle_accept_key:
recycle_result((accept_result_t *) overlapped);
break;
default:
break;
}
if (batch) return batch;
// No event generated. Try again with next completion.
}
}
pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
return proactor_completion_loop(p, true);
}
pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
return proactor_completion_loop(p, false);
}
void pn_proactor_interrupt(pn_proactor_t *p) {
csguard g(&p->context.cslock);
if (p->context.working)
p->delayed_interrupt = true;
else
p->need_interrupt = true;
proactor_wake(p);
}
// runs on a threadpool thread. Must not hold timer_lock.
VOID CALLBACK timeout_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
pn_proactor_t *p = (pn_proactor_t *) arg;
csguard gtimer(&p->timer_lock);
csguard g(&p->context.cslock);
if (p->timeout_set)
p->need_timeout = true; // else cancelled
p->timeout_set = false;