blob: e14e7140b5787160631b8724df50c10923477c60 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/*
* A simple write buffer pool. Each socket has a dedicated "primary"
* buffer and can borrow from a shared pool with limited size tuning.
* Could enhance e.g. with separate pools per network interface and fancier
* memory tuning based on interface speed, system resources, and
* number of connections, etc.
*/
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif
#if _WIN32_WINNT < 0x0501
#error "Proton requires Windows API support for XP or later."
#endif
#include <winsock2.h>
#include <Ws2tcpip.h>
#include "platform.h"
#include <proton/object.h>
#include <proton/io.h>
#include <proton/selector.h>
#include <proton/error.h>
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include "selectable.h"
#include "util.h"
#include "iocp.h"
// Max overlapped writes per socket
#define IOCP_MAX_OWRITES 16
// Write buffer size
#define IOCP_WBUFSIZE 16384
// Best-effort diagnostic logging: printf-style message to stderr, flushed
// immediately so it survives an abrupt process exit.
static void pipeline_log(const char *fmt, ...)
{
  va_list args;
  va_start(args, fmt);
  vfprintf(stderr, fmt, args);
  va_end(args);
  fflush(stderr);
}
// Allocate the shared write-buffer pool, honoring the internal debugging
// environment overrides.  On any allocation failure the pool is cleanly
// disabled (size 0); sockets then fall back to their per-socket primary
// buffers only.
void pni_shared_pool_create(iocp_t *iocp)
{
  // TODO: more pools (or larger one) when using multiple non-loopback interfaces
  iocp->shared_pool_size = 16;
  char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging
  if (env) {
    int sz = atoi(env);
    if (sz >= 0 && sz < 256) {
      iocp->shared_pool_size = sz;
    }
  }
  iocp->loopback_bufsize = 0;
  env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging
  if (env) {
    int sz = atoi(env);
    if (sz >= 0 && sz <= 128 * 1024) {
      iocp->loopback_bufsize = sz;
    }
  }
  if (iocp->shared_pool_size) {
    iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
    if (!iocp->shared_pool_memory) {
      // VirtualAlloc reports errors via GetLastError, not errno, so perror
      // would have printed an unrelated message here.
      fprintf(stderr, "Proton write buffer pool allocation failure: %lu\n",
              (unsigned long) GetLastError());
      iocp->shared_pool_size = 0;
      iocp->shared_available_count = 0;
      return;
    }
    iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
    iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
    if (!iocp->shared_results || !iocp->available_results) {
      // Roll back completely so later teardown never sees a half-built pool.
      free(iocp->shared_results);
      free(iocp->available_results);
      iocp->shared_results = NULL;
      iocp->available_results = NULL;
      VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE);
      iocp->shared_pool_memory = NULL;
      fprintf(stderr, "Proton write buffer pool allocation failure\n");
      iocp->shared_pool_size = 0;
      iocp->shared_available_count = 0;
      return;
    }
    iocp->shared_available_count = iocp->shared_pool_size;
    char *mem = iocp->shared_pool_memory;
    for (int i = 0; i < iocp->shared_pool_size; i++) {
      iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE);
      mem += IOCP_WBUFSIZE;
    }
  }
}
// Tear down the shared pool.  Buffers still marked in_use are deliberately
// leaked (an outstanding overlapped write may still reference them) and the
// leak is logged instead of freed.
void pni_shared_pool_free(iocp_t *iocp)
{
  int i = 0;
  while (i < iocp->shared_pool_size) {
    write_result_t *r = iocp->shared_results[i++];
    if (!r->in_use)
      free(r);
    else
      pipeline_log("Proton buffer pool leak\n");
  }
  if (iocp->shared_pool_size != 0) {
    free(iocp->shared_results);
    free(iocp->available_results);
    if (iocp->shared_pool_memory != NULL) {
      if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE))
        perror("write buffers release failed");
      iocp->shared_pool_memory = NULL;
    }
  }
}
// Return a shared buffer to the free stack so any writer can reuse it.
static void shared_pool_push(write_result_t *result)
{
  iocp_t *iocp = result->base.iocpd->iocp;
  assert(iocp->shared_available_count < iocp->shared_pool_size);
  iocp->available_results[iocp->shared_available_count] = result;
  iocp->shared_available_count++;
}
// Take a buffer from the shared free stack, or NULL when it is exhausted.
static write_result_t *shared_pool_pop(iocp_t *iocp)
{
  if (iocp->shared_available_count == 0)
    return NULL;
  iocp->shared_available_count--;
  return iocp->available_results[iocp->shared_available_count];
}
// Per-socket pipeline of overlapped write buffers.
struct write_pipeline_t {
  iocpdesc_t *iocpd;          // owning socket descriptor
  size_t pending_count;       // buffers currently handed out (writes in flight)
  write_result_t *primary;    // this socket's dedicated buffer, never pooled
  size_t reserved_count;      // buffers promised by the last reserve() call
  size_t next_primary_index;  // slot at which next() hands out the primary
  size_t depth;               // max concurrent writes; 0 = not yet determined
  bool is_writer;             // counted in iocp->writer_count for fair pool sharing
};
// Class hooks consumed by name in the PN_CLASS(write_pipeline) declaration
// below; NULL presumably marks them as unimplemented for this class.
#define write_pipeline_compare NULL
#define write_pipeline_inspect NULL
#define write_pipeline_hashcode NULL
static void write_pipeline_initialize(void *object)
{
write_pipeline_t *pl = (write_pipeline_t *) object;
pl->pending_count = 0;
const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE);
pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE);
pl->depth = 0;
pl->is_writer = false;
}
// pn_class_new() instance finalizer: release the primary buffer and its
// write_result wrapper.
static void write_pipeline_finalize(void *object)
{
  write_pipeline_t *pl = (write_pipeline_t *) object;
  void *pribuf = (void *) pl->primary->buffer.start;
  free(pribuf);
  free(pl->primary);
}
// Create a write pipeline bound to the given socket descriptor.
// PN_CLASS(write_pipeline) wires up write_pipeline_initialize/_finalize and
// the compare/inspect/hashcode hooks by token pasting on the class name.
write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd)
{
  static const pn_cid_t CID_write_pipeline = CID_pn_void;  // class id referenced by PN_CLASS
  static const pn_class_t clazz = PN_CLASS(write_pipeline);
  write_pipeline_t *pipeline = (write_pipeline_t *) pn_class_new(&clazz, sizeof(write_pipeline_t));
  pipeline->iocpd = iocpd;
  // The primary result reports its completions against this socket.
  pipeline->primary->base.iocpd = iocpd;
  return pipeline;
}
// Count this pipeline as an active writer (idempotent).  writer_count is
// used by pni_write_pipeline_reserve() to share the pool fairly.
static void confirm_as_writer(write_pipeline_t *pl)
{
  if (pl->is_writer)
    return;
  pl->is_writer = true;
  pl->iocpd->iocp->writer_count++;
}
// Undo confirm_as_writer() once the pipeline has no pending writes.
static void remove_as_writer(write_pipeline_t *pl)
{
  if (pl->is_writer) {
    iocp_t *iocp = pl->iocpd->iocp;
    assert(iocp->writer_count);
    iocp->writer_count--;
    pl->is_writer = false;
  }
}
/*
* Optimal depth will depend on properties of the NIC, server, and driver. For now,
* just distinguish between loopback interfaces and the rest. Optimizations in the
* loopback stack allow decent performance with depth 1 and actually cause major
* performance hiccups if set to large values.
*/
static void set_depth(write_pipeline_t *pl)
{
  // Default: single outstanding write (best for loopback, see comment above).
  pl->depth = 1;
  sockaddr_storage sa;
  socklen_t salen = sizeof(sa);
  char buf[INET6_ADDRSTRLEN];
  DWORD buflen = sizeof(buf);
  // Resolve the local address in numeric form to recognize loopback; on any
  // lookup failure the conservative depth of 1 is kept.
  if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 &&
      getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) {
    // strcmp/strncmp return non-zero when the address differs from ::1 /
    // 127.*, i.e. when the socket is NOT bound to a loopback address.
    if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) ||
        (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) {
      // not loopback
      pl->depth = IOCP_MAX_OWRITES;
    } else {
      iocp_t *iocp = pl->iocpd->iocp;
      if (iocp->loopback_bufsize) {
        // Loopback with PNI_LB_BUFSIZE override: resize the primary buffer.
        // On realloc failure the original buffer remains valid and is kept.
        const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize);
        if (p) {
          pl->primary->buffer.start = p;
          pl->primary->buffer.size = iocp->loopback_bufsize;
        }
      }
    }
  }
}
// Reserve as many buffers as possible for count bytes.
// Returns the number of buffers newly reserved (0 means the caller should
// treat the socket as wouldblock).  Also schedules the slot at which
// pni_write_pipeline_next() will hand out the primary buffer.
size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count)
{
  if (pl->primary->in_use)
    return 0; // I.e. io->wouldblock
  if (!pl->depth)
    set_depth(pl);   // lazy: socket must be bound before the address check
  if (pl->depth == 1) {
    // always use the primary
    pl->reserved_count = 1;
    pl->next_primary_index = 0;
    return 1;
  }
  iocp_t *iocp = pl->iocpd->iocp;
  confirm_as_writer(pl);  // guarantees writers >= 1 for the division below
  // Buffers needed for count bytes, rounding up to whole IOCP_WBUFSIZE units.
  size_t wanted = (count / IOCP_WBUFSIZE);
  if (count % IOCP_WBUFSIZE)
    wanted++;
  size_t pending = pl->pending_count;
  assert(pending < pl->depth);
  size_t bufs = pn_min(wanted, pl->depth - pending);
  // Can draw from shared pool or the primary... but share with others.
  size_t writers = iocp->writer_count;
  // This writer's fair share of the shared pool, rounded up.
  size_t shared_count = (iocp->shared_available_count + writers - 1) / writers;
  bufs = pn_min(bufs, shared_count + 1);
  pl->reserved_count = pending + bufs;
  if (bufs == wanted &&
      pl->reserved_count < (pl->depth / 2) &&
      iocp->shared_available_count > (2 * writers + bufs)) {
    // No shortage: keep the primary as spare for future use
    pl->next_primary_index = pl->reserved_count;
  } else if (bufs == 1) {
    pl->next_primary_index = pending;
  } else {
    // let approx 1/3 drain before replenishing
    pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1;
    if (pl->next_primary_index < pending)
      pl->next_primary_index = pending;
  }
  return bufs;
}
// Hand out the next reserved buffer, or NULL when the reservation is used
// up: the primary at its scheduled slot, otherwise one from the shared pool.
write_result_t *pni_write_pipeline_next(write_pipeline_t *pl)
{
  size_t issued = pl->pending_count;
  if (issued >= pl->reserved_count)
    return NULL;
  write_result_t *result = NULL;
  if (issued == pl->next_primary_index) {
    result = pl->primary;
  } else {
    assert(pl->iocpd->iocp->shared_available_count > 0);
    result = shared_pool_pop(pl->iocpd->iocp);
  }
  result->in_use = true;
  pl->pending_count++;
  return result;
}
// A write completed: recycle its buffer.  Any outstanding reservation is
// voided; shared buffers go back to the pool, and the last completion drops
// this pipeline from the writer count.
void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result)
{
  pl->reserved_count = 0;
  pl->pending_count--;
  result->in_use = false;
  if (pl->primary != result)
    shared_pool_push(result);
  if (pl->pending_count == 0)
    remove_as_writer(pl);
}
// Writable only while below depth and the primary buffer is free; the free
// primary guarantees the next reserve() can supply at least one buffer.
bool pni_write_pipeline_writable(write_pipeline_t *pl)
{
  bool below_depth = pl->pending_count < pl->depth;
  bool primary_free = !pl->primary->in_use;
  return below_depth && primary_free;
}
// Number of overlapped writes currently outstanding on this pipeline.
size_t pni_write_pipeline_size(write_pipeline_t *pl)
{
  size_t outstanding = pl->pending_count;
  return outstanding;
}