blob: 0273b2a6b9a685fb8641a3bb4a6d1b7198a646ba [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <assert.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
#include <time.h>
#include <qpid/dispatch/driver.h>
#include <qpid/dispatch/threading.h>
#include "alloc.h"
#include <proton/error.h>
#include <proton/ssl.h>
#include <proton/object.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/log.h>
/* Decls */
#define MAX_HOST 1024
#define MAX_SERV 256
#define ERROR_MAX 128
#define PN_SEL_RD (0x0001)
#define PN_SEL_WR (0x0002)
DEQ_DECLARE(qdpn_listener_t, qdpn_listener_list_t);
DEQ_DECLARE(qdpn_connector_t, qdpn_connector_list_t);
const char *protocol_family_ipv4 = "IPv4";
const char *protocol_family_ipv6 = "IPv6";
const char *AF_INET6_STR = "AF_INET6";
const char *AF_INET_STR = "AF_INET";
struct qdpn_driver_t {
qd_log_source_t *log;
pn_trace_t trace;
sys_mutex_t *lock;
//
// The following values need to be protected by lock from multi-threaded access.
//
qdpn_listener_list_t listeners;
qdpn_connector_list_t connectors;
qdpn_listener_t *listener_next;
qdpn_connector_t *connector_next;
size_t closed_count;
//
// The following values will only be accessed by one thread at a time.
//
size_t capacity;
struct pollfd *fds;
size_t nfds;
int ctrl[2]; //pipe for updating selectable status
pn_timestamp_t wakeup;
};
struct qdpn_listener_t {
DEQ_LINKS(qdpn_listener_t);
qdpn_driver_t *driver;
void *context;
int idx;
int fd;
bool pending;
bool closed;
};
#define PN_NAME_MAX (256)
struct qdpn_connector_t {
DEQ_LINKS(qdpn_connector_t);
qdpn_driver_t *driver;
char name[PN_NAME_MAX];
char hostip[PN_NAME_MAX];
pn_timestamp_t wakeup;
pn_connection_t *connection;
pn_transport_t *transport;
qdpn_listener_t *listener;
void *context;
int idx;
int fd;
int status;
pn_trace_t trace;
bool pending_tick;
bool pending_read;
bool pending_write;
bool socket_error;
bool closed;
bool input_done;
bool output_done;
};
ALLOC_DECLARE(qdpn_listener_t);
ALLOC_DEFINE(qdpn_listener_t);
ALLOC_DECLARE(qdpn_connector_t);
ALLOC_DEFINE(qdpn_connector_t);
/* Impls */
static void pni_fatal(const char *text)
{
fprintf(stderr, "%s\n", text);
exit(1);
}
pn_timestamp_t pn_i_now(void)
{
struct timespec now;
#ifdef CLOCK_MONOTONIC_COARSE
int cid = CLOCK_MONOTONIC_COARSE;
#else
int cid = CLOCK_MONOTONIC;
#endif
if (clock_gettime(cid, &now)) pni_fatal("clock_gettime() failed");
return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000);
}
static bool pni_eq_nocase(const char *a, const char *b)
{
while (*b) {
if (tolower(*a++) != tolower(*b++))
return false;
}
return !(*a);
}
static bool pn_env_bool(const char *name)
{
char *v = getenv(name);
return v && (pni_eq_nocase(v, "true") || pni_eq_nocase(v, "1") ||
pni_eq_nocase(v, "yes") || pni_eq_nocase(v, "on"));
}
#define pn_min(X,Y) ((X) > (Y) ? (Y) : (X))
#define pn_max(X,Y) ((X) < (Y) ? (Y) : (X))
static pn_timestamp_t pn_timestamp_min( pn_timestamp_t a, pn_timestamp_t b )
{
if (a && b) return pn_min(a, b);
if (a) return a;
return b;
}
static void qdpn_log_errno(qdpn_driver_t *d, const char *msg)
{
char ebuf[ERROR_MAX];
strerror_r(errno, ebuf, ERROR_MAX);
qd_log(d->log, QD_LOG_ERROR, "%s: %s", msg, ebuf);
}
// listener
static void qdpn_driver_add_listener(qdpn_driver_t *d, qdpn_listener_t *l)
{
if (!l->driver) return;
sys_mutex_lock(d->lock);
DEQ_INSERT_TAIL(d->listeners, l);
sys_mutex_unlock(d->lock);
l->driver = d;
}
static void qdpn_driver_remove_listener(qdpn_driver_t *d, qdpn_listener_t *l)
{
if (!l->driver) return;
sys_mutex_lock(d->lock);
if (l == d->listener_next)
d->listener_next = DEQ_NEXT(l);
DEQ_REMOVE(d->listeners, l);
sys_mutex_unlock(d->lock);
l->driver = NULL;
}
static int qdpn_create_socket(int af)
{
struct protoent *pe_tcp = getprotobyname("tcp");
if (pe_tcp == NULL)
return -1;
return socket(af, SOCK_STREAM, pe_tcp->p_proto);
}
static void qdpn_configure_sock(qdpn_driver_t *driver, int sock)
{
//
// Set the socket to be non-blocking for asynchronous operation.
//
int flags = fcntl(sock, F_GETFL);
flags |= O_NONBLOCK;
if (fcntl(sock, F_SETFL, flags) < 0)
qdpn_log_errno(driver, "fcntl");
//
// Disable the Nagle algorithm on TCP connections.
//
// Note: It would be more correct for the "level" argument to be SOL_TCP. However, there
// are portability issues with this macro so we use IPPROTO_TCP instead.
//
int tcp_nodelay = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay)) < 0)
qdpn_log_errno(driver, "setsockopt");
}
/**
* Sets the ai_family field on the addrinfo struct based on the passed in NON-NULL protocol_family.
* If the passed in protocol family does not match IPv6, IPv4, the function does not set the ai_family field
*/
static void qd_set_addr_ai_family(qdpn_driver_t *driver, struct addrinfo *addr, const char* protocol_family)
{
if (protocol_family) {
if(strcmp(protocol_family, protocol_family_ipv6) == 0)
addr->ai_family = AF_INET6;
else if(strcmp(protocol_family, protocol_family_ipv4) == 0)
addr->ai_family = AF_INET;
}
}
qdpn_listener_t *qdpn_listener(qdpn_driver_t *driver,
const char *host,
const char *port,
const char *protocol_family,
void* context)
{
if (!driver) return NULL;
struct addrinfo *addr;
int code = getaddrinfo(host, port, NULL, &addr);
if (code) {
qd_log(driver->log, QD_LOG_ERROR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
return 0;
}
// Set the protocol family before creating the socket.
qd_set_addr_ai_family(driver, addr, protocol_family);
int sock = qdpn_create_socket(addr->ai_family);
if (sock < 0) {
qdpn_log_errno(driver, "pn_create_socket");
freeaddrinfo(addr);
return 0;
}
int optval = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
qdpn_log_errno(driver, "setsockopt");
close(sock);
freeaddrinfo(addr);
return 0;
}
if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
qdpn_log_errno(driver, "bind");
freeaddrinfo(addr);
close(sock);
return 0;
}
freeaddrinfo(addr);
if (listen(sock, 50) == -1) {
qdpn_log_errno(driver, "listen");
close(sock);
return 0;
}
qdpn_listener_t *l = qdpn_listener_fd(driver, sock, context);
if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
fprintf(stderr, "Listening on %s:%s\n", host, port);
return l;
}
qdpn_listener_t *qdpn_listener_fd(qdpn_driver_t *driver, int fd, void *context)
{
if (!driver) return NULL;
qdpn_listener_t *l = new_qdpn_listener_t();
if (!l) return NULL;
DEQ_ITEM_INIT(l);
l->driver = driver;
l->idx = 0;
l->pending = false;
l->fd = fd;
l->closed = false;
l->context = context;
qdpn_driver_add_listener(driver, l);
return l;
}
int qdpn_listener_get_fd(qdpn_listener_t *listener)
{
assert(listener);
return listener->fd;
}
qdpn_listener_t *qdpn_listener_head(qdpn_driver_t *driver)
{
if (!driver)
return 0;
qdpn_listener_t *head;
sys_mutex_lock(driver->lock);
head = DEQ_HEAD(driver->listeners);
sys_mutex_unlock(driver->lock);
return head;
}
qdpn_listener_t *qdpn_listener_next(qdpn_listener_t *listener)
{
if (!listener || !listener->driver)
return 0;
qdpn_listener_t *next;
sys_mutex_lock(listener->driver->lock);
next = DEQ_NEXT(listener);
sys_mutex_unlock(listener->driver->lock);
return next;
}
void qdpn_listener_trace(qdpn_listener_t *l, pn_trace_t trace)
{
// XXX
}
void *qdpn_listener_context(qdpn_listener_t *l)
{
return l ? l->context : NULL;
}
void qdpn_listener_set_context(qdpn_listener_t *listener, void *context)
{
assert(listener);
listener->context = context;
}
qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l,
void *policy,
bool (*policy_fn)(void *, const char *name),
bool *counted)
{
if (!l || !l->pending) return NULL;
char name[PN_NAME_MAX];
char host[MAX_HOST];
char serv[MAX_SERV];
char hostip[MAX_HOST];
struct sockaddr_in addr = {0};
addr.sin_family = AF_UNSPEC;
socklen_t addrlen = sizeof(addr);
int sock = accept(l->fd, (struct sockaddr *) &addr, &addrlen);
if (sock < 0) {
qdpn_log_errno(l->driver, "accept");
return 0;
} else {
int code;
if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, host, MAX_HOST, serv, MAX_SERV, 0)) ||
(code = getnameinfo((struct sockaddr *) &addr, addrlen, hostip, MAX_HOST, 0, 0, NI_NUMERICHOST))) {
qd_log(l->driver->log, QD_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(code));
close(sock);
return 0;
} else {
qdpn_configure_sock(l->driver, sock);
snprintf(name, PN_NAME_MAX-1, "%s:%s", host, serv);
}
}
if (policy_fn) {
if (!(*policy_fn)(policy, name)) {
close(sock);
return 0;
} else {
*counted = true;
}
}
if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
fprintf(stderr, "Accepted from %s\n", name);
qdpn_connector_t *c = qdpn_connector_fd(l->driver, sock, NULL);
snprintf(c->name, PN_NAME_MAX, "%s", name);
snprintf(c->hostip, PN_NAME_MAX, "%s", hostip);
c->listener = l;
return c;
}
void qdpn_listener_close(qdpn_listener_t *l)
{
if (!l) return;
if (l->closed) return;
if (close(l->fd) == -1)
perror("close");
l->closed = true;
}
void qdpn_listener_free(qdpn_listener_t *l)
{
if (!l) return;
if (l->driver) qdpn_driver_remove_listener(l->driver, l);
free_qdpn_listener_t(l);
}
// connector
static void qdpn_driver_add_connector(qdpn_driver_t *d, qdpn_connector_t *c)
{
if (!c->driver) return;
sys_mutex_lock(d->lock);
DEQ_INSERT_TAIL(d->connectors, c);
sys_mutex_unlock(d->lock);
c->driver = d;
}
static void qdpn_driver_remove_connector(qdpn_driver_t *d, qdpn_connector_t *c)
{
if (!c->driver) return;
sys_mutex_lock(d->lock);
if (c == d->connector_next) {
d->connector_next = DEQ_NEXT(c);
}
DEQ_REMOVE(d->connectors, c);
c->driver = NULL;
if (c->closed) {
d->closed_count--;
}
sys_mutex_unlock(d->lock);
}
qdpn_connector_t *qdpn_connector(qdpn_driver_t *driver,
const char *host,
const char *port,
const char *protocol_family,
void *context)
{
if (!driver) return NULL;
struct addrinfo *addr;
int code = getaddrinfo(host, port, NULL, &addr);
if (code) {
qd_log(driver->log, QD_LOG_ERROR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
return 0;
}
// Set the protocol family before creating the socket.
qd_set_addr_ai_family(driver, addr, protocol_family);
int sock = qdpn_create_socket(addr->ai_family);
if (sock == PN_INVALID_SOCKET) {
freeaddrinfo(addr);
qdpn_log_errno(driver, "pn_create_socket");
return 0;
}
qdpn_configure_sock(driver, sock);
if (connect(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
if (errno != EINPROGRESS) {
qdpn_log_errno(driver, "connect");
freeaddrinfo(addr);
close(sock);
return 0;
}
}
freeaddrinfo(addr);
qdpn_connector_t *c = qdpn_connector_fd(driver, sock, context);
snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port);
if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
fprintf(stderr, "Connected to %s\n", c->name);
return c;
}
qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, int fd, void *context)
{
if (!driver) return NULL;
qdpn_connector_t *c = new_qdpn_connector_t();
if (!c) return NULL;
DEQ_ITEM_INIT(c);
c->driver = driver;
c->pending_tick = false;
c->pending_read = false;
c->pending_write = false;
c->socket_error = false;
c->name[0] = '\0';
c->idx = 0;
c->fd = fd;
c->status = PN_SEL_RD | PN_SEL_WR;
c->trace = driver->trace;
c->closed = false;
c->wakeup = 0;
c->connection = NULL;
c->transport = pn_transport();
c->input_done = false;
c->output_done = false;
c->context = context;
c->listener = NULL;
qdpn_connector_trace(c, driver->trace);
qdpn_driver_add_connector(driver, c);
return c;
}
int qdpn_connector_get_fd(qdpn_connector_t *connector)
{
assert(connector);
return connector->fd;
}
qdpn_connector_t *qdpn_connector_head(qdpn_driver_t *driver)
{
if (!driver)
return 0;
sys_mutex_lock(driver->lock);
qdpn_connector_t *head = DEQ_HEAD(driver->connectors);
sys_mutex_unlock(driver->lock);
return head;
}
qdpn_connector_t *qdpn_connector_next(qdpn_connector_t *connector)
{
if (!connector || !connector->driver)
return 0;
sys_mutex_lock(connector->driver->lock);
qdpn_connector_t *next = DEQ_NEXT(connector);
sys_mutex_unlock(connector->driver->lock);
return next;
}
void qdpn_connector_trace(qdpn_connector_t *ctor, pn_trace_t trace)
{
if (!ctor) return;
ctor->trace = trace;
if (ctor->transport) pn_transport_trace(ctor->transport, trace);
}
pn_transport_t *qdpn_connector_transport(qdpn_connector_t *ctor)
{
return ctor ? ctor->transport : NULL;
}
void qdpn_connector_set_connection(qdpn_connector_t *ctor, pn_connection_t *connection)
{
if (!ctor) return;
if (ctor->connection) {
pn_class_decref(PN_OBJECT, ctor->connection);
pn_transport_unbind(ctor->transport);
}
ctor->connection = connection;
if (ctor->connection) {
pn_class_incref(PN_OBJECT, ctor->connection);
pn_transport_bind(ctor->transport, connection);
}
if (ctor->transport) pn_transport_trace(ctor->transport, ctor->trace);
}
pn_connection_t *qdpn_connector_connection(qdpn_connector_t *ctor)
{
return ctor ? ctor->connection : NULL;
}
void *qdpn_connector_context(qdpn_connector_t *ctor)
{
return ctor ? ctor->context : NULL;
}
void qdpn_connector_set_context(qdpn_connector_t *ctor, void *context)
{
if (!ctor) return;
ctor->context = context;
}
const char *qdpn_connector_name(const qdpn_connector_t *ctor)
{
if (!ctor) return 0;
return ctor->name;
}
const char *qdpn_connector_hostip(const qdpn_connector_t *ctor)
{
if (!ctor) return 0;
return ctor->hostip;
}
qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *ctor)
{
return ctor ? ctor->listener : NULL;
}
void qdpn_connector_close(qdpn_connector_t *ctor)
{
// XXX: should probably signal engine and callback here
if (!ctor) return;
ctor->status = 0;
if (close(ctor->fd) == -1)
perror("close");
if (!ctor->closed) {
sys_mutex_lock(ctor->driver->lock);
ctor->closed = true;
ctor->driver->closed_count++;
sys_mutex_unlock(ctor->driver->lock);
}
}
bool qdpn_connector_closed(qdpn_connector_t *ctor)
{
return ctor ? ctor->closed : true;
}
bool qdpn_connector_failed(qdpn_connector_t *ctor)
{
return ctor ? ctor->socket_error : true;
}
void qdpn_connector_free(qdpn_connector_t *ctor)
{
if (!ctor) return;
if (ctor->driver) qdpn_driver_remove_connector(ctor->driver, ctor);
pn_transport_unbind(ctor->transport);
pn_transport_free(ctor->transport);
ctor->transport = NULL;
if (ctor->connection) pn_class_decref(PN_OBJECT, ctor->connection);
ctor->connection = NULL;
free_qdpn_connector_t(ctor);
}
void qdpn_connector_activate(qdpn_connector_t *ctor, qdpn_activate_criteria_t crit)
{
switch (crit) {
case QDPN_CONNECTOR_WRITABLE :
ctor->status |= PN_SEL_WR;
break;
case QDPN_CONNECTOR_READABLE :
ctor->status |= PN_SEL_RD;
break;
}
}
void qdpn_activate_all(qdpn_driver_t *d)
{
sys_mutex_lock(d->lock);
qdpn_connector_t *c = DEQ_HEAD(d->connectors);
while (c) {
c->status |= PN_SEL_WR;
c = DEQ_NEXT(c);
}
sys_mutex_unlock(d->lock);
}
bool qdpn_connector_activated(qdpn_connector_t *ctor, qdpn_activate_criteria_t crit)
{
bool result = false;
switch (crit) {
case QDPN_CONNECTOR_WRITABLE :
result = ctor->pending_write;
ctor->pending_write = false;
ctor->status &= ~PN_SEL_WR;
break;
case QDPN_CONNECTOR_READABLE :
result = ctor->pending_read;
ctor->pending_read = false;
ctor->status &= ~PN_SEL_RD;
break;
}
return result;
}
static pn_timestamp_t qdpn_connector_tick(qdpn_connector_t *ctor, pn_timestamp_t now)
{
if (!ctor->transport) return 0;
return pn_transport_tick(ctor->transport, now);
}
void qdpn_connector_process(qdpn_connector_t *c)
{
if (c) {
if (c->closed) return;
pn_transport_t *transport = c->transport;
///
/// Socket read
///
if (!c->input_done) {
ssize_t capacity = pn_transport_capacity(transport);
if (capacity > 0) {
c->status |= PN_SEL_RD;
if (c->pending_read) {
c->pending_read = false;
ssize_t n = recv(c->fd, pn_transport_tail(transport), capacity, 0);
if (n < 0) {
if (errno != EAGAIN) {
perror("read");
c->status &= ~PN_SEL_RD;
c->input_done = true;
pn_transport_close_tail( transport );
}
} else if (n == 0) {
c->status &= ~PN_SEL_RD;
c->input_done = true;
pn_transport_close_tail( transport );
} else {
if (pn_transport_process(transport, (size_t) n) < 0) {
c->status &= ~PN_SEL_RD;
c->input_done = true;
}
}
}
}
capacity = pn_transport_capacity(transport);
if (capacity < 0) {
c->status &= ~PN_SEL_RD;
c->input_done = true;
}
}
///
/// Event wakeup
///
c->wakeup = qdpn_connector_tick(c, pn_i_now());
///
/// Socket write
///
if (!c->output_done) {
ssize_t pending = pn_transport_pending(transport);
if (pending > 0) {
c->status |= PN_SEL_WR;
if (c->pending_write) {
c->pending_write = false;
ssize_t n = send(c->fd, pn_transport_head(transport), pending, MSG_NOSIGNAL);
if (n < 0) {
// XXX
if (errno != EAGAIN) {
perror("send");
c->output_done = true;
c->status &= ~PN_SEL_WR;
pn_transport_close_head( transport );
}
} else if (n) {
pn_transport_pop(transport, (size_t) n);
}
}
} else if (pending == 0) {
c->status &= ~PN_SEL_WR;
} else {
c->output_done = true;
c->status &= ~PN_SEL_WR;
}
} else
c->status &= ~PN_SEL_WR;
// Closed?
if (c->input_done && c->output_done) {
if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
fprintf(stderr, "Closed %s\n", c->name);
}
qdpn_connector_close(c);
}
}
}
// driver
qdpn_driver_t *qdpn_driver()
{
qdpn_driver_t *d = (qdpn_driver_t *) malloc(sizeof(qdpn_driver_t));
if (!d) return NULL;
DEQ_INIT(d->listeners);
DEQ_INIT(d->connectors);
d->log = qd_log_source("DRIVER");
d->lock = sys_mutex();
d->listener_next = NULL;
d->connector_next = NULL;
d->closed_count = 0;
d->capacity = 0;
d->fds = NULL;
d->nfds = 0;
d->ctrl[0] = 0;
d->ctrl[1] = 0;
d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
(pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
(pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
d->wakeup = 0;
// XXX
if (pipe(d->ctrl)) {
perror("Can't create control pipe");
}
return d;
}
void qdpn_driver_trace(qdpn_driver_t *d, pn_trace_t trace)
{
d->trace = trace;
}
void qdpn_driver_free(qdpn_driver_t *d)
{
if (!d) return;
close(d->ctrl[0]);
close(d->ctrl[1]);
while (DEQ_HEAD(d->connectors))
qdpn_connector_free(DEQ_HEAD(d->connectors));
while (DEQ_HEAD(d->listeners))
qdpn_listener_free(DEQ_HEAD(d->listeners));
free(d->fds);
sys_mutex_free(d->lock);
free(d);
}
int qdpn_driver_wakeup(qdpn_driver_t *d)
{
if (d) {
ssize_t count = write(d->ctrl[1], "x", 1);
if (count <= 0) {
return count;
} else {
return 0;
}
} else {
return PN_ARG_ERR;
}
}
static void qdpn_driver_rebuild(qdpn_driver_t *d)
{
sys_mutex_lock(d->lock);
size_t size = DEQ_SIZE(d->listeners) + DEQ_SIZE(d->connectors);
while (d->capacity < size + 1) {
d->capacity = d->capacity ? 2*d->capacity : 16;
d->fds = (struct pollfd *) realloc(d->fds, d->capacity*sizeof(struct pollfd));
}
d->wakeup = 0;
d->nfds = 0;
d->fds[d->nfds].fd = d->ctrl[0];
d->fds[d->nfds].events = POLLIN;
d->fds[d->nfds].revents = 0;
d->nfds++;
qdpn_listener_t *l = DEQ_HEAD(d->listeners);
while (l) {
d->fds[d->nfds].fd = l->fd;
d->fds[d->nfds].events = POLLIN;
d->fds[d->nfds].revents = 0;
l->idx = d->nfds;
d->nfds++;
l = DEQ_NEXT(l);
}
qdpn_connector_t *c = DEQ_HEAD(d->connectors);
while (c) {
if (!c->closed) {
d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup);
d->fds[d->nfds].fd = c->fd;
d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0);
d->fds[d->nfds].revents = 0;
c->idx = d->nfds;
d->nfds++;
}
c = DEQ_NEXT(c);
}
sys_mutex_unlock(d->lock);
}
void qdpn_driver_wait_1(qdpn_driver_t *d)
{
qdpn_driver_rebuild(d);
}
int qdpn_driver_wait_2(qdpn_driver_t *d, int timeout)
{
if (d->wakeup) {
pn_timestamp_t now = pn_i_now();
if (now >= d->wakeup)
timeout = 0;
else
timeout = (timeout < 0) ? d->wakeup-now : pn_min(timeout, d->wakeup - now);
}
int result = poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout);
if (result == -1 && errno != EINTR)
qdpn_log_errno(d, "poll");
return result;
}
int qdpn_driver_wait_3(qdpn_driver_t *d)
{
bool woken = false;
if (d->fds[0].revents & POLLIN) {
woken = true;
//clear the pipe
char buffer[512];
while (read(d->ctrl[0], buffer, 512) == 512);
}
sys_mutex_lock(d->lock);
qdpn_listener_t *l = DEQ_HEAD(d->listeners);
while (l) {
l->pending = (l->idx && d->fds[l->idx].revents & POLLIN);
l = DEQ_NEXT(l);
}
pn_timestamp_t now = pn_i_now();
qdpn_connector_t *c = DEQ_HEAD(d->connectors);
while (c) {
if (c->closed) {
c->pending_read = false;
c->pending_write = false;
c->pending_tick = false;
} else {
int idx = c->idx;
c->pending_read = (idx && d->fds[idx].revents & POLLIN);
c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
c->pending_tick = (c->wakeup && c->wakeup <= now);
if (idx && d->fds[idx].revents & POLLERR)
c->socket_error = true;
else if (idx && (d->fds[idx].revents & POLLHUP)) {
if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
fprintf(stderr, "hangup on connector %s\n", c->name);
}
/* poll() is signalling POLLHUP. to see what happened we need
* to do an actual recv() to get the error code. But we might
* be in a state where we're not interested in input, in that
* case try to get the error code via send() */
if (d->fds[idx].events & POLLIN)
c->pending_read = true;
else if (d->fds[idx].events & POLLOUT)
c->pending_write = true;
} else if (idx && (d->fds[idx].revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP))) {
if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
fprintf(stderr, "Unexpected poll events: %04x on %s\n",
d->fds[idx].revents, c->name);
}
}
}
c = DEQ_NEXT(c);
}
d->listener_next = DEQ_HEAD(d->listeners);
d->connector_next = DEQ_HEAD(d->connectors);
sys_mutex_unlock(d->lock);
return woken ? PN_INTR : 0;
}
//
// XXX - pn_driver_wait has been divided into three internal functions as a
// temporary workaround for a multi-threading problem. A multi-threaded
// application must hold a lock on parts 1 and 3, but not on part 2.
// This temporary change, which is not reflected in the driver's API, allows
// a multi-threaded application to use the three parts separately.
//
// This workaround will eventually be replaced by a more elegant solution
// to the problem.
//
int qdpn_driver_wait(qdpn_driver_t *d, int timeout)
{
qdpn_driver_wait_1(d);
int result = qdpn_driver_wait_2(d, timeout);
if (result == -1)
return errno;
return qdpn_driver_wait_3(d);
}
qdpn_listener_t *qdpn_driver_listener(qdpn_driver_t *d)
{
if (!d) return NULL;
sys_mutex_lock(d->lock);
while (d->listener_next) {
qdpn_listener_t *l = d->listener_next;
d->listener_next = DEQ_NEXT(l);
if (l->pending) {
sys_mutex_unlock(d->lock);
return l;
}
}
sys_mutex_unlock(d->lock);
return NULL;
}
qdpn_connector_t *qdpn_driver_connector(qdpn_driver_t *d)
{
if (!d) return NULL;
sys_mutex_lock(d->lock);
while (d->connector_next) {
qdpn_connector_t *c = d->connector_next;
d->connector_next = DEQ_NEXT(c);
if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->socket_error) {
sys_mutex_unlock(d->lock);
return c;
}
}
sys_mutex_unlock(d->lock);
return NULL;
}