blob: 4a87fd2b03a2fd49d1aea343cb331d495a6eb341 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#define FD_SETSIZE 2048
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif
#if _WIN32_WINNT < 0x0501
#error "Proton requires Windows API support for XP or later."
#endif
#include <winsock2.h>
#include <mswsock.h>
#include <Ws2tcpip.h>
#include "platform.h"
#include <proton/io.h>
#include <proton/object.h>
#include <proton/selector.h>
#include "iocp.h"
#include "util.h"
#include <ctype.h>
#include <errno.h>
#include <stdio.h>
#include <assert.h>
int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
{
// Error code can be from GetLastError or WSAGetLastError,
char err[1024] = {0};
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
}
static void io_log(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fflush(stderr);
}
struct pn_io_t {
char host[NI_MAXHOST];
char serv[NI_MAXSERV];
pn_error_t *error;
bool trace;
bool wouldblock;
iocp_t *iocp;
};
void pn_io_initialize(void *obj)
{
pn_io_t *io = (pn_io_t *) obj;
io->error = pn_error();
io->wouldblock = false;
io->trace = pn_env_bool("PN_TRACE_DRV");
/* Request WinSock 2.2 */
WORD wsa_ver = MAKEWORD(2, 2);
WSADATA unused;
int err = WSAStartup(wsa_ver, &unused);
if (err) {
pni_win32_error(io->error, "WSAStartup", WSAGetLastError());
fprintf(stderr, "Can't load WinSock: %s\n", pn_error_text(io->error));
}
io->iocp = pni_iocp();
}
void pn_io_finalize(void *obj)
{
pn_io_t *io = (pn_io_t *) obj;
pn_error_free(io->error);
pn_free(io->iocp);
WSACleanup();
}
#define pn_io_hashcode NULL
#define pn_io_compare NULL
#define pn_io_inspect
pn_io_t *pn_io(void)
{
static const pn_class_t clazz = PN_CLASS(pn_io);
pn_io_t *io = (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t));
return io;
}
void pn_io_free(pn_io_t *io)
{
pn_free(io);
}
pn_error_t *pn_io_error(pn_io_t *io)
{
assert(io);
return io->error;
}
static void ensure_unique(pn_io_t *io, pn_socket_t new_socket)
{
// A brand new socket can have the same HANDLE value as a previous
// one after a socketclose. If the application closes one itself
// (i.e. not using pn_close), we don't find out about it until here.
iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, new_socket);
if (iocpd) {
if (io->trace)
io_log("Stale external socket reference discarded\n");
// Re-use means former socket instance was closed
assert(iocpd->ops_in_progress == 0);
assert(iocpd->external);
// Clean up the straggler as best we can
pn_socket_t sock = iocpd->socket;
iocpd->socket = INVALID_SOCKET;
pni_iocpdesc_map_del(io->iocp, sock); // may free the iocpdesc_t depending on refcount
}
}
/*
* This heavyweight surrogate pipe could be replaced with a normal Windows pipe
* now that select() is no longer used. If interrupt semantics are all that is
* needed, a simple user space counter and reserved completion status would
* probably suffice.
*/
static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]);
int pn_pipe(pn_io_t *io, pn_socket_t *dest)
{
int n = pni_socket_pair(io, dest);
if (n) {
pni_win32_error(io->error, "pipe", WSAGetLastError());
}
return n;
}
static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
//
// Disable the Nagle algorithm on TCP connections.
//
int flag = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
perror("setsockopt");
}
u_long nonblock = 1;
if (ioctlsocket(sock, FIONBIO, &nonblock)) {
perror("ioctlsocket");
}
}
static inline pn_socket_t pni_create_socket(int domain, int protocol);
static const char *amqp_service(const char *port) {
// Help older Windows to know about amqp[s] ports
if (port) {
if (!strcmp("amqp", port)) return "5672";
if (!strcmp("amqps", port)) return "5671";
}
return port;
}
pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
{
struct addrinfo *addr;
int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
if (code) {
pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
return INVALID_SOCKET;
}
pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
if (sock == INVALID_SOCKET) {
pni_win32_error(io->error, "pni_create_socket", WSAGetLastError());
return INVALID_SOCKET;
}
ensure_unique(io, sock);
bool optval = 1;
if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval,
sizeof(optval)) == -1) {
pni_win32_error(io->error, "setsockopt", WSAGetLastError());
closesocket(sock);
return INVALID_SOCKET;
}
if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
pni_win32_error(io->error, "bind", WSAGetLastError());
freeaddrinfo(addr);
closesocket(sock);
return INVALID_SOCKET;
}
freeaddrinfo(addr);
if (listen(sock, 50) == -1) {
pni_win32_error(io->error, "listen", WSAGetLastError());
closesocket(sock);
return INVALID_SOCKET;
}
if (io->iocp->selector) {
iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
if (!iocpd) {
pn_i_error_from_errno(io->error, "register");
closesocket(sock);
return INVALID_SOCKET;
}
pni_iocpdesc_start(iocpd);
}
return sock;
}
pn_socket_t pn_connect(pn_io_t *io, const char *hostarg, const char *port)
{
// convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
const char *host = strcmp("0.0.0.0", hostarg) ? hostarg : "127.0.0.1";
struct addrinfo *addr;
int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
if (code) {
pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
return INVALID_SOCKET;
}
pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
if (sock == INVALID_SOCKET) {
pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError());
freeaddrinfo(addr);
return INVALID_SOCKET;
}
ensure_unique(io, sock);
pn_configure_sock(io, sock);
if (io->iocp->selector) {
return pni_iocp_begin_connect(io->iocp, sock, addr, io->error);
} else {
if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) {
if (WSAGetLastError() != WSAEWOULDBLOCK) {
pni_win32_error(io->error, "connect", WSAGetLastError());
freeaddrinfo(addr);
closesocket(sock);
return INVALID_SOCKET;
}
}
freeaddrinfo(addr);
return sock;
}
}
pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size)
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock);
pn_socket_t accept_sock;
*name = '\0';
if (listend)
accept_sock = pni_iocp_end_accept(listend, (struct sockaddr *) &addr, &addrlen, &io->wouldblock, io->error);
else {
// User supplied socket
accept_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);
if (accept_sock == INVALID_SOCKET)
pni_win32_error(io->error, "sync accept", WSAGetLastError());
}
if (accept_sock == INVALID_SOCKET)
return accept_sock;
int code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
io->serv, NI_MAXSERV, 0);
if (code)
code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
if (code) {
pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
pn_close(io, accept_sock);
return INVALID_SOCKET;
} else {
pn_configure_sock(io, accept_sock);
snprintf(name, size, "%s:%s", io->host, io->serv);
if (listend) {
pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accept_sock));
}
return accept_sock;
}
}
static inline pn_socket_t pni_create_socket(int domain, int protocol) {
return socket(domain, SOCK_STREAM, protocol);
}
ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) {
ssize_t count;
iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd);
if (iocpd) {
count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error);
} else {
count = send(sockfd, (const char *) buf, len, 0);
io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
}
return count;
}
ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
{
ssize_t count;
iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
if (iocpd) {
count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error);
} else {
count = recv(socket, (char *) buf, size, 0);
io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
}
return count;
}
ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size)
{
// non-socket io is mapped to socket io for now. See pn_pipe()
return pn_send(io, socket, buf, size);
}
ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
{
return pn_recv(io, socket, buf, size);
}
void pn_close(pn_io_t *io, pn_socket_t socket)
{
iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
if (iocpd)
pni_iocp_begin_close(iocpd);
else {
closesocket(socket);
}
}
bool pn_wouldblock(pn_io_t *io)
{
return io->wouldblock;
}
pn_selector_t *pn_io_selector(pn_io_t *io)
{
if (io->iocp->selector == NULL)
io->iocp->selector = pni_selector_create(io->iocp);
return io->iocp->selector;
}
static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock)
{
u_long v = 1;
ioctlsocket (sock, FIONBIO, &v);
ensure_unique(io, sock);
iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
pni_iocpdesc_start(iocpd);
}
static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) {
// no socketpair on windows. provide pipe() semantics using sockets
struct protoent * pe_tcp = getprotobyname("tcp");
if (pe_tcp == NULL) {
perror("getprotobyname");
return -1;
}
SOCKET sock = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto);
if (sock == INVALID_SOCKET) {
perror("socket");
return -1;
}
BOOL b = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) {
perror("setsockopt");
closesocket(sock);
return -1;
}
else {
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET;
addr.sin_port = 0;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
perror("bind");
closesocket(sock);
return -1;
}
}
if (listen(sock, 50) == -1) {
perror("listen");
closesocket(sock);
return -1;
}
if ((sv[1] = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto)) == INVALID_SOCKET) {
perror("sock1");
closesocket(sock);
return -1;
}
else {
struct sockaddr addr = {0};
int l = sizeof(addr);
if (getsockname(sock, &addr, &l) == -1) {
perror("getsockname");
closesocket(sock);
return -1;
}
if (connect(sv[1], &addr, sizeof(addr)) == -1) {
int err = WSAGetLastError();
fprintf(stderr, "connect wsaerrr %d\n", err);
closesocket(sock);
closesocket(sv[1]);
return -1;
}
if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) {
perror("accept");
closesocket(sock);
closesocket(sv[1]);
return -1;
}
}
configure_pipe_socket(io, sv[0]);
configure_pipe_socket(io, sv[1]);
closesocket(sock);
return 0;
}