blob: 35afd5c1fd4034b0c1e2851168cce72dadbf632f [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <uv.h>
#include <proton/condition.h>
#include <proton/connection_driver.h>
#include <proton/engine.h>
#include <proton/extra.h>
#include <proton/message.h>
#include <proton/object.h>
#include <proton/proactor.h>
#include <proton/transport.h>
#include <proton/url.h>
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
libuv loop functions are thread unsafe. The only exception is uv_async_send()
which is a thread safe "wakeup" that can wake the uv_loop from another thread.
To provide concurrency the proactor uses a "leader-worker-follower" model,
threads take turns at the roles:
- a single "leader" calls libuv functions and runs the uv_loop in short bursts
to generate work. When there is work available it gives up leadership and
becomes a "worker"
- "workers" handle events concurrently for distinct connections/listeners
They do as much work as they can get, when none is left they become "followers"
- "followers" wait for the leader to generate work and become workers.
When the leader itself becomes a worker, one of the followers takes over.
This model is symmetric: any thread can take on any role based on run-time
requirements. It also allows the IO and non-IO work associated with an IO
wake-up to be processed in a single thread with no context switches.
Function naming:
- on_ - called in leader thread via uv_run().
- leader_ - called in leader thread, while processing the leader_q.
- owner_ - called in owning thread, leader or worker but not concurrently.
Note on_ and leader_ functions can call each other, the prefix indicates the
path they are most often called on.
*/
const char *COND_NAME = "proactor";
const char *AMQP_PORT = "5672";
const char *AMQP_PORT_NAME = "amqp";
const char *AMQPS_PORT = "5671";
const char *AMQPS_PORT_NAME = "amqps";
PN_HANDLE(PN_PROACTOR)
/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
Class definitions are for identification as pn_event_t context only.
*/
PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
/* common to connection and listener */
typedef struct psocket_t {
/* Immutable */
pn_proactor_t *proactor;
/* Protected by proactor.lock */
struct psocket_t* next;
void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
/* Only used by leader */
uv_tcp_t tcp;
void (*action)(struct psocket_t*); /* deferred action for leader */
bool is_conn:1;
char host[NI_MAXHOST];
char port[NI_MAXSERV];
} psocket_t;
/* Special value for psocket.next pointer when socket is not on any any list. */
psocket_t UNLISTED;
static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
ps->proactor = p;
ps->next = &UNLISTED;
ps->is_conn = is_conn;
ps->tcp.data = ps;
/* For platforms that don't know about "amqp" and "amqps" service names. */
if (strcmp(port, AMQP_PORT_NAME) == 0)
port = AMQP_PORT;
else if (strcmp(port, AMQPS_PORT_NAME) == 0)
port = AMQPS_PORT;
/* Set to "\001" to indicate a NULL as opposed to an empty string "" */
strncpy(ps->host, host ? host : "\001", sizeof(ps->host));
strncpy(ps->port, port ? port : "\001", sizeof(ps->port));
}
/* Turn "\001" back to NULL */
static inline const char* fixstr(const char* str) {
return str[0] == '\001' ? NULL : str;
}
typedef struct pconnection_t {
psocket_t psocket;
/* Only used by owner thread */
pn_connection_driver_t driver;
/* Only used by leader */
uv_connect_t connect;
uv_timer_t timer;
uv_write_t write;
uv_shutdown_t shutdown;
size_t writing;
bool reading:1;
bool server:1; /* accept, not connect */
} pconnection_t;
struct pn_listener_t {
psocket_t psocket;
/* Only used by owner thread */
pn_condition_t *condition;
pn_collector_t *collector;
pn_event_batch_t batch;
size_t backlog;
};
PN_EXTRA_DECLARE(pn_listener_t);
typedef struct queue { psocket_t *front, *back; } queue;
struct pn_proactor_t {
/* Leader thread */
uv_cond_t cond;
uv_loop_t loop;
uv_async_t async;
/* Owner thread: proactor collector and batch can belong to leader or a worker */
pn_collector_t *collector;
pn_event_batch_t batch;
/* Protected by lock */
uv_mutex_t lock;
queue start_q;
queue worker_q;
queue leader_q;
size_t interrupt; /* pending interrupts */
size_t count; /* psocket count */
bool inactive:1;
bool has_leader:1;
bool batch_working:1; /* batch belongs to a worker. */
};
static bool push_lh(queue *q, psocket_t *ps) {
if (ps->next != &UNLISTED) /* Don't move if already listed. */
return false;
ps->next = NULL;
if (!q->front) {
q->front = q->back = ps;
} else {
q->back->next = ps;
q->back = ps;
}
return true;
}
static psocket_t* pop_lh(queue *q) {
psocket_t *ps = q->front;
if (ps) {
q->front = ps->next;
ps->next = &UNLISTED;
}
return ps;
}
static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
return ps->is_conn ? (pconnection_t*)ps : NULL;
}
static inline pn_listener_t *as_listener(psocket_t* ps) {
return ps->is_conn ? NULL: (pn_listener_t*)ps;
}
/* Put ps on the leader queue for processing. Thread safe. */
static void to_leader_lh(psocket_t *ps) {
push_lh(&ps->proactor->leader_q, ps);
uv_async_send(&ps->proactor->async); /* Wake leader */
}
static void to_leader(psocket_t *ps) {
uv_mutex_lock(&ps->proactor->lock);
to_leader_lh(ps);
uv_mutex_unlock(&ps->proactor->lock);
}
/* Detach from IO and put ps on the worker queue */
static void leader_to_worker(psocket_t *ps) {
pconnection_t *pc = as_pconnection_t(ps);
/* Don't detach if there are no events yet. */
if (pc && pn_connection_driver_has_event(&pc->driver)) {
if (pc->writing) {
pc->writing = 0;
uv_cancel((uv_req_t*)&pc->write);
}
if (pc->reading) {
pc->reading = false;
uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
}
if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
uv_timer_stop(&pc->timer);
}
}
/* Nothing to do for a listener, on_accept doesn't touch worker state. */
uv_mutex_lock(&ps->proactor->lock);
push_lh(&ps->proactor->worker_q, ps);
uv_mutex_unlock(&ps->proactor->lock);
}
/* Set a deferred action for leader, if not already set. */
static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
uv_mutex_lock(&ps->proactor->lock);
if (!ps->action) {
ps->action = action;
}
to_leader_lh(ps);
uv_mutex_unlock(&ps->proactor->lock);
}
/* Owner thread send to worker thread. Set deferred action if not already set. */
static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
uv_mutex_lock(&ps->proactor->lock);
if (!ps->action) {
ps->action = action;
}
push_lh(&ps->proactor->worker_q, ps);
uv_async_send(&ps->proactor->async); /* Wake leader */
uv_mutex_unlock(&ps->proactor->lock);
}
/* Re-queue for further work */
static void worker_requeue(psocket_t* ps) {
uv_mutex_lock(&ps->proactor->lock);
push_lh(&ps->proactor->worker_q, ps);
uv_async_send(&ps->proactor->async); /* Wake leader */
uv_mutex_unlock(&ps->proactor->lock);
}
static pconnection_t *new_pconnection_t(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
if (!pc) return NULL;
if (pn_connection_driver_init(&pc->driver, pn_connection_with_extra(extra.size), NULL) != 0) {
return NULL;
}
if (extra.start && extra.size) {
memcpy(pn_connection_get_extra(pc->driver.connection).start, extra.start, extra.size);
}
psocket_init(&pc->psocket, p, true, host, port);
if (server) {
pn_transport_set_server(pc->driver.transport);
}
pn_record_t *r = pn_connection_attachments(pc->driver.connection);
pn_record_def(r, PN_PROACTOR, PN_VOID);
pn_record_set(r, PN_PROACTOR, pc);
return pc;
}
static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
return (batch->next_event == proactor_batch_next) ?
(pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
}
static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
return (batch->next_event == listener_batch_next) ?
(pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
}
static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
}
pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size));
if (!l) {
return NULL;
}
l->collector = pn_collector();
if (!l->collector) {
free(l);
return NULL;
}
if (extra.start && extra.size) {
memcpy(pn_listener_get_extra(l).start, extra.start, extra.size);
}
psocket_init(&l->psocket, p, false, host, port);
l->condition = pn_condition();
l->batch.next_event = listener_batch_next;
l->backlog = backlog;
return l;
}
static void leader_count(pn_proactor_t *p, int change) {
uv_mutex_lock(&p->lock);
p->count += change;
p->inactive = (p->count == 0);
uv_mutex_unlock(&p->lock);
}
/* Free if there are no uv callbacks pending and no events */
static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
if (pn_connection_driver_has_event(&pc->driver)) {
leader_to_worker(&pc->psocket); /* Return to worker */
} else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
/* All UV requests are finished */
pn_connection_driver_destroy(&pc->driver);
leader_count(pc->psocket.proactor, -1);
free(pc);
}
}
/* Free if there are no uv callbacks pending and no events */
static void leader_listener_maybe_free(pn_listener_t *l) {
if (pn_collector_peek(l->collector)) {
leader_to_worker(&l->psocket); /* Return to worker */
} else if (!l->psocket.tcp.data) {
pn_condition_free(l->condition);
leader_count(l->psocket.proactor, -1);
free(l);
}
}
/* Free if there are no uv callbacks pending and no events */
static void leader_maybe_free(psocket_t *ps) {
if (ps->is_conn) {
leader_pconnection_t_maybe_free(as_pconnection_t(ps));
} else {
leader_listener_maybe_free(as_listener(ps));
}
}
static void on_close(uv_handle_t *h) {
psocket_t *ps = (psocket_t*)h->data;
h->data = NULL; /* Mark closed */
leader_maybe_free(ps);
}
static void on_shutdown(uv_shutdown_t *shutdown, int err) {
psocket_t *ps = (psocket_t*)shutdown->data;
shutdown->data = NULL; /* Mark closed */
leader_maybe_free(ps);
}
static inline void leader_close(psocket_t *ps) {
if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
uv_close((uv_handle_t*)&ps->tcp, on_close);
}
pconnection_t *pc = as_pconnection_t(ps);
if (pc) {
pn_connection_driver_close(&pc->driver);
if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
uv_timer_stop(&pc->timer);
uv_close((uv_handle_t*)&pc->timer, on_close);
}
}
leader_maybe_free(ps);
}
static pconnection_t *get_pconnection_t(pn_connection_t* c) {
if (!c) return NULL;
pn_record_t *r = pn_connection_attachments(c);
return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
}
static void leader_error(psocket_t *ps, int err, const char* what) {
if (ps->is_conn) {
pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
pn_connection_driver_bind(driver); /* Bind so errors will be reported */
pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
what, fixstr(ps->host), fixstr(ps->port),
uv_strerror(err));
pn_connection_driver_close(driver);
} else {
pn_listener_t *l = as_listener(ps);
pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
what, fixstr(ps->host), fixstr(ps->port),
uv_strerror(err));
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
}
leader_to_worker(ps); /* Worker to handle the error */
}
/* uv-initialization */
static int leader_init(psocket_t *ps) {
leader_count(ps->proactor, +1);
int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
if (!err) {
pconnection_t *pc = as_pconnection_t(ps);
if (pc) {
pc->connect.data = ps;
int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
if (!err) {
pc->timer.data = pc;
}
}
}
if (err) {
leader_error(ps, err, "initialization");
}
return err;
}
/* Common logic for on_connect and on_accept */
static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
if (!err) {
leader_to_worker(&pc->psocket);
} else {
leader_error(&pc->psocket, err, what);
}
}
static void on_connect(uv_connect_t *connect, int err) {
leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
}
static void on_accept(uv_stream_t* server, int err) {
pn_listener_t* l = (pn_listener_t*)server->data;
if (!err) {
pn_rwbytes_t v = pn_listener_get_extra(l);
pconnection_t *pc = new_pconnection_t(l->psocket.proactor, true,
fixstr(l->psocket.host),
fixstr(l->psocket.port),
pn_bytes(v.size, v.start));
if (pc) {
int err2 = leader_init(&pc->psocket);
if (!err2) err2 = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
leader_connect_accept(pc, err2, "on accept");
} else {
err = UV_ENOMEM;
}
}
if (err) {
leader_error(&l->psocket, err, "on accept");
}
}
static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
int err = leader_init(ps);
struct addrinfo hints = { 0 };
if (server) hints.ai_flags = AI_PASSIVE;
if (!err) {
err = uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints);
}
return err;
}
static void leader_connect(psocket_t *ps) {
pconnection_t *pc = as_pconnection_t(ps);
uv_getaddrinfo_t info;
int err = leader_resolve(ps, &info, false);
if (!err) {
err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
uv_freeaddrinfo(info.addrinfo);
}
if (err) {
leader_error(ps, err, "connect to");
}
}
static void leader_listen(psocket_t *ps) {
pn_listener_t *l = as_listener(ps);
uv_getaddrinfo_t info;
int err = leader_resolve(ps, &info, true);
if (!err) {
err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
uv_freeaddrinfo(info.addrinfo);
}
if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
if (err) {
leader_error(ps, err, "listen on ");
}
}
static void on_tick(uv_timer_t *timer) {
pconnection_t *pc = (pconnection_t*)timer->data;
pn_transport_t *t = pc->driver.transport;
if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
uv_timer_stop(&pc->timer);
uint64_t now = uv_now(pc->timer.loop);
uint64_t next = pn_transport_tick(t, now);
if (next) {
uv_timer_start(&pc->timer, on_tick, next - now, 0);
}
}
}
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
if (nread >= 0) {
pn_connection_driver_read_done(&pc->driver, nread);
on_tick(&pc->timer); /* check for tick changes. */
leader_to_worker(&pc->psocket);
/* Reading continues automatically until stopped. */
} else if (nread == UV_EOF) { /* hangup */
pn_connection_driver_read_close(&pc->driver);
leader_maybe_free(&pc->psocket);
} else {
leader_error(&pc->psocket, nread, "on read from");
}
}
static void on_write(uv_write_t* write, int err) {
pconnection_t *pc = (pconnection_t*)write->data;
write->data = NULL;
if (err == 0) {
pn_connection_driver_write_done(&pc->driver, pc->writing);
leader_to_worker(&pc->psocket);
} else if (err == UV_ECANCELED) {
leader_maybe_free(&pc->psocket);
} else {
leader_error(&pc->psocket, err, "on write to");
}
pc->writing = 0; /* Need to send a new write request */
}
// Read buffer allocation function for uv, just returns the transports read buffer.
static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
*buf = uv_buf_init(rbuf.start, rbuf.size);
}
static void leader_rewatch(psocket_t *ps) {
pconnection_t *pc = as_pconnection_t(ps);
if (pc->timer.data) { /* uv-initialized */
on_tick(&pc->timer); /* Re-enable ticks if required */
}
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
/* Ticks and checking buffers can generate events, process before proceeding */
if (pn_connection_driver_has_event(&pc->driver)) {
leader_to_worker(ps);
} else { /* Re-watch for IO */
if (wbuf.size > 0 && !pc->writing) {
pc->writing = wbuf.size;
uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
pc->write.data = ps;
uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
} else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
pc->shutdown.data = ps;
uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
}
if (rbuf.size > 0 && !pc->reading) {
pc->reading = true;
uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
}
}
}
static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
pn_collector_put(p->collector, pn_proactor__class(), p, t);
p->batch_working = true;
return &p->batch;
}
/* Return the next event batch or 0 if no events are ready */
static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
if (!p->batch_working) { /* Can generate proactor events */
if (p->inactive) {
p->inactive = false;
return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
}
if (p->interrupt > 0) {
--p->interrupt;
return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
}
}
for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
if (ps->is_conn) {
pconnection_t *pc = as_pconnection_t(ps);
return &pc->driver.batch;
} else { /* Listener */
pn_listener_t *l = as_listener(ps);
return &l->batch;
}
to_leader(ps); /* No event, back to leader */
}
return 0;
}
/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */
static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
uv_mutex_lock(&ps->proactor->lock);
ps->wakeup = action;
to_leader_lh(ps);
uv_mutex_unlock(&ps->proactor->lock);
}
pn_listener_t *pn_event_listener(pn_event_t *e) {
return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
}
pn_proactor_t *pn_event_proactor(pn_event_t *e) {
if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
pn_listener_t *l = pn_event_listener(e);
if (l) return l->psocket.proactor;
pn_connection_t *c = pn_event_connection(e);
if (c) return pn_connection_proactor(pn_event_connection(e));
return NULL;
}
void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
pconnection_t *pc = batch_pconnection(batch);
if (pc) {
if (pn_connection_driver_has_event(&pc->driver)) {
/* Process all events before going back to IO. */
worker_requeue(&pc->psocket);
} else if (pn_connection_driver_finished(&pc->driver)) {
owner_to_leader(&pc->psocket, leader_close);
} else {
owner_to_leader(&pc->psocket, leader_rewatch);
}
return;
}
pn_proactor_t *bp = batch_proactor(batch);
if (bp == p) {
uv_mutex_lock(&p->lock);
p->batch_working = false;
uv_async_send(&p->async); /* Wake leader */
uv_mutex_unlock(&p->lock);
return;
}
/* Nothing extra to do for listener, it is always in the UV loop. */
}
/* Run follower/leader loop till we can return an event and be a worker */
pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
/* Try to grab work immediately. */
pn_event_batch_t *batch = get_batch_lh(p);
if (batch == NULL) {
/* No work available, follow the leader */
while (p->has_leader) {
uv_cond_wait(&p->cond, &p->lock);
}
/* Lead till there is work to do. */
p->has_leader = true;
while (batch == NULL) {
for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
void (*action)(psocket_t*) = ps->action;
void (*wakeup)(psocket_t*) = ps->wakeup;
ps->action = NULL;
ps->wakeup = NULL;
if (action || wakeup) {
uv_mutex_unlock(&p->lock);
if (action) action(ps);
if (wakeup) wakeup(ps);
uv_mutex_lock(&p->lock);
}
}
batch = get_batch_lh(p);
if (batch == NULL) {
uv_mutex_unlock(&p->lock);
uv_run(&p->loop, UV_RUN_ONCE);
uv_mutex_lock(&p->lock);
}
}
/* Signal the next leader and return to work */
p->has_leader = false;
uv_cond_signal(&p->cond);
}
uv_mutex_unlock(&p->lock);
return batch;
}
void pn_proactor_interrupt(pn_proactor_t *p) {
uv_mutex_lock(&p->lock);
++p->interrupt;
uv_async_send(&p->async); /* Interrupt the UV loop */
uv_mutex_unlock(&p->lock);
}
int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
pconnection_t *pc = new_pconnection_t(p, false, host, port, extra);
if (!pc) {
return PN_OUT_OF_MEMORY;
}
/* Process PN_CONNECTION_INIT before binding */
owner_to_worker(&pc->psocket, leader_connect);
return 0;
}
pn_rwbytes_t pn_listener_get_extra(pn_listener_t *l) { return PN_EXTRA_GET(pn_listener_t, l); }
pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
pn_listener_t *l = new_listener(p, host, port, backlog, extra);
if (l) owner_to_leader(&l->psocket, leader_listen);
return l;
}
pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
pconnection_t *pc = get_pconnection_t(c);
return pc ? pc->psocket.proactor : NULL;
}
pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
return l ? l->psocket.proactor : NULL;
}
void leader_wake_connection(psocket_t *ps) {
pconnection_t *pc = as_pconnection_t(ps);
pn_connection_t *c = pc->driver.connection;
pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
leader_to_worker(ps);
}
void pn_connection_wake(pn_connection_t* c) {
wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
}
void pn_listener_close(pn_listener_t* l) {
wakeup(&l->psocket, leader_close);
}
/* Only called when condition is closed by error. */
pn_condition_t* pn_listener_condition(pn_listener_t* l) {
return l->condition;
}
pn_proactor_t *pn_proactor() {
pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
p->collector = pn_collector();
p->batch.next_event = &proactor_batch_next;
if (!p->collector) return NULL;
uv_loop_init(&p->loop);
uv_mutex_init(&p->lock);
uv_cond_init(&p->cond);
uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */
return p;
}
static void on_stopping(uv_handle_t* h, void* v) {
uv_close(h, NULL); /* Close this handle */
if (!uv_loop_alive(h->loop)) /* Everything closed */
uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */
}
void pn_proactor_free(pn_proactor_t *p) {
uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
uv_run(&p->loop, UV_RUN_DEFAULT); /* Run till stop, all handles closed */
uv_loop_close(&p->loop);
uv_mutex_destroy(&p->lock);
uv_cond_destroy(&p->cond);
pn_collector_free(p->collector);
free(p);
}
static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
pn_listener_t *l = batch_listener(batch);
pn_event_t *handled = pn_collector_prev(l->collector);
if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
}
return pn_collector_next(l->collector);
}
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
return pn_collector_next(batch_proactor(batch)->collector);
}