blob: 404dd3639ce62148d64fb5024996989672b23605 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif
#if _WIN32_WINNT < 0x0501
#error "Proton requires Windows API support for XP or later."
#endif
#include <winsock2.h>
#include <mswsock.h>
#include <Ws2tcpip.h>
#include "platform.h"
#include <proton/object.h>
#include <proton/io.h>
#include <proton/selector.h>
#include <proton/error.h>
#include <proton/transport.h>
#include "iocp.h"
#include "util.h"
#include <assert.h>
/*
* Windows IO Completion Port support for Proton.
*
* Overlapped writes are used to avoid lengthy stalls between write
* completion and starting a new write. Non-overlapped reads are used
* since Windows accumulates inbound traffic without stalling and
* managing read buffers would not avoid a memory copy at the pn_read
* boundary.
*/
// Max number of overlapped accepts per listener (begin_accept() stops
// allocating new accept results once this many exist).
#define IOCP_MAX_ACCEPTS 10
// AcceptEx squishes the local and remote addresses and optional data
// all together when accepting the connection. Reserve enough for
// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding
// per address is required by AcceptEx.
#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16)
// Room for two such addresses: the local one and the remote one.
#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN)
// Trace helper: printf-style output written to stderr and flushed after
// every message.
static void iocp_log(const char *fmt, ...)
{
  va_list args;
  va_start(args, fmt);
  vfprintf(stderr, fmt, args);
  va_end(args);
  fflush(stderr);
}
// Store the system message text for a Windows status code in a Proton error.
//
// error:  error object to fill in.
// code:   Proton error code recorded alongside the text.
// status: Windows error value whose message is looked up.
//
// FORMAT_MESSAGE_IGNORE_INSERTS is passed because no argument array is
// supplied; without it a system message containing insert sequences would
// make FormatMessage read arguments that do not exist.
// If the lookup itself fails, the reason is printed to stderr and the error
// is still set (with a generic text) so the caller never returns a failure
// with an empty error object.
static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status)
{
  char buf[512];
  const DWORD flags = FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM |
                      FORMAT_MESSAGE_IGNORE_INSERTS;
  if (FormatMessage(flags, 0, status, 0, buf, sizeof(buf), 0)) {
    pn_error_set(error, code, buf);
  } else {
    fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError());
    pn_error_set(error, code, "unknown Windows error");
  }
}
// Forward declarations for helpers that are defined further down in this file.
static void reap_check(iocpdesc_t *);
static void bind_to_completion_port(iocpdesc_t *iocpd);
static void iocp_shutdown(iocpdesc_t *iocpd);
static void start_reading(iocpdesc_t *iocpd);
static bool is_listener(iocpdesc_t *iocpd);
static void release_sys_sendbuf(SOCKET s);
// Put a descriptor into the failed state.
//
// Records status/text in the descriptor's error, optionally traces it, shuts
// the write side down when that is still possible (not a listener, not yet
// write-closed, no writes queued), then marks both directions closed, flags
// poll_error and clears the readable/writable events.
static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text)
{
  pni_win32_error(iocpd->error, text, status);
  if (iocpd->iocp->iocp_trace) {
    iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error));
  }

  const bool can_shutdown = !is_listener(iocpd)
                            && !iocpd->write_closed
                            && !pni_write_pipeline_size(iocpd->pipeline);
  if (can_shutdown) {
    iocp_shutdown(iocpd);
  }

  iocpd->poll_error = true;
  iocpd->read_closed = true;
  iocpd->write_closed = true;
  pni_events_update(iocpd, iocpd->events & ~(PN_READABLE | PN_WRITABLE));
}
// Helper functions to use specialized IOCP AcceptEx() and ConnectEx()

// Ask Winsock for the AcceptEx() extension entry point valid for socket s.
// fn starts out NULL so that a failed WSAIoctl() trips the assert instead of
// returning an indeterminate pointer.
static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s)
{
  GUID guid = WSAID_ACCEPTEX;
  DWORD bytes = 0;
  LPFN_ACCEPTEX fn = NULL;
  WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
           &fn, sizeof(fn), &bytes, NULL, NULL);
  assert(fn);
  return fn;
}
// Ask Winsock for the ConnectEx() extension entry point valid for socket s.
// fn starts out NULL so that a failed WSAIoctl() trips the assert instead of
// returning an indeterminate pointer.
static LPFN_CONNECTEX lookup_connect_ex(SOCKET s)
{
  GUID guid = WSAID_CONNECTEX;
  DWORD bytes = 0;
  LPFN_CONNECTEX fn = NULL;
  WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
           &fn, sizeof(fn), &bytes, NULL, NULL);
  assert(fn);
  return fn;
}
// Ask Winsock for the GetAcceptExSockaddrs() extension entry point valid for
// socket s. fn starts out NULL so that a failed WSAIoctl() trips the assert
// instead of returning an indeterminate pointer.
static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s)
{
  GUID guid = WSAID_GETACCEPTEXSOCKADDRS;
  DWORD bytes = 0;
  LPFN_GETACCEPTEXSOCKADDRS fn = NULL;
  WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
           &fn, sizeof(fn), &bytes, NULL, NULL);
  assert(fn);
  return fn;
}
// Create a fresh stream socket of the same address family as the listener
// and wrap it in a (non-external) descriptor. Returns NULL when the
// listener's local address cannot be queried or the socket cannot be created.
static iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd)
{
  sockaddr_storage local;
  socklen_t local_len = sizeof(local);
  if (getsockname(iocpd->socket, (sockaddr*)&local, &local_len) == -1) {
    return NULL;
  }

  // Currently only work with SOCK_STREAM
  SOCKET fresh = socket(local.ss_family, SOCK_STREAM, 0);
  if (fresh == INVALID_SOCKET) {
    return NULL;
  }
  return pni_iocpdesc_create(iocpd->iocp, fresh, false);
}
// True when iocpd is non-NULL and carries acceptor state.
static bool is_listener(iocpdesc_t *iocpd)
{
  if (!iocpd) {
    return false;
  }
  return iocpd->acceptor ? true : false;
}
// === Async accept processing
// Per-accept bookkeeping; a pointer to it is handed to AcceptEx() as the
// OVERLAPPED argument.
typedef struct {
// Common result header (type, owning descriptor, status, overlapped).
// NOTE(review): the code casts this struct to LPOVERLAPPED, which presumes
// the OVERLAPPED is the first field of iocp_result_t -- confirm in iocp.h.
iocp_result_t base;
// Descriptor of the socket that receives the accepted connection.
iocpdesc_t *new_sock;
// Output buffer that AcceptEx() fills with the local and remote addresses.
char address_buffer[IOCP_SOCKADDRBUFLEN];
// Receives AcceptEx()'s byte count; the value is never read in this file.
DWORD unused;
} accept_result_t;
// Allocate a zero-filled accept result tied to listen_sock.
// Returns NULL when the allocation fails.
static accept_result_t *accept_result(iocpdesc_t *listen_sock) {
  accept_result_t *fresh = (accept_result_t *)calloc(1, sizeof(accept_result_t));
  if (!fresh) {
    return NULL;
  }
  fresh->base.iocpd = listen_sock;
  fresh->base.type = IOCP_ACCEPT;
  return fresh;
}
// Prepare an accept result for reuse: clear its OVERLAPPED and its address
// buffer. The other fields are left as they are.
static void reset_accept_result(accept_result_t *result) {
  memset(result->address_buffer, 0, sizeof(result->address_buffer));
  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
}
// Listener-side state attached to a listening descriptor.
struct pni_acceptor_t {
// Number of accept_result_t objects currently allocated for this listener;
// begin_accept() increments it on allocation and decrements it when it
// frees a result during close.
int accept_queue_size;
// Accept results that have completed (or failed to start) and are waiting
// to be consumed by pni_iocp_end_accept().
pn_list_t *accepts;
// The listening descriptor this acceptor belongs to.
iocpdesc_t *listen_sock;
// Set by begin_accept() once the listener is closing and no accept results
// remain. Not read anywhere in this file.
bool signalled;
// Winsock extension entry points looked up when the acceptor is created.
LPFN_ACCEPTEX fn_accept_ex;
LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs;
};
// Class hooks pni_acceptor does not provide.
// NOTE(review): presumably consumed by PN_CLASS(pni_acceptor) below -- confirm
// against the PN_CLASS definition.
#define pni_acceptor_compare NULL
#define pni_acceptor_inspect NULL
#define pni_acceptor_hashcode NULL
static void pni_acceptor_initialize(void *object)
{
pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
acceptor->accepts = pn_list(PN_VOID, IOCP_MAX_ACCEPTS);
}
static void pni_acceptor_finalize(void *object)
{
pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
size_t len = pn_list_size(acceptor->accepts);
for (size_t i = 0; i < len; i++)
free(pn_list_get(acceptor->accepts, i));
pn_free(acceptor->accepts);
}
// Build the acceptor state for a listening descriptor and cache the
// AcceptEx-related extension entry points for its socket.
static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd)
{
  static const pn_cid_t CID_pni_acceptor = CID_pn_void;
  static const pn_class_t clazz = PN_CLASS(pni_acceptor);
  pni_acceptor_t *self = (pni_acceptor_t *) pn_class_new(&clazz, sizeof(pni_acceptor_t));

  self->signalled = false;
  self->accept_queue_size = 0;
  self->listen_sock = iocpd;

  const pn_socket_t listen_fd = self->listen_sock->socket;
  self->fn_accept_ex = lookup_accept_ex(listen_fd);
  self->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(listen_fd);
  return self;
}
// Post one overlapped AcceptEx() on the acceptor's listening socket.
//
// result == NULL: allocate a new accept result, but only when no accept is
//   still pending (every existing result is parked in the completed list)
//   and fewer than IOCP_MAX_ACCEPTS results exist.
// result != NULL: recycle a result whose previous accept has been consumed.
//
// When the listener is closing nothing is posted: the recycled result (if
// any) is freed, and signalled is set once no results remain.
//
// Failure handling:
//  - allocation failure fails the listener instead of dereferencing NULL;
//  - if no accept socket can be created, the result is freed and uncounted
//    (it is neither pending nor in the completed list, so nothing else would
//    ever release it) and the listener is failed;
//  - if AcceptEx() fails outright, the result is parked in the completed
//    list with its status so pni_iocp_end_accept() can report it.
static void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
{
  iocpdesc_t *listener = acceptor->listen_sock;

  if (listener->closing) {
    if (result) {
      free(result);
      acceptor->accept_queue_size--;
    }
    if (acceptor->accept_queue_size == 0)
      acceptor->signalled = true;
    return;
  }

  if (result) {
    reset_accept_result(result);
  } else {
    if (acceptor->accept_queue_size >= IOCP_MAX_ACCEPTS ||
        pn_list_size(acceptor->accepts) != acceptor->accept_queue_size) {
      // an async accept is still pending or max concurrent accepts already hit
      return;
    }
    result = accept_result(listener);
    if (!result) {
      iocpdesc_fail(listener, ERROR_NOT_ENOUGH_MEMORY, "allocate accept result");
      return;
    }
    acceptor->accept_queue_size++;
  }

  result->new_sock = create_same_type_socket(listener);
  if (!result->new_sock) {
    // Capture the error before free() has a chance to disturb it.
    int err = WSAGetLastError();
    free(result);
    acceptor->accept_queue_size--;
    iocpdesc_fail(listener, err, "create accept socket");
    return;
  }

  // Not yet connected.
  result->new_sock->read_closed = true;
  result->new_sock->write_closed = true;

  bool success = acceptor->fn_accept_ex(listener->socket, result->new_sock->socket,
                                        result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
                                        &result->unused, (LPOVERLAPPED) result);
  if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
    result->base.status = WSAGetLastError();
    pn_list_add(acceptor->accepts, result);
    pni_events_update(listener, listener->events | PN_READABLE);
  } else {
    listener->ops_in_progress++;
    // This socket is equally involved in the async operation.
    result->new_sock->ops_in_progress++;
  }
}
// Completion handler for an overlapped accept.
//
// While the listener is still readable-side open, the result (with its
// status) is queued for pni_iocp_end_accept() and the listener is marked
// readable. Otherwise the accepted socket is closed (unless it already is
// closing), the result is discarded and the listener is checked for reaping.
static void complete_accept(accept_result_t *result, HRESULT status)
{
  iocpdesc_t *listener = result->base.iocpd;
  result->new_sock->ops_in_progress--;

  if (!listener->read_closed) {
    result->base.status = status;
    pn_list_add(listener->acceptor->accepts, result);
    pni_events_update(listener, listener->events | PN_READABLE);
    return;
  }

  if (!result->new_sock->closing) {
    pni_iocp_begin_close(result->new_sock);
  }
  free(result); // discard
  reap_check(listener);
}
// Consume one completed accept from a listening descriptor.
//
// ld:          listening descriptor.
// addr:        optional buffer receiving the peer address.
// addrlen:     in: capacity of addr; out: number of bytes copied.
// would_block: set to true when no completed accept is available.
// error:       receives the error for the "not a listener" and "listener
//              closed" cases.
//
// Returns the accepted socket, or INVALID_SOCKET when nothing is available,
// the listener is unusable, or the accept itself failed (in which case the
// failure is recorded in ld->error).
//
// The stored status is a Win32/WSA error code (0 on success), so it is
// compared against zero. The FAILED() macro only tests the HRESULT sign bit
// and would treat every positive error code as success.
//
// The result object is handed back to begin_accept() for reuse before
// returning.
pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error)
{
  if (!is_listener(ld)) {
    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
    return INVALID_SOCKET;
  }
  if (ld->read_closed) {
    set_iocp_error_status(error, PN_ERR, WSAENOTSOCK);
    return INVALID_SOCKET;
  }
  if (pn_list_size(ld->acceptor->accepts) == 0) {
    if (ld->events & PN_READABLE && ld->iocp->iocp_trace)
      iocp_log("listen socket readable with no available accept completions\n");
    *would_block = true;
    return INVALID_SOCKET;
  }

  accept_result_t *result = (accept_result_t *) pn_list_get(ld->acceptor->accepts, 0);
  pn_list_del(ld->acceptor->accepts, 0, 1);
  if (!pn_list_size(ld->acceptor->accepts))
    pni_events_update(ld, ld->events & ~PN_READABLE);  // No pending accepts

  pn_socket_t accept_sock;
  if (result->base.status != 0) {
    accept_sock = INVALID_SOCKET;
    pni_win32_error(ld->error, "accept failure", result->base.status);
    if (ld->iocp->iocp_trace)
      iocp_log("%s\n", pn_error_text(ld->error));
    // App never sees this socket so close it here.
    pni_iocp_begin_close(result->new_sock);
  } else {
    accept_sock = result->new_sock->socket;
    // AcceptEx special setsockopt:
    setsockopt(accept_sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&ld->socket,
               sizeof (SOCKET));
    if (addr && addrlen && *addrlen > 0) {
      sockaddr_storage *local_addr = NULL;
      sockaddr_storage *remote_addr = NULL;
      int local_addrlen = 0, remote_addrlen = 0;
      LPFN_GETACCEPTEXSOCKADDRS fn = ld->acceptor->fn_get_accept_ex_sockaddrs;
      fn(result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
         (SOCKADDR **) &local_addr, &local_addrlen, (SOCKADDR **) &remote_addr,
         &remote_addrlen);
      *addrlen = pn_min(*addrlen, remote_addrlen);
      memmove(addr, remote_addr, *addrlen);
    }
  }

  if (accept_sock != INVALID_SOCKET) {
    // Connected.
    result->new_sock->read_closed = false;
    result->new_sock->write_closed = false;
  }

  // Done with the completion result, so reuse it
  result->new_sock = NULL;
  begin_accept(ld->acceptor, result);
  return accept_sock;
}
// === Async connect processing
// Per-connect bookkeeping; a pointer to it is handed to ConnectEx() as the
// OVERLAPPED argument.
typedef struct {
// Common result header (type, owning descriptor, overlapped).
iocp_result_t base;
// Not referenced by any code visible in this file.
char address_buffer[IOCP_SOCKADDRBUFLEN];
// Target address list; owned by this result and released by
// connect_result_finalize().
struct addrinfo *addrinfo;
} connect_result_t;
// Class hooks connect_result does not provide.
// NOTE(review): presumably consumed by PN_CLASS(connect_result) below --
// confirm against the PN_CLASS definition.
#define connect_result_initialize NULL
#define connect_result_compare NULL
#define connect_result_inspect NULL
#define connect_result_hashcode NULL
static void connect_result_finalize(void *object)
{
connect_result_t *result = (connect_result_t *) object;
// Do not release addrinfo until ConnectEx completes
if (result->addrinfo)
freeaddrinfo(result->addrinfo);
}
// Create a zero-filled connect result for iocpd that takes ownership of
// addr. Returns NULL when the object cannot be allocated.
static connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) {
  static const pn_cid_t CID_connect_result = CID_pn_void;
  static const pn_class_t clazz = PN_CLASS(connect_result);
  connect_result_t *fresh = (connect_result_t *) pn_class_new(&clazz, sizeof(connect_result_t));
  if (!fresh) {
    return NULL;
  }
  memset(fresh, 0, sizeof(connect_result_t));
  fresh->addrinfo = addr;
  fresh->base.iocpd = iocpd;
  fresh->base.type = IOCP_CONNECT;
  return fresh;
}
// Start an overlapped connect of sock to addr.
//
// addr lives for the duration of the async connect; the caller passes
// ownership here (see connect_result_finalize()).
//
// Returns sock when the connect was started, or INVALID_SOCKET with error
// filled in when the preliminary bind or the ConnectEx() call fails.
pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error)
{
  // Use of Windows-specific ConnectEx() requires our socket to be "loosely"
  // pre-bound: bind to an all-zero address of the target's family.
  sockaddr_storage any_addr;
  memset(&any_addr, 0, sizeof(any_addr));
  any_addr.ss_family = addr->ai_family;
  if (bind(sock, (SOCKADDR *) &any_addr, addr->ai_addrlen)) {
    pni_win32_error(error, "begin async connection", WSAGetLastError());
    if (iocp->iocp_trace) {
      iocp_log("%s\n", pn_error_text(error));
    }
    closesocket(sock);
    freeaddrinfo(addr);
    return INVALID_SOCKET;
  }

  iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock, false);
  bind_to_completion_port(iocpd);
  LPFN_CONNECTEX connect_ex = lookup_connect_ex(iocpd->socket);
  connect_result_t *result = connect_result(iocpd, addr);

  DWORD unused;
  bool started = connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen,
                            NULL, 0, &unused, (LPOVERLAPPED) result);
  if (started || WSAGetLastError() == ERROR_IO_PENDING) {
    iocpd->ops_in_progress++;
    return sock;
  }

  pni_win32_error(error, "ConnectEx failure", WSAGetLastError());
  pn_free(result);
  iocpd->write_closed = true;
  iocpd->read_closed = true;
  pni_iocp_begin_close(iocpd);
  if (iocp->iocp_trace) {
    iocp_log("%s\n", pn_error_text(error));
  }
  return INVALID_SOCKET;
}
// Completion handler for an overlapped connect.
//
// status is a Win32 error code (0 on success), so it is compared against
// zero. The FAILED() macro only tests the HRESULT sign bit and would let
// every positive error code through as a success.
//
// On success the system send buffer is released, the connect context is
// updated, the descriptor becomes writable and reading is started. The
// result object is released on every path.
static void complete_connect(connect_result_t *result, HRESULT status)
{
  iocpdesc_t *iocpd = result->base.iocpd;
  if (iocpd->closing) {
    pn_free(result);
    reap_check(iocpd);
    return;
  }

  if (status != 0) {
    iocpdesc_fail(iocpd, status, "Connect failure");
  } else {
    release_sys_sendbuf(iocpd->socket);
    if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) {
      iocpdesc_fail(iocpd, WSAGetLastError(), "Connect failure (update context)");
    } else {
      pni_events_update(iocpd, PN_WRITABLE);
      start_reading(iocpd);
    }
  }
  pn_free(result);
  return;
}
// === Async writes
// True while the descriptor's write pipeline reports a non-zero size.
static bool write_in_progress(iocpdesc_t *iocpd)
{
  if (pni_write_pipeline_size(iocpd->pipeline) != 0) {
    return true;
  }
  return false;
}
// Allocate a zero-filled write result for iocpd whose buffer refers to
// buf/buflen. Returns NULL when the allocation fails.
write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen)
{
  write_result_t *fresh = (write_result_t *) calloc(sizeof(write_result_t), 1);
  if (!fresh) {
    return NULL;
  }
  fresh->buffer.size = buflen;
  fresh->buffer.start = buf;
  fresh->base.iocpd = iocpd;
  fresh->base.type = IOCP_WRITE;
  return fresh;
}
// Issue one overlapped WSASend() of len bytes at buf, using result as the
// OVERLAPPED. Returns WSASend()'s return value unchanged.
static int submit_write(write_result_t *result, const void *buf, size_t len)
{
  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));

  WSABUF chunk;
  chunk.len = len;
  chunk.buf = (char *) buf;

  return WSASend(result->base.iocpd->socket, &chunk, 1, NULL, 0,
                 (LPOVERLAPPED) result, 0);
}
// Queue up to len bytes from buf as overlapped writes.
//
// iocpd:       connected (non-listener) descriptor.
// buf, len:    data to send; a zero len returns 0 immediately without
//              touching would_block.
// would_block: set to true when the descriptor is not currently writable or
//              the pipeline has no free buffers; false otherwise.
// error:       receives the error for the listener/closing/write-closed cases.
//
// Returns the number of bytes copied into pipeline buffers and submitted, or
// SOCKET_ERROR. Data is copied, so buf may be reused as soon as this returns.
//
// Every failure path returns SOCKET_ERROR (the listener path previously
// spelled the same value as INVALID_SOCKET). A second, unreachable
// len == 0 check and an unused local have been removed.
ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error)
{
  if (len == 0) return 0;
  *would_block = false;
  if (is_listener(iocpd)) {
    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
    return SOCKET_ERROR;
  }
  if (iocpd->closing) {
    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
    return SOCKET_ERROR;
  }
  if (iocpd->write_closed) {
    assert(pn_error_code(iocpd->error));
    pn_error_copy(error, iocpd->error);
    if (iocpd->iocp->iocp_trace)
      iocp_log("write error: %s\n", pn_error_text(error));
    return SOCKET_ERROR;
  }
  if (!(iocpd->events & PN_WRITABLE)) {
    *would_block = true;
    return SOCKET_ERROR;
  }

  size_t written = 0;
  const char *outgoing = (const char *) buf;
  size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len);
  if (!available) {
    *would_block = true;
    return SOCKET_ERROR;
  }

  // One overlapped send per reserved pipeline buffer.
  for (size_t wr_count = 0; wr_count < available; wr_count++) {
    write_result_t *result = pni_write_pipeline_next(iocpd->pipeline);
    assert(result);
    result->base.iocpd = iocpd;
    ssize_t actual_len = pn_min(len, result->buffer.size);
    result->requested = actual_len;
    memmove((void *)result->buffer.start, outgoing, actual_len);
    outgoing += actual_len;
    written += actual_len;
    len -= actual_len;

    int werror = submit_write(result, result->buffer.start, actual_len);
    if (werror && WSAGetLastError() != ERROR_IO_PENDING) {
      pni_write_pipeline_return(iocpd->pipeline, result);
      iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send");
      return SOCKET_ERROR;
    }
    iocpd->ops_in_progress++;
  }

  if (!pni_write_pipeline_writable(iocpd->pipeline))
    pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
  return written;
}
// Completion handler for an overlapped write.
//
// The result buffer always goes back to the pipeline first. After that:
//  - closing descriptor: shut the write side down once nothing is queued,
//    then check for reaping;
//  - full transfer: re-enable PN_WRITABLE if the pipeline has room;
//  - partial transfer: treated as fatal;
//  - connection-level errors: close the write side only and record the error;
//  - anything else: fail the whole descriptor.
static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status)
{
  iocpdesc_t *iocpd = result->base.iocpd;
  // Evaluate everything that reads the result before handing it back.
  const bool sent_something = (status == 0 && xfer_count > 0);
  const bool sent_everything = (xfer_count == result->requested);

  pni_write_pipeline_return(iocpd->pipeline, result);

  if (iocpd->closing) {
    if (!iocpd->write_closed && !write_in_progress(iocpd)) {
      iocp_shutdown(iocpd);
    }
    reap_check(iocpd);
    return;
  }

  if (sent_something) {
    if (!sent_everything) {
      // Is this recoverable?  How to preserve order if multiple overlapped writes?
      iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket");
    } else if (pni_write_pipeline_writable(iocpd->pipeline)) {
      // Success.
      pni_events_update(iocpd, iocpd->events | PN_WRITABLE);
    }
    return;
  }

  // Other error
  const bool peer_gone = (status == WSAECONNABORTED || status == WSAECONNRESET ||
                          status == WSAENOTCONN || status == ERROR_NETNAME_DELETED);
  if (!peer_gone) {
    iocpdesc_fail(iocpd, status, "IOCP async write error");
    return;
  }
  iocpd->write_closed = true;
  iocpd->poll_error = true;
  pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
  pni_win32_error(iocpd->error, "Remote close or timeout", status);
}
// === Async reads
// Bookkeeping for the single zero-byte overlapped read of a descriptor.
struct read_result_t {
// Common result header (type, owning descriptor, overlapped).
iocp_result_t base;
// Bytes discarded so far by drain_until_closed().
size_t drain_count;
// Gives the zero-length WSABUF a valid address; no data is stored in it by
// the code in this file.
char unused_buf[1];
};
// Allocate a zero-filled read result for iocpd.
// Returns NULL when the allocation fails.
static read_result_t *read_result(iocpdesc_t *iocpd)
{
  read_result_t *fresh = (read_result_t *) calloc(sizeof(read_result_t), 1);
  if (!fresh) {
    return NULL;
  }
  fresh->base.iocpd = iocpd;
  fresh->base.type = IOCP_READ;
  return fresh;
}
// Post a zero-length overlapped WSARecv() so that a completion is delivered
// when the socket has something to report on the read side.
//
// Does nothing while such a read is already outstanding. If the read side is
// already closed the descriptor is simply marked readable.
static void begin_zero_byte_read(iocpdesc_t *iocpd)
{
  if (iocpd->read_in_progress) {
    return;
  }
  if (iocpd->read_closed) {
    pni_events_update(iocpd, iocpd->events | PN_READABLE);
    return;
  }

  read_result_t *pending = iocpd->read_result;
  memset(&pending->base.overlapped, 0, sizeof (OVERLAPPED));

  WSABUF empty;
  empty.len = 0;
  empty.buf = pending->unused_buf;
  DWORD recv_flags = 0;

  const int rc = WSARecv(iocpd->socket, &empty, 1, NULL, &recv_flags,
                         &pending->base.overlapped, 0);
  const bool queued = (rc == 0) || (WSAGetLastError() == ERROR_IO_PENDING);
  if (!queued) {
    iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error");
    return;
  }
  iocpd->read_in_progress = true;
  iocpd->ops_in_progress++;
}
// On a closing descriptor, read and discard inbound data while waiting for
// the peer's end-of-stream.
//
// Returns early when end-of-stream is seen (read side marked closed) or when
// no data is available yet (another zero-byte read is posted). After 16 KiB
// of discarded data, or on any other receive error, the wait is abandoned
// and the socket is closed outright.
static void drain_until_closed(iocpdesc_t *iocpd) {
  const size_t max_drain = 16 * 1024;
  char scratch[512];
  read_result_t *result = iocpd->read_result;

  while (result->drain_count < max_drain) {
    const int got = recv(iocpd->socket, scratch, 512, 0);
    if (got > 0) {
      result->drain_count += got;
      continue;
    }
    if (got == 0) {
      iocpd->read_closed = true;
      return;
    }
    if (WSAGetLastError() != WSAEWOULDBLOCK) {
      break;
    }
    // wait a little longer
    start_reading(iocpd);
    return;
  }

  // Graceful close indication unlikely, force the issue
  if (iocpd->iocp->iocp_trace) {
    if (result->drain_count >= max_drain) {
      iocp_log("graceful close on reader abandoned (too many chars)\n");
    } else {
      iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError());
    }
  }
  iocpd->read_closed = true;
  closesocket(iocpd->socket);
  iocpd->socket = INVALID_SOCKET;
}
// Completion handler for the zero-byte overlapped read.
//
// Closing descriptor: keep draining towards end-of-stream if the read side
// is still open, then check for reaping. Otherwise a clean zero-byte
// completion marks the descriptor readable and anything else fails it.
static void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status)
{
  iocpdesc_t *iocpd = result->base.iocpd;
  iocpd->read_in_progress = false;

  if (iocpd->closing) {
    // Application no longer reading, but we are looking for a zero length read
    if (!iocpd->read_closed) {
      drain_until_closed(iocpd);
    }
    reap_check(iocpd);
    return;
  }

  if (status != 0 || xfer_count != 0) {
    iocpdesc_fail(iocpd, status, "IOCP read complete error");
    return;
  }
  // Success.
  pni_events_update(iocpd, iocpd->events | PN_READABLE);
}
// Non-overlapped receive on a connected descriptor.
//
// iocpd:       connected (non-listener) descriptor.
// buf, size:   destination buffer; a zero size returns 0 immediately without
//              touching would_block.
// would_block: set to true when no data is available right now.
// error:       receives the error for every SOCKET_ERROR return other than
//              would-block.
//
// Returns the number of bytes received, 0 at end-of-stream (the read side is
// then marked closed), or SOCKET_ERROR.
//
// The recv() result is kept in an int: it returns -1 on failure, and holding
// it in an unsigned type would turn that into a huge positive "byte count"
// and make the would-block and error handling below unreachable.
ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error)
{
  if (size == 0) return 0;
  *would_block = false;
  if (is_listener(iocpd)) {
    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
    return SOCKET_ERROR;
  }
  if (iocpd->closing) {
    // Previous call to pn_close()
    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
    return SOCKET_ERROR;
  }
  if (iocpd->read_closed) {
    if (pn_error_code(iocpd->error))
      pn_error_copy(error, iocpd->error);
    else
      set_iocp_error_status(error, PN_ERR, WSAENOTCONN);
    return SOCKET_ERROR;
  }

  int count = recv(iocpd->socket, (char *) buf, size, 0);
  if (count > 0) {
    // Data consumed: clear readable and wait for the next read notification.
    pni_events_update(iocpd, iocpd->events & ~PN_READABLE);
    begin_zero_byte_read(iocpd);
    return count;
  } else if (count == 0) {
    iocpd->read_closed = true;
    return 0;
  }

  if (WSAGetLastError() == WSAEWOULDBLOCK)
    *would_block = true;
  else
    set_iocp_error_status(error, PN_ERR, WSAGetLastError());
  return SOCKET_ERROR;
}
// Begin watching the read side of a descriptor; simply forwards to
// begin_zero_byte_read().
static void start_reading(iocpdesc_t *iocpd)
{
begin_zero_byte_read(iocpd);
}
// === The iocp descriptor
static void pni_iocpdesc_initialize(void *object)
{
iocpdesc_t *iocpd = (iocpdesc_t *) object;
memset(iocpd, 0, sizeof(iocpdesc_t));
iocpd->socket = INVALID_SOCKET;
}
// Class finalizer: release everything the descriptor owns.
//
// The write pipeline and the read result are deliberately NOT released (only
// a leak message is logged) while an operation that still refers to them is
// marked as in progress.
static void pni_iocpdesc_finalize(void *object)
{
  iocpdesc_t *iocpd = (iocpdesc_t *) object;
  pn_free(iocpd->acceptor);
  pn_error_free(iocpd->error);

  if (iocpd->pipeline) {
    if (write_in_progress(iocpd)) {
      iocp_log("iocp descriptor write leak\n");
    } else {
      pn_free(iocpd->pipeline);
    }
  }

  if (iocpd->read_in_progress) {
    iocp_log("iocp descriptor read leak\n");
  } else {
    free(iocpd->read_result);
  }
}
static uintptr_t pni_iocpdesc_hashcode(void *object)
{
iocpdesc_t *iocpd = (iocpdesc_t *) object;
return iocpd->socket;
}
// Class hooks pni_iocpdesc does not provide.
// NOTE(review): presumably consumed by PN_CLASS(pni_iocpdesc) below -- confirm
// against the PN_CLASS definition.
#define pni_iocpdesc_compare NULL
#define pni_iocpdesc_inspect NULL
// Reference counted in the iocpdesc map, zombie_list, selector.
//
// Allocate a descriptor object for socket s. Only the socket field is set
// here; everything else is left as the class initializer produced it.
static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
{
  static const pn_cid_t CID_pni_iocpdesc = CID_pn_void;
  static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
  void *raw = pn_class_new(&clazz, sizeof(iocpdesc_t));
  iocpdesc_t *fresh = (iocpdesc_t *) raw;
  assert(fresh);
  fresh->socket = s;
  return fresh;
}
// True when the SO_ACCEPTCONN option can be read from s and reports that the
// socket is listening.
static bool is_listener_socket(pn_socket_t s)
{
  BOOL accepting = false;
  int optlen = sizeof(accepting);
  if (getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *)&accepting, &optlen) != 0) {
    return false;
  }
  return accepting ? true : false;
}
// Create the descriptor for socket s and register it in iocp's socket map.
//
// iocp:     owning IOCP context.
// s:        a valid socket that is not yet present in the map.
// external: stored on the descriptor; elsewhere in this file it marks sockets
//           owned by the application.
//
// A listening socket gets acceptor state; any other socket gets a write
// pipeline and a read result. Returns the new descriptor (or NULL, without
// touching it, should allocation ever return NULL).
//
// The iocp back pointer is now assigned inside the NULL check rather than
// before it.
iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) {
  assert (s != INVALID_SOCKET);
  assert(!pni_iocpdesc_map_get(iocp, s));
  bool listening = is_listener_socket(s);
  iocpdesc_t *iocpd = pni_iocpdesc(s);
  if (iocpd) {
    iocpd->iocp = iocp;
    iocpd->external = external;
    iocpd->error = pn_error();
    if (listening) {
      iocpd->acceptor = pni_acceptor(iocpd);
    } else {
      iocpd->pipeline = pni_write_pipeline(iocpd);
      iocpd->read_result = read_result(iocpd);
    }
    pni_iocpdesc_map_push(iocpd);
  }
  return iocpd;
}
// Create a descriptor that carries no socket and only serves selector
// deadlines. It is not added to the iocpdesc map or the zombie list; the
// selector is responsible for freeing/decref'ing it.
iocpdesc_t *pni_deadline_desc(iocp_t *iocp) {
  iocpdesc_t *deadline = pni_iocpdesc(PN_INVALID_SOCKET);
  deadline->deadline_desc = true;
  deadline->iocp = iocp;
  return deadline;
}
// === Fast lookup of a socket's iocpdesc_t
// Look up the descriptor registered for socket s; the result is whatever
// pn_hash_get() returns for that key.
iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) {
  return (iocpdesc_t *) pn_hash_get(iocp->iocpdesc_map, s);
}
// Register a descriptor in its IOCP's socket map, keyed by socket, and drop
// the caller's reference so that exactly one reference remains afterwards
// (checked by the assert).
void pni_iocpdesc_map_push(iocpdesc_t *iocpd) {
  pn_hash_t *by_socket = iocpd->iocp->iocpdesc_map;
  pn_hash_put(by_socket, iocpd->socket, iocpd);
  pn_decref(iocpd);
  assert(pn_refcount(iocpd) == 1);
}
// Remove the entry for socket s from iocp's socket map. Callers in this file
// note that this may free the descriptor.
void pni_iocpdesc_map_del(iocp_t *iocp, pn_socket_t s) {
pn_hash_del(iocp->iocpdesc_map, (uintptr_t) s);
}
// Associate the descriptor's socket with the IOCP completion port (once).
// The descriptor is failed when there is no completion port or when the
// association is refused.
static void bind_to_completion_port(iocpdesc_t *iocpd)
{
  if (iocpd->bound) {
    return;
  }
  if (!iocpd->iocp->completion_port) {
    iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port.");
    return;
  }
  if (!CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) {
    iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup.");
    return;
  }
  iocpd->bound = true;
}
// Set the socket's send buffer size (SO_SNDBUF) to zero. Failure is only
// detected through the assert.
static void release_sys_sendbuf(SOCKET s)
{
  int zero = 0;
  int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&zero, sizeof(int));
  assert(status == 0);
}
// Put a descriptor into service (no-op when it is already bound to the
// completion port): bind it, then either start accepting (listener) or
// release the send buffer, mark it writable and start reading.
void pni_iocpdesc_start(iocpdesc_t *iocpd)
{
  if (iocpd->bound) {
    return;
  }
  bind_to_completion_port(iocpd);

  if (!is_listener(iocpd)) {
    release_sys_sendbuf(iocpd->socket);
    pni_events_update(iocpd, PN_WRITABLE);
    start_reading(iocpd);
    return;
  }
  begin_accept(iocpd->acceptor, NULL);
}
// Dispatch one dequeued completion to the handler for its operation type.
//
// The owning descriptor's ops_in_progress is decremented first. The status
// passed on is 0 for a successful operation and GetLastError() otherwise.
static void complete(iocp_result_t *result, bool success, DWORD num_transferred) {
  result->iocpd->ops_in_progress--;
  const DWORD status = success ? 0 : GetLastError();

  if (result->type == IOCP_ACCEPT) {
    complete_accept((accept_result_t *) result, status);
  } else if (result->type == IOCP_CONNECT) {
    complete_connect((connect_result_t *) result, status);
  } else if (result->type == IOCP_WRITE) {
    complete_write((write_result_t *) result, num_transferred, status);
  } else if (result->type == IOCP_READ) {
    complete_read((read_result_t *) result, num_transferred, status);
  } else {
    assert(false);
  }
}
// Process every completion that is already queued on the port, without
// waiting; returns as soon as a dequeue yields no OVERLAPPED.
void pni_iocp_drain_completions(iocp_t *iocp)
{
  for (;;) {
    const DWORD no_wait = 0;
    DWORD num_transferred = 0;
    ULONG_PTR completion_key = 0;
    OVERLAPPED *overlapped = 0;
    bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
                                              &completion_key, &overlapped, no_wait);
    if (overlapped == NULL) {
      return;  // timed out
    }
    complete((iocp_result_t *) overlapped, good_op, num_transferred);
  }
}
// Wait for and process at most one completion.
//
// timeout: milliseconds to wait; a negative value waits indefinitely.
// error:   optional; filled in when the wait itself fails.
//
// returns: -1 on error, 0 on timeout, 1 successful completion
int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) {
  const DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout;
  DWORD num_transferred = 0;
  ULONG_PTR completion_key = 0;
  OVERLAPPED *overlapped = 0;
  bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
                                            &completion_key, &overlapped, win_timeout);
  if (!overlapped) {
    if (GetLastError() == WAIT_TIMEOUT) {
      return 0;
    }
    if (error) {
      pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError());
    }
    return -1;
  }

  complete((iocp_result_t *) overlapped, good_op, num_transferred);
  return 1;
}
// === Close (graceful and otherwise)
// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress
// and fully closed.
// Park a closing descriptor on the zombie list until its outstanding async
// operations have completed. A descriptor with nothing outstanding is not
// parked; its socket is closed right away instead.
static void zombie_list_add(iocpdesc_t *iocpd)
{
  assert(iocpd->closing);

  if (iocpd->ops_in_progress) {
    // Allow 2 seconds for graceful shutdown before releasing socket resource.
    iocpd->reap_time = pn_i_now() + 2000;
    pn_list_add(iocpd->iocp->zombie_list, iocpd);
    return;
  }

  // No need to make a zombie.
  if (iocpd->socket == INVALID_SOCKET) {
    return;
  }
  closesocket(iocpd->socket);
  iocpd->socket = INVALID_SOCKET;
  iocpd->read_closed = true;
}
// If the descriptor is closing and has no operations outstanding, close its
// socket (when still open) and take it off the zombie list.
static void reap_check(iocpdesc_t *iocpd)
{
  if (!iocpd->closing || iocpd->ops_in_progress) {
    return;
  }
  if (iocpd->socket != INVALID_SOCKET) {
    closesocket(iocpd->socket);
    iocpd->socket = INVALID_SOCKET;
  }
  pn_list_remove(iocpd->iocp->zombie_list, iocpd);
  // iocpd is decref'ed and possibly released
}
// Reap time of the first descriptor on the zombie list, or 0 when the list
// is empty.
pn_timestamp_t pni_zombie_deadline(iocp_t *iocp)
{
  pn_list_t *zombies = iocp->zombie_list;
  if (!pn_list_size(zombies)) {
    return 0;
  }
  iocpdesc_t *first = (iocpdesc_t *) pn_list_get(zombies, 0);
  return first->reap_time;
}
// Force-close the sockets of zombies whose grace period has run out.
//
// The list is walked from the front and the walk stops at the first entry
// whose reap time is still in the future.
void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now)
{
  pn_list_t *zombies = iocp->zombie_list;
  size_t idx = 0;
  // Look for stale zombies that should have been reaped by "now"
  while (idx < pn_list_size(zombies)) {
    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zombies, idx);
    idx++;
    if (iocpd->reap_time > now) {
      return;
    }
    if (iocpd->socket == INVALID_SOCKET) {
      continue;
    }
    assert(iocpd->ops_in_progress > 0);
    if (iocp->iocp_trace) {
      iocp_log("async close: graceful close timeout exceeded\n");
    }
    closesocket(iocpd->socket);
    iocpd->socket = INVALID_SOCKET;
    iocpd->read_closed = true;
    // outstanding ops should complete immediately now
  }
}
// Give zombies a last chance to finish their async operations at shutdown.
//
// Queued completions are processed first and zombies with nothing
// outstanding are dropped from the list. The function then waits for further
// completions until the list is empty, the wait fails, or the grace period
// ends. The grace period is 2000 ms unless the PN_SHUTDOWN_GRACE environment
// variable holds a value strictly between 0 and 60000.
//
// GetLastError() returns a DWORD; it is cast to int so the argument matches
// the "%d" conversion in the log message.
static void drain_zombie_completions(iocp_t *iocp)
{
  // No more pn_selector_select() from App, but zombies still need care and feeding
  // until their outstanding async actions complete.
  pni_iocp_drain_completions(iocp);

  // Discard any that have no pending async IO
  size_t sz = pn_list_size(iocp->zombie_list);
  for (size_t idx = 0; idx < sz;) {
    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx);
    if (!iocpd->ops_in_progress) {
      pn_list_del(iocp->zombie_list, idx, 1);
      sz--;
    } else {
      idx++;
    }
  }

  unsigned shutdown_grace = 2000;
  char *override = getenv("PN_SHUTDOWN_GRACE");
  if (override) {
    int grace = atoi(override);
    if (grace > 0 && grace < 60000)
      shutdown_grace = (unsigned) grace;
  }
  pn_timestamp_t now = pn_i_now();
  pn_timestamp_t deadline = now + shutdown_grace;

  while (pn_list_size(iocp->zombie_list)) {
    if (now >= deadline)
      break;
    int rv = pni_iocp_wait_one(iocp, deadline - now, NULL);
    if (rv < 0) {
      iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", (int) GetLastError());
      break;
    }
    now = pn_i_now();
  }
  if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace)
    // Should only happen if really slow TCP handshakes, i.e. total network failure
    iocp_log("network failure on Proton shutdown\n");
}
// Empty the socket map at shutdown.
//
// Descriptors flagged external are only removed from the map; those that
// still have operations in progress are additionally collected in the
// returned list so they stay referenced. All other descriptors are closed
// through pni_iocp_begin_close(). Listeners are handled in a first pass,
// queued completions are drained, and everything left is handled in a second
// pass.
//
// Returns the list of external descriptors that still had operations in
// progress; the caller frees it.
//
// NOTE(review): both loops remove the current entry from the map while
// iterating with pn_hash_next() -- confirm pn_hash tolerates that.
static pn_list_t *iocp_map_close_all(iocp_t *iocp)
{
// Zombify stragglers, i.e. no pn_close() from the application.
pn_list_t *externals = pn_list(PN_OBJECT, 0);
for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry;
entry = pn_hash_next(iocp->iocpdesc_map, entry)) {
iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry);
// Just listeners first.
if (is_listener(iocpd)) {
if (iocpd->external) {
// Owned by application, just keep a temporary reference to it.
// iocp_result_t structs must not be free'd until completed or
// the completion port is closed.
if (iocpd->ops_in_progress)
pn_list_add(externals, iocpd);
pni_iocpdesc_map_del(iocp, iocpd->socket);
} else {
// Make it a zombie.
pni_iocp_begin_close(iocpd);
}
}
}
// Process the completions already queued before touching the remaining
// (non-listener) descriptors.
pni_iocp_drain_completions(iocp);
for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry;
entry = pn_hash_next(iocp->iocpdesc_map, entry)) {
iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry);
if (iocpd->external) {
iocpd->read_closed = true; // Do not consume from read side
iocpd->write_closed = true; // Do not shutdown write side
if (iocpd->ops_in_progress)
pn_list_add(externals, iocpd);
pni_iocpdesc_map_del(iocp, iocpd->socket);
} else {
// Make it a zombie.
pni_iocp_begin_close(iocpd);
}
}
return externals;
}
// Forcibly close the socket of every remaining zombie, draining queued
// completions before and after, then report how many zombies (and how many
// outstanding operations) are still left.
static void zombie_list_hard_close_all(iocp_t *iocp)
{
  pni_iocp_drain_completions(iocp);

  size_t count = pn_list_size(iocp->zombie_list);
  for (size_t i = 0; i < count; i++) {
    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
    if (iocpd->socket == INVALID_SOCKET) {
      continue;
    }
    closesocket(iocpd->socket);
    iocpd->socket = INVALID_SOCKET;
    iocpd->read_closed = true;
    iocpd->write_closed = true;
  }

  pni_iocp_drain_completions(iocp);

  // Zombies should be all gone.  Do a sanity check.
  int remaining = 0;
  int ops = 0;
  count = pn_list_size(iocp->zombie_list);
  for (size_t i = 0; i < count; i++) {
    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
    ops += iocpd->ops_in_progress;
    remaining++;
  }
  if (remaining) {
    iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops);
  }
}
// Shut down the sending side of the socket and mark the descriptor
// write-closed. The socket is closed entirely when the read side is already
// closed or the shutdown reports that the connection is gone.
static void iocp_shutdown(iocpdesc_t *iocpd)
{
  bool disconnected = false;
  if (shutdown(iocpd->socket, SD_SEND) != 0) {
    const int err = WSAGetLastError();
    switch (err) {
    case WSAECONNABORTED:
    case WSAECONNRESET:
    case WSAENOTCONN:
      disconnected = true;
      break;
    default:
      if (iocpd->iocp->iocp_trace) {
        iocp_log("socket shutdown failed %d\n", err);
      }
      break;
    }
  }

  iocpd->write_closed = true;
  if (disconnected || iocpd->read_closed) {
    closesocket(iocpd->socket);
    iocpd->socket = INVALID_SOCKET;
  }
}
// Start closing a descriptor (which must not already be closing).
//
// Listener: the socket is closed immediately, which cancels its async
// operations; the pending accepts then complete and the zombie can die once
// all are consumed.
// Other sockets: the write side is shut down if nothing is queued, and async
// operation continues while waiting for graceful close confirmation or
// timeout.
//
// In both cases the descriptor is handed to zombie_list_add() and removed
// from the socket map; the removal may free *iocpd.
void pni_iocp_begin_close(iocpdesc_t *iocpd)
{
  assert (!iocpd->closing);

  const pn_socket_t old_sock = iocpd->socket;
  iocpd->closing = true;

  if (is_listener(iocpd)) {
    iocpd->socket = INVALID_SOCKET;
    iocpd->read_closed = true;
    iocpd->write_closed = true;
    closesocket(old_sock);
  } else if (!iocpd->write_closed && !write_in_progress(iocpd)) {
    iocp_shutdown(iocpd);
  }

  zombie_list_add(iocpd);
  pni_iocpdesc_map_del(iocpd->iocp, old_sock);  // may pn_free *iocpd
}
// === iocp_t
// Class hooks pni_iocp does not provide.
// NOTE(review): presumably consumed by PN_CLASS(pni_iocp) below -- confirm
// against the PN_CLASS definition.
#define pni_iocp_hashcode NULL
#define pni_iocp_compare NULL
#define pni_iocp_inspect NULL
void pni_iocp_initialize(void *obj)
{
iocp_t *iocp = (iocp_t *) obj;
memset(iocp, 0, sizeof(iocp_t));
pni_shared_pool_create(iocp);
iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
assert(iocp->completion_port != NULL);
iocp->iocpdesc_map = pn_hash(PN_OBJECT, 0, 0.75);
iocp->zombie_list = pn_list(PN_OBJECT, 0);
iocp->iocp_trace = pn_env_bool("PN_TRACE_DRV");
iocp->selector = NULL;
}
// Class finalizer for the IOCP context.
//
// Order matters: descriptors are first moved out of the socket map (closed,
// or set aside if external), zombies get a graceful and then a forced close,
// and only after the completion port has been closed are the objects that an
// async operation might still touch released.
//
// pn_list_size() returns a size_t; it is cast to int so the argument matches
// the "%d" conversion in the trace message.
void pni_iocp_finalize(void *obj)
{
  iocp_t *iocp = (iocp_t *) obj;
  // Move sockets to closed state, except external sockets.
  pn_list_t *externals = iocp_map_close_all(iocp);
  // Now everything with ops_in_progress is in the zombie_list or the externals list.
  assert(!pn_hash_head(iocp->iocpdesc_map));
  pn_free(iocp->iocpdesc_map);

  drain_zombie_completions(iocp);    // Last chance for graceful close
  zombie_list_hard_close_all(iocp);
  CloseHandle(iocp->completion_port);  // This cancels all our async ops
  iocp->completion_port = NULL;

  if (pn_list_size(externals) && iocp->iocp_trace)
    iocp_log("%d external sockets not closed and removed from Proton IOCP control\n",
             (int) pn_list_size(externals));

  // Now safe to free everything that might be touched by a former async operation.
  pn_free(externals);
  pn_free(iocp->zombie_list);
  pni_shared_pool_free(iocp);
}
// Create a new IOCP context object of the pni_iocp class.
iocp_t *pni_iocp()
{
  static const pn_cid_t CID_pni_iocp = CID_pn_void;
  static const pn_class_t clazz = PN_CLASS(pni_iocp);
  return (iocp_t *) pn_class_new(&clazz, sizeof(iocp_t));
}