PROTON-1344: proactor batch events, rename connection_driver
renamed pn_connection_engine as pn_connection_driver.
pn_proactor_wait() returns pn_event_batch_t* rather than individual pn_event_t*
to reduce thread-context switching.
Added pn_collector_next() for simpler event looping.
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index e11a8bd..66381fc 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -17,7 +17,7 @@
* under the License.
*/
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/proactor.h>
#include <proton/engine.h>
#include <proton/sasl.h>
@@ -220,9 +220,11 @@
const char *container_id; /* AMQP container-id */
size_t threads;
pn_millis_t heartbeat;
+ bool finished;
} broker_t;
void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
+ memset(b, 0, sizeof(*b));
b->proactor = pn_proactor();
b->listener = NULL;
queues_init(&b->queues);
@@ -293,8 +295,7 @@
const int WINDOW=10; /* Incoming credit window */
-static bool handle(broker_t* b, pn_event_t* e) {
- bool more = true;
+static void handle(broker_t* b, pn_event_t* e) {
pn_connection_t *c = pn_event_connection(e);
switch (pn_event_type(e)) {
@@ -398,20 +399,24 @@
break;
case PN_PROACTOR_INTERRUPT:
- more = false;
+ b->finished = true;
break;
default:
break;
}
- pn_event_done(e);
- return more;
}
static void broker_thread(void *void_broker) {
broker_t *b = (broker_t*)void_broker;
- while (handle(b, pn_proactor_wait(b->proactor)))
- ;
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(b->proactor);
+ pn_event_t *e;
+ while ((e = pn_event_batch_next(events))) {
+ handle(b, e);
+ }
+ pn_proactor_done(b->proactor, events);
+ } while(!b->finished);
}
static void usage(const char *arg0) {
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
index 8dd2706..35afd5c 100644
--- a/examples/c/proactor/libuv_proactor.c
+++ b/examples/c/proactor/libuv_proactor.c
@@ -22,7 +22,7 @@
#include <uv.h>
#include <proton/condition.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/engine.h>
#include <proton/extra.h>
#include <proton/message.h>
@@ -44,11 +44,15 @@
To provide concurrency the proactor uses a "leader-worker-follower" model,
threads take turns at the roles:
- - a single "leader" calls libuv functions and runs the uv_loop incrementally.
- When there is work it hands over leadership and becomes a "worker"
+ - a single "leader" calls libuv functions and runs the uv_loop in short bursts
+ to generate work. When there is work available it gives up leadership and
+ becomes a "worker"
+
- "workers" handle events concurrently for distinct connections/listeners
- When the work is done they become "followers"
- - "followers" wait for the leader to step aside, one takes over as new leader.
+ They do as much work as they can get, when none is left they become "followers"
+
+ - "followers" wait for the leader to generate work and become workers.
+ When the leader itself becomes a worker, one of the followers takes over.
This model is symmetric: any thread can take on any role based on run-time
requirements. It also allows the IO and non-IO work associated with an IO
@@ -77,7 +81,7 @@
PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
-/* common to connection engine and listeners */
+/* common to connection and listener */
typedef struct psocket_t {
/* Immutable */
pn_proactor_t *proactor;
@@ -118,11 +122,11 @@
return str[0] == '\001' ? NULL : str;
}
-typedef struct pconn {
+typedef struct pconnection_t {
psocket_t psocket;
/* Only used by owner thread */
- pn_connection_engine_t ceng;
+ pn_connection_driver_t driver;
/* Only used by leader */
uv_connect_t connect;
@@ -132,7 +136,7 @@
size_t writing;
bool reading:1;
bool server:1; /* accept, not connect */
-} pconn;
+} pconnection_t;
struct pn_listener_t {
psocket_t psocket;
@@ -140,6 +144,7 @@
/* Only used by owner thread */
pn_condition_t *condition;
pn_collector_t *collector;
+ pn_event_batch_t batch;
size_t backlog;
};
@@ -153,6 +158,10 @@
uv_loop_t loop;
uv_async_t async;
+ /* Owner thread: proactor collector and batch can belong to leader or a worker */
+ pn_collector_t *collector;
+ pn_event_batch_t batch;
+
/* Protected by lock */
uv_mutex_t lock;
queue start_q;
@@ -162,11 +171,7 @@
size_t count; /* psocket count */
bool inactive:1;
bool has_leader:1;
-
- /* Immutable collectors to hold fixed events */
- pn_collector_t *interrupt_event;
- pn_collector_t *timeout_event;
- pn_collector_t *inactive_event;
+ bool batch_working:1; /* batch belongs to a worker. */
};
static bool push_lh(queue *q, psocket_t *ps) {
@@ -191,8 +196,8 @@
return ps;
}
-static inline pconn *as_pconn(psocket_t* ps) {
- return ps->is_conn ? (pconn*)ps : NULL;
+static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
+ return ps->is_conn ? (pconnection_t*)ps : NULL;
}
static inline pn_listener_t *as_listener(psocket_t* ps) {
@@ -213,9 +218,9 @@
/* Detach from IO and put ps on the worker queue */
static void leader_to_worker(psocket_t *ps) {
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
/* Don't detach if there are no events yet. */
- if (pc && pn_connection_engine_has_event(&pc->ceng)) {
+ if (pc && pn_connection_driver_has_event(&pc->driver)) {
if (pc->writing) {
pc->writing = 0;
uv_cancel((uv_req_t*)&pc->write);
@@ -236,6 +241,28 @@
uv_mutex_unlock(&ps->proactor->lock);
}
+/* Set a deferred action for leader, if not already set. */
+static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
+ uv_mutex_lock(&ps->proactor->lock);
+ if (!ps->action) {
+ ps->action = action;
+ }
+ to_leader_lh(ps);
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Owner thread send to worker thread. Set deferred action if not already set. */
+static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
+ uv_mutex_lock(&ps->proactor->lock);
+ if (!ps->action) {
+ ps->action = action;
+ }
+ push_lh(&ps->proactor->worker_q, ps);
+ uv_async_send(&ps->proactor->async); /* Wake leader */
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+
/* Re-queue for further work */
static void worker_requeue(psocket_t* ps) {
uv_mutex_lock(&ps->proactor->lock);
@@ -244,25 +271,43 @@
uv_mutex_unlock(&ps->proactor->lock);
}
-static pconn *new_pconn(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
- pconn *pc = (pconn*)calloc(1, sizeof(*pc));
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
+ pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
if (!pc) return NULL;
- if (pn_connection_engine_init(&pc->ceng, pn_connection_with_extra(extra.size), NULL) != 0) {
+ if (pn_connection_driver_init(&pc->driver, pn_connection_with_extra(extra.size), NULL) != 0) {
return NULL;
}
if (extra.start && extra.size) {
- memcpy(pn_connection_get_extra(pc->ceng.connection).start, extra.start, extra.size);
+ memcpy(pn_connection_get_extra(pc->driver.connection).start, extra.start, extra.size);
}
psocket_init(&pc->psocket, p, true, host, port);
if (server) {
- pn_transport_set_server(pc->ceng.transport);
+ pn_transport_set_server(pc->driver.transport);
}
- pn_record_t *r = pn_connection_attachments(pc->ceng.connection);
+ pn_record_t *r = pn_connection_attachments(pc->driver.connection);
pn_record_def(r, PN_PROACTOR, PN_VOID);
pn_record_set(r, PN_PROACTOR, pc);
return pc;
}
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+ return (batch->next_event == proactor_batch_next) ?
+ (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+}
+
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+ return (batch->next_event == listener_batch_next) ?
+ (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+}
+
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+ pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
+ return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
+}
+
pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size));
if (!l) {
@@ -278,6 +323,7 @@
}
psocket_init(&l->psocket, p, false, host, port);
l->condition = pn_condition();
+ l->batch.next_event = listener_batch_next;
l->backlog = backlog;
return l;
}
@@ -290,11 +336,12 @@
}
/* Free if there are no uv callbacks pending and no events */
-static void leader_pconn_maybe_free(pconn *pc) {
- if (pn_connection_engine_has_event(&pc->ceng)) {
+static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
+ if (pn_connection_driver_has_event(&pc->driver)) {
leader_to_worker(&pc->psocket); /* Return to worker */
- } else if (!(pc->psocket.tcp.data || pc->shutdown.data || pc->timer.data)) {
- pn_connection_engine_destroy(&pc->ceng);
+ } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
+ /* All UV requests are finished */
+ pn_connection_driver_destroy(&pc->driver);
leader_count(pc->psocket.proactor, -1);
free(pc);
}
@@ -314,7 +361,7 @@
/* Free if there are no uv callbacks pending and no events */
static void leader_maybe_free(psocket_t *ps) {
if (ps->is_conn) {
- leader_pconn_maybe_free(as_pconn(ps));
+ leader_pconnection_t_maybe_free(as_pconnection_t(ps));
} else {
leader_listener_maybe_free(as_listener(ps));
}
@@ -336,9 +383,9 @@
if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
uv_close((uv_handle_t*)&ps->tcp, on_close);
}
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
if (pc) {
- pn_connection_engine_close(&pc->ceng);
+ pn_connection_driver_close(&pc->driver);
if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
uv_timer_stop(&pc->timer);
uv_close((uv_handle_t*)&pc->timer, on_close);
@@ -347,20 +394,20 @@
leader_maybe_free(ps);
}
-static pconn *get_pconn(pn_connection_t* c) {
+static pconnection_t *get_pconnection_t(pn_connection_t* c) {
if (!c) return NULL;
pn_record_t *r = pn_connection_attachments(c);
- return (pconn*) pn_record_get(r, PN_PROACTOR);
+ return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
}
static void leader_error(psocket_t *ps, int err, const char* what) {
if (ps->is_conn) {
- pn_connection_engine_t *ceng = &as_pconn(ps)->ceng;
- pn_connection_engine_errorf(ceng, COND_NAME, "%s %s:%s: %s",
+ pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
+ pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+ pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
what, fixstr(ps->host), fixstr(ps->port),
uv_strerror(err));
- pn_connection_engine_bind(ceng);
- pn_connection_engine_close(ceng);
+ pn_connection_driver_close(driver);
} else {
pn_listener_t *l = as_listener(ps);
pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
@@ -376,9 +423,9 @@
leader_count(ps->proactor, +1);
int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
if (!err) {
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
if (pc) {
- pc->connect.data = pc->write.data = pc->shutdown.data = ps;
+ pc->connect.data = ps;
int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
if (!err) {
pc->timer.data = pc;
@@ -392,7 +439,7 @@
}
/* Common logic for on_connect and on_accept */
-static void leader_connect_accept(pconn *pc, int err, const char *what) {
+static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
if (!err) {
leader_to_worker(&pc->psocket);
} else {
@@ -401,14 +448,14 @@
}
static void on_connect(uv_connect_t *connect, int err) {
- leader_connect_accept((pconn*)connect->data, err, "on connect");
+ leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
}
static void on_accept(uv_stream_t* server, int err) {
pn_listener_t* l = (pn_listener_t*)server->data;
if (!err) {
pn_rwbytes_t v = pn_listener_get_extra(l);
- pconn *pc = new_pconn(l->psocket.proactor, true,
+ pconnection_t *pc = new_pconnection_t(l->psocket.proactor, true,
fixstr(l->psocket.host),
fixstr(l->psocket.port),
pn_bytes(v.size, v.start));
@@ -436,7 +483,7 @@
}
static void leader_connect(psocket_t *ps) {
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
uv_getaddrinfo_t info;
int err = leader_resolve(ps, &info, false);
if (!err) {
@@ -450,7 +497,7 @@
static void leader_listen(psocket_t *ps) {
pn_listener_t *l = as_listener(ps);
- uv_getaddrinfo_t info;
+ uv_getaddrinfo_t info;
int err = leader_resolve(ps, &info, true);
if (!err) {
err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
@@ -463,8 +510,8 @@
}
static void on_tick(uv_timer_t *timer) {
- pconn *pc = (pconn*)timer->data;
- pn_transport_t *t = pc->ceng.transport;
+ pconnection_t *pc = (pconnection_t*)timer->data;
+ pn_transport_t *t = pc->driver.transport;
if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
uv_timer_stop(&pc->timer);
uint64_t now = uv_now(pc->timer.loop);
@@ -476,24 +523,25 @@
}
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
- pconn *pc = (pconn*)stream->data;
+ pconnection_t *pc = (pconnection_t*)stream->data;
if (nread >= 0) {
- pn_connection_engine_read_done(&pc->ceng, nread);
+ pn_connection_driver_read_done(&pc->driver, nread);
on_tick(&pc->timer); /* check for tick changes. */
leader_to_worker(&pc->psocket);
/* Reading continues automatically until stopped. */
} else if (nread == UV_EOF) { /* hangup */
- pn_connection_engine_read_close(&pc->ceng);
+ pn_connection_driver_read_close(&pc->driver);
leader_maybe_free(&pc->psocket);
} else {
leader_error(&pc->psocket, nread, "on read from");
}
}
-static void on_write(uv_write_t* request, int err) {
- pconn *pc = (pconn*)request->data;
+static void on_write(uv_write_t* write, int err) {
+ pconnection_t *pc = (pconnection_t*)write->data;
+ write->data = NULL;
if (err == 0) {
- pn_connection_engine_write_done(&pc->ceng, pc->writing);
+ pn_connection_driver_write_done(&pc->driver, pc->writing);
leader_to_worker(&pc->psocket);
} else if (err == UV_ECANCELED) {
leader_maybe_free(&pc->psocket);
@@ -505,29 +553,31 @@
// Read buffer allocation function for uv, just returns the transports read buffer.
static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
- pconn *pc = (pconn*)stream->data;
- pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng);
+ pconnection_t *pc = (pconnection_t*)stream->data;
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
*buf = uv_buf_init(rbuf.start, rbuf.size);
}
static void leader_rewatch(psocket_t *ps) {
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
if (pc->timer.data) { /* uv-initialized */
on_tick(&pc->timer); /* Re-enable ticks if required */
}
- pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng);
- pn_bytes_t wbuf = pn_connection_engine_write_buffer(&pc->ceng);
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+ pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
/* Ticks and checking buffers can generate events, process before proceeding */
- if (pn_connection_engine_has_event(&pc->ceng)) {
+ if (pn_connection_driver_has_event(&pc->driver)) {
leader_to_worker(ps);
} else { /* Re-watch for IO */
if (wbuf.size > 0 && !pc->writing) {
pc->writing = wbuf.size;
uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+ pc->write.data = ps;
uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
- } else if (wbuf.size == 0 && pn_connection_engine_write_closed(&pc->ceng)) {
+ } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+ pc->shutdown.data = ps;
uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
}
if (rbuf.size > 0 && !pc->reading) {
@@ -537,23 +587,31 @@
}
}
-/* Return the next worker event or { 0 } if no events are ready */
-static pn_event_t* get_event_lh(pn_proactor_t *p) {
- if (p->inactive) {
- p->inactive = false;
- return pn_collector_peek(p->inactive_event);
- }
- if (p->interrupt > 0) {
- --p->interrupt;
- return pn_collector_peek(p->interrupt_event);
+static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
+ pn_collector_put(p->collector, pn_proactor__class(), p, t);
+ p->batch_working = true;
+ return &p->batch;
+}
+
+/* Return the next event batch or 0 if no events are ready */
+static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
+ if (!p->batch_working) { /* Can generate proactor events */
+ if (p->inactive) {
+ p->inactive = false;
+ return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
+ }
+ if (p->interrupt > 0) {
+ --p->interrupt;
+ return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
+ }
}
for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
if (ps->is_conn) {
- pconn *pc = as_pconn(ps);
- return pn_connection_engine_event(&pc->ceng);
+ pconnection_t *pc = as_pconnection_t(ps);
+ return &pc->driver.batch;
} else { /* Listener */
pn_listener_t *l = as_listener(ps);
- return pn_collector_peek(l->collector);
+ return &l->batch;
}
to_leader(ps); /* No event, back to leader */
}
@@ -568,15 +626,6 @@
uv_mutex_unlock(&ps->proactor->lock);
}
-/* Defer an action to the leader thread. Only from non-leader threads. */
-static void owner_defer(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- assert(!ps->action);
- ps->action = action;
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
pn_listener_t *pn_event_listener(pn_event_t *e) {
return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
}
@@ -590,57 +639,47 @@
return NULL;
}
-void pn_event_done(pn_event_t *e) {
- pn_event_type_t etype = pn_event_type(e);
- pconn *pc = get_pconn(pn_event_connection(e));
- if (pc && e == pn_collector_peek(pc->ceng.collector)) {
- pn_connection_engine_pop_event(&pc->ceng);
- if (etype == PN_CONNECTION_INIT) {
- /* Bind after user has handled CONNECTION_INIT */
- pn_connection_engine_bind(&pc->ceng);
- }
- if (pn_connection_engine_has_event(&pc->ceng)) {
- /* Process all events before going back to IO.
- Put it back on the worker queue and wake the leader.
- */
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+ pconnection_t *pc = batch_pconnection(batch);
+ if (pc) {
+ if (pn_connection_driver_has_event(&pc->driver)) {
+ /* Process all events before going back to IO. */
worker_requeue(&pc->psocket);
- } else if (pn_connection_engine_finished(&pc->ceng)) {
- owner_defer(&pc->psocket, leader_close);
+ } else if (pn_connection_driver_finished(&pc->driver)) {
+ owner_to_leader(&pc->psocket, leader_close);
} else {
- owner_defer(&pc->psocket, leader_rewatch);
+ owner_to_leader(&pc->psocket, leader_rewatch);
}
- } else {
- pn_listener_t *l = pn_event_listener(e);
- if (l && e == pn_collector_peek(l->collector)) {
- pn_collector_pop(l->collector);
- if (etype == PN_LISTENER_CLOSE) {
- owner_defer(&l->psocket, leader_close);
- }
- }
+ return;
}
+ pn_proactor_t *bp = batch_proactor(batch);
+ if (bp == p) {
+ uv_mutex_lock(&p->lock);
+ p->batch_working = false;
+ uv_async_send(&p->async); /* Wake leader */
+ uv_mutex_unlock(&p->lock);
+ return;
+ }
+ /* Nothing extra to do for listener, it is always in the UV loop. */
}
/* Run follower/leader loop till we can return an event and be a worker */
-pn_event_t *pn_proactor_wait(struct pn_proactor_t* p) {
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
/* Try to grab work immediately. */
- pn_event_t *e = get_event_lh(p);
- if (e == NULL) {
+ pn_event_batch_t *batch = get_batch_lh(p);
+ if (batch == NULL) {
/* No work available, follow the leader */
- while (p->has_leader)
+ while (p->has_leader) {
uv_cond_wait(&p->cond, &p->lock);
+ }
/* Lead till there is work to do. */
p->has_leader = true;
- for (e = get_event_lh(p); e == NULL; e = get_event_lh(p)) {
- /* Run uv_loop outside the lock */
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, UV_RUN_ONCE);
- uv_mutex_lock(&p->lock);
- /* Process leader work queue outside the lock */
+ while (batch == NULL) {
for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
void (*action)(psocket_t*) = ps->action;
- ps->action = NULL;
void (*wakeup)(psocket_t*) = ps->wakeup;
+ ps->action = NULL;
ps->wakeup = NULL;
if (action || wakeup) {
uv_mutex_unlock(&p->lock);
@@ -649,13 +688,19 @@
uv_mutex_lock(&p->lock);
}
}
+ batch = get_batch_lh(p);
+ if (batch == NULL) {
+ uv_mutex_unlock(&p->lock);
+ uv_run(&p->loop, UV_RUN_ONCE);
+ uv_mutex_lock(&p->lock);
+ }
}
/* Signal the next leader and return to work */
p->has_leader = false;
uv_cond_signal(&p->cond);
}
uv_mutex_unlock(&p->lock);
- return e;
+ return batch;
}
void pn_proactor_interrupt(pn_proactor_t *p) {
@@ -666,11 +711,12 @@
}
int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
- pconn *pc = new_pconn(p, false, host, port, extra);
+ pconnection_t *pc = new_pconnection_t(p, false, host, port, extra);
if (!pc) {
return PN_OUT_OF_MEMORY;
}
- owner_defer(&pc->psocket, leader_connect); /* Process PN_CONNECTION_INIT before binding */
+ /* Process PN_CONNECTION_INIT before binding */
+ owner_to_worker(&pc->psocket, leader_connect);
return 0;
}
@@ -678,12 +724,12 @@
pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
pn_listener_t *l = new_listener(p, host, port, backlog, extra);
- if (l) owner_defer(&l->psocket, leader_listen);
+ if (l) owner_to_leader(&l->psocket, leader_listen);
return l;
}
pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
- pconn *pc = get_pconn(c);
+ pconnection_t *pc = get_pconnection_t(c);
return pc ? pc->psocket.proactor : NULL;
}
@@ -692,13 +738,14 @@
}
void leader_wake_connection(psocket_t *ps) {
- pconn *pc = as_pconn(ps);
- pn_collector_put(pc->ceng.collector, PN_OBJECT, pc->ceng.connection, PN_CONNECTION_WAKE);
+ pconnection_t *pc = as_pconnection_t(ps);
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
leader_to_worker(ps);
}
void pn_connection_wake(pn_connection_t* c) {
- wakeup(&get_pconn(c)->psocket, leader_wake_connection);
+ wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
}
void pn_listener_close(pn_listener_t* l) {
@@ -710,22 +757,15 @@
return l->condition;
}
-/* Collector to hold for a single fixed event that is never popped. */
-static pn_collector_t *event_holder(pn_proactor_t *p, pn_event_type_t t) {
- pn_collector_t *c = pn_collector();
- pn_collector_put(c, pn_proactor__class(), p, t);
- return c;
-}
-
pn_proactor_t *pn_proactor() {
pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+ p->collector = pn_collector();
+ p->batch.next_event = &proactor_batch_next;
+ if (!p->collector) return NULL;
uv_loop_init(&p->loop);
uv_mutex_init(&p->lock);
uv_cond_init(&p->cond);
uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */
- p->interrupt_event = event_holder(p, PN_PROACTOR_INTERRUPT);
- p->inactive_event = event_holder(p, PN_PROACTOR_INACTIVE);
- p->timeout_event = event_holder(p, PN_PROACTOR_TIMEOUT);
return p;
}
@@ -741,8 +781,19 @@
uv_loop_close(&p->loop);
uv_mutex_destroy(&p->lock);
uv_cond_destroy(&p->cond);
- pn_collector_free(p->interrupt_event);
- pn_collector_free(p->inactive_event);
- pn_collector_free(p->timeout_event);
+ pn_collector_free(p->collector);
free(p);
}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+ pn_listener_t *l = batch_listener(batch);
+ pn_event_t *handled = pn_collector_prev(l->collector);
+ if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
+ owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
+ }
+ return pn_collector_next(l->collector);
+}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+ return pn_collector_next(batch_proactor(batch)->collector);
+}
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index acdae0c..88e3456 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -20,7 +20,7 @@
*/
#include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/delivery.h>
#include <proton/proactor.h>
#include <proton/link.h>
@@ -37,12 +37,13 @@
typedef char str[1024];
typedef struct app_data_t {
- str address;
- str container_id;
- pn_rwbytes_t message_buffer;
- int message_count;
- int received;
- pn_proactor_t *proactor;
+ str address;
+ str container_id;
+ pn_rwbytes_t message_buffer;
+ int message_count;
+ int received;
+ pn_proactor_t *proactor;
+ bool finished;
} app_data_t;
static const int BATCH = 100; /* Batch size for unlimited receive */
@@ -80,9 +81,7 @@
}
}
-/* Handle event, return true of we should handle more */
-static bool handle(app_data_t* app, pn_event_t* event) {
- bool more = true;
+static void handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_CONNECTION_INIT: {
@@ -149,53 +148,58 @@
break;
case PN_PROACTOR_INACTIVE:
- more = false;
+ app->finished = true;
break;
default: break;
}
- pn_event_done(event);
- return more;
}
static void usage(const char *arg0) {
- fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
- exit(1);
+ fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+ exit(1);
}
int main(int argc, char **argv) {
- /* Default values for application and connection. */
- app_data_t app = {{0}};
- app.message_count = 100;
- const char* urlstr = NULL;
+ /* Default values for application and connection. */
+ app_data_t app = {{0}};
+ app.message_count = 100;
+ const char* urlstr = NULL;
- int opt;
- while((opt = getopt(argc, argv, "a:m:")) != -1) {
- switch(opt) {
- case 'a': urlstr = optarg; break;
- case 'm': app.message_count = atoi(optarg); break;
- default: usage(argv[0]); break;
- }
+ int opt;
+ while((opt = getopt(argc, argv, "a:m:")) != -1) {
+ switch(opt) {
+ case 'a': urlstr = optarg; break;
+ case 'm': app.message_count = atoi(optarg); break;
+ default: usage(argv[0]); break;
}
- if (optind < argc)
- usage(argv[0]);
+ }
+ if (optind < argc)
+ usage(argv[0]);
- snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+ snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
- /* Parse the URL or use default values */
- pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
- const char *host = url ? pn_url_get_host(url) : NULL;
- const char *port = url ? pn_url_get_port(url) : "amqp";
- strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+ /* Parse the URL or use default values */
+ pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+ const char *host = url ? pn_url_get_host(url) : NULL;
+ const char *port = url ? pn_url_get_port(url) : "amqp";
+ strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
- /* Create the proactor and connect */
- app.proactor = pn_proactor();
- pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
- if (url) pn_url_free(url);
+ /* Create the proactor and connect */
+ app.proactor = pn_proactor();
+ pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+ if (url) pn_url_free(url);
- while (handle(&app, pn_proactor_wait(app.proactor)))
- ;
- pn_proactor_free(app.proactor);
- free(app.message_buffer.start);
- return exit_code;
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+ pn_event_t *e;
+ while ((e = pn_event_batch_next(events))) {
+ handle(&app, e);
+ }
+ pn_proactor_done(app.proactor, events);
+ } while(!app.finished);
+
+ pn_proactor_free(app.proactor);
+ free(app.message_buffer.start);
+ return exit_code;
}
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 5d58895..d64ea2d 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -20,7 +20,7 @@
*/
#include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/delivery.h>
#include <proton/proactor.h>
#include <proton/link.h>
@@ -37,13 +37,14 @@
typedef char str[1024];
typedef struct app_data_t {
- str address;
- str container_id;
- pn_rwbytes_t message_buffer;
- int message_count;
- int sent;
- int acknowledged;
- pn_proactor_t *proactor;
+ str address;
+ str container_id;
+ pn_rwbytes_t message_buffer;
+ int message_count;
+ int sent;
+ int acknowledged;
+ pn_proactor_t *proactor;
+ bool finished;
} app_data_t;
int exit_code = 0;
@@ -58,41 +59,39 @@
/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */
static pn_bytes_t encode_message(app_data_t* app) {
- /* Construct a message with the map { "sequence": app.sent } */
- pn_message_t* message = pn_message();
- pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
- pn_data_t* body = pn_message_body(message);
- pn_data_put_map(body);
- pn_data_enter(body);
- pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
- pn_data_put_int(body, app->sent); /* The sequence number */
- pn_data_exit(body);
+ /* Construct a message with the map { "sequence": app.sent } */
+ pn_message_t* message = pn_message();
+ pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+ pn_data_t* body = pn_message_body(message);
+ pn_data_put_map(body);
+ pn_data_enter(body);
+ pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+ pn_data_put_int(body, app->sent); /* The sequence number */
+ pn_data_exit(body);
- /* encode the message, expanding the encode buffer as needed */
- if (app->message_buffer.start == NULL) {
- static const size_t initial_size = 128;
- app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
- }
- /* app->message_buffer is the total buffer space available. */
- /* mbuf wil point at just the portion used by the encoded message */
- pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
- int status = 0;
- while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
- app->message_buffer.size *= 2;
- app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
- mbuf.size = app->message_buffer.size;
- }
- if (status != 0) {
- fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
- exit(1);
- }
- pn_message_free(message);
- return pn_bytes(mbuf.size, mbuf.start);
+ /* encode the message, expanding the encode buffer as needed */
+ if (app->message_buffer.start == NULL) {
+ static const size_t initial_size = 128;
+ app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+ }
+ /* app->message_buffer is the total buffer space available. */
+ /* mbuf wil point at just the portion used by the encoded message */
+ pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+ int status = 0;
+ while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+ app->message_buffer.size *= 2;
+ app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+ mbuf.size = app->message_buffer.size;
+ }
+ if (status != 0) {
+ fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+ exit(1);
+ }
+ pn_message_free(message);
+ return pn_bytes(mbuf.size, mbuf.start);
}
-/* Handle event, return true of we should handle more */
-static bool handle(app_data_t* app, pn_event_t* event) {
- bool more = true;
+static void handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_CONNECTION_INIT: {
@@ -130,7 +129,7 @@
}
} break;
- case PN_TRANSPORT_ERROR:
+ case PN_TRANSPORT_CLOSED:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
break;
@@ -151,53 +150,58 @@
break;
case PN_PROACTOR_INACTIVE:
- more = false;
+ app->finished = true;
break;
default: break;
}
- pn_event_done(event);
- return more;
}
static void usage(const char *arg0) {
- fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
- exit(1);
+ fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+ exit(1);
}
int main(int argc, char **argv) {
- /* Default values for application and connection. */
- app_data_t app = {{0}};
- app.message_count = 100;
- const char* urlstr = NULL;
+ /* Default values for application and connection. */
+ app_data_t app = {{0}};
+ app.message_count = 100;
+ const char* urlstr = NULL;
- int opt;
- while((opt = getopt(argc, argv, "a:m:")) != -1) {
- switch(opt) {
- case 'a': urlstr = optarg; break;
- case 'm': app.message_count = atoi(optarg); break;
- default: usage(argv[0]); break;
- }
+ int opt;
+ while((opt = getopt(argc, argv, "a:m:")) != -1) {
+ switch(opt) {
+ case 'a': urlstr = optarg; break;
+ case 'm': app.message_count = atoi(optarg); break;
+ default: usage(argv[0]); break;
}
- if (optind < argc)
- usage(argv[0]);
+ }
+ if (optind < argc)
+ usage(argv[0]);
- snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+ snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
- /* Parse the URL or use default values */
- pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
- const char *host = url ? pn_url_get_host(url) : NULL;
- const char *port = url ? pn_url_get_port(url) : "amqp";
- strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+ /* Parse the URL or use default values */
+ pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+ const char *host = url ? pn_url_get_host(url) : NULL;
+ const char *port = url ? pn_url_get_port(url) : "amqp";
+ strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
- /* Create the proactor and connect */
- app.proactor = pn_proactor();
- pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
- if (url) pn_url_free(url);
+ /* Create the proactor and connect */
+ app.proactor = pn_proactor();
+ pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+ if (url) pn_url_free(url);
- while (handle(&app, pn_proactor_wait(app.proactor)))
- ;
- pn_proactor_free(app.proactor);
- free(app.message_buffer.start);
- return exit_code;
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+ pn_event_t *e;
+ while ((e = pn_event_batch_next(events))) {
+ handle(&app, e);
+ }
+ pn_proactor_done(app.proactor, events);
+ } while(!app.finished);
+
+ pn_proactor_free(app.proactor);
+ free(app.message_buffer.start);
+ return exit_code;
}
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 1d46ec8..447d3ad 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -123,7 +123,7 @@
/** @example mt/epoll_container.cpp
An example implementation of the proton::container API that shows how
-to use the proton::io::connection_engine SPI to adapt the proton API
+to use the proton::io::connection_driver SPI to adapt the proton API
to native IO, in this case using a multithreaded Linux epoll poller as
the implementation.
diff --git a/examples/cpp/mt/epoll_container.cpp b/examples/cpp/mt/epoll_container.cpp
index d9b9f08..7646673 100644
--- a/examples/cpp/mt/epoll_container.cpp
+++ b/examples/cpp/mt/epoll_container.cpp
@@ -25,7 +25,7 @@
#include <proton/url.hpp>
#include <proton/io/container_impl_base.hpp>
-#include <proton/io/connection_engine.hpp>
+#include <proton/io/connection_driver.hpp>
#include <proton/io/link_namer.hpp>
#include <atomic>
@@ -97,7 +97,7 @@
};
class pollable;
-class pollable_engine;
+class pollable_driver;
class pollable_listener;
class epoll_container : public proton::io::container_impl_base {
@@ -124,7 +124,7 @@
std::string id() const OVERRIDE { return id_; }
// Functions used internally.
- proton::connection add_engine(proton::connection_options opts, int fd, bool server);
+ proton::connection add_driver(proton::connection_options opts, int fd, bool server);
void erase(pollable*);
// Link names must be unique per container.
@@ -160,7 +160,7 @@
proton::connection_options options_;
std::map<std::string, std::unique_ptr<pollable_listener> > listeners_;
- std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;
+ std::map<pollable*, std::unique_ptr<pollable_driver> > drivers_;
std::condition_variable stopped_;
bool stopping_;
@@ -274,21 +274,21 @@
bool closed_;
};
-// Handle epoll wakeups for a connection_engine.
-class pollable_engine : public pollable {
+// Handle epoll wakeups for a connection_driver.
+class pollable_driver : public pollable {
public:
- pollable_engine(epoll_container& c, int fd, int epoll_fd) :
+ pollable_driver(epoll_container& c, int fd, int epoll_fd) :
pollable(fd, epoll_fd),
loop_(new epoll_event_loop(*this)),
- engine_(c, loop_)
+ driver_(c, loop_)
{
- proton::connection conn = engine_.connection();
+ proton::connection conn = driver_.connection();
proton::io::set_link_namer(conn, c.link_namer);
}
- ~pollable_engine() {
+ ~pollable_driver() {
loop_->close(); // No calls to notify() after this.
- engine_.dispatch(); // Run any final events.
+ driver_.dispatch(); // Run any final events.
try { write(); } catch(...) {} // Write connection close if we can.
for (auto f : loop_->pop_all()) {// Run final queued work for side-effects.
try { f(); } catch(...) {}
@@ -303,17 +303,17 @@
can_read = can_read && read();
for (auto f : loop_->pop_all()) // Run queued work
f();
- engine_.dispatch();
+ driver_.dispatch();
} while (can_read || can_write);
- return (engine_.read_buffer().size ? EPOLLIN:0) |
- (engine_.write_buffer().size ? EPOLLOUT:0);
+ return (driver_.read_buffer().size ? EPOLLIN:0) |
+ (driver_.write_buffer().size ? EPOLLOUT:0);
} catch (const std::exception& e) {
- engine_.disconnected(proton::error_condition("exception", e.what()));
+ driver_.disconnected(proton::error_condition("exception", e.what()));
}
return 0; // Ending
}
- proton::io::connection_engine& engine() { return engine_; }
+ proton::io::connection_driver& driver() { return driver_; }
private:
static bool try_again(int e) {
@@ -322,11 +322,11 @@
}
bool write() {
- proton::io::const_buffer wbuf(engine_.write_buffer());
+ proton::io::const_buffer wbuf(driver_.write_buffer());
if (wbuf.size) {
ssize_t n = ::write(fd_, wbuf.data, wbuf.size);
if (n > 0) {
- engine_.write_done(n);
+ driver_.write_done(n);
return true;
} else if (n < 0 && !try_again(errno)) {
check(n, "write");
@@ -336,15 +336,15 @@
}
bool read() {
- proton::io::mutable_buffer rbuf(engine_.read_buffer());
+ proton::io::mutable_buffer rbuf(driver_.read_buffer());
if (rbuf.size) {
ssize_t n = ::read(fd_, rbuf.data, rbuf.size);
if (n > 0) {
- engine_.read_done(n);
+ driver_.read_done(n);
return true;
}
else if (n == 0)
- engine_.read_close();
+ driver_.read_close();
else if (!try_again(errno))
check(n, "read");
}
@@ -352,13 +352,13 @@
}
// Lifecycle note: loop_ belongs to the proton::connection, which can live
- // longer than the engine if the application holds a reference to it, we
- // disconnect ourselves with loop_->close() in ~connection_engine()
+ // longer than the driver if the application holds a reference to it, we
+ // disconnect ourselves with loop_->close() in ~connection_driver()
epoll_event_loop* loop_;
- proton::io::connection_engine engine_;
+ proton::io::connection_driver driver_;
};
-// A pollable listener fd that creates pollable_engine for incoming connections.
+// A pollable listener fd that creates pollable_driver for incoming connections.
class pollable_listener : public pollable {
public:
pollable_listener(
@@ -380,7 +380,7 @@
}
try {
int accepted = check(::accept(fd_, NULL, 0), "accept");
- container_.add_engine(listener_.on_accept(), accepted, true);
+ container_.add_driver(listener_.on_accept(), accepted, true);
return EPOLLIN;
} catch (const std::exception& e) {
listener_.on_error(e.what());
@@ -424,25 +424,25 @@
} catch (...) {}
}
-proton::connection epoll_container::add_engine(proton::connection_options opts, int fd, bool server)
+proton::connection epoll_container::add_driver(proton::connection_options opts, int fd, bool server)
{
lock_guard g(lock_);
if (stopping_)
throw proton::error("container is stopping");
- std::unique_ptr<pollable_engine> eng(new pollable_engine(*this, fd, epoll_fd_));
+ std::unique_ptr<pollable_driver> eng(new pollable_driver(*this, fd, epoll_fd_));
if (server)
- eng->engine().accept(opts);
+ eng->driver().accept(opts);
else
- eng->engine().connect(opts);
- proton::connection c = eng->engine().connection();
+ eng->driver().connect(opts);
+ proton::connection c = eng->driver().connection();
eng->notify();
- engines_[eng.get()] = std::move(eng);
+ drivers_[eng.get()] = std::move(eng);
return c;
}
void epoll_container::erase(pollable* e) {
lock_guard g(lock_);
- if (!engines_.erase(e)) {
+ if (!drivers_.erase(e)) {
pollable_listener* l = dynamic_cast<pollable_listener*>(e);
if (l)
listeners_.erase(l->addr());
@@ -451,7 +451,7 @@
}
void epoll_container::idle_check(const lock_guard&) {
- if (stopping_ && engines_.empty() && listeners_.empty())
+ if (stopping_ && drivers_.empty() && listeners_.empty())
interrupt();
}
@@ -462,7 +462,7 @@
unique_addrinfo ainfo(addr);
unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
- return make_thread_safe(add_engine(opts, fd.release(), false));
+ return make_thread_safe(add_driver(opts, fd.release(), false));
}
proton::listener epoll_container::listen(const std::string& addr, proton::listen_handler& lh) {
@@ -520,10 +520,10 @@
void epoll_container::wait() {
std::unique_lock<std::mutex> l(lock_);
stopped_.wait(l, [this]() { return this->threads_ == 0; } );
- for (auto& eng : engines_)
- eng.second->engine().disconnected(stop_err_);
+ for (auto& eng : drivers_)
+ eng.second->driver().disconnected(stop_err_);
listeners_.clear();
- engines_.clear();
+ drivers_.clear();
}
void epoll_container::interrupt() {
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index ddab147..ffc6e10 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -369,7 +369,7 @@
src/core/encoder.c
src/core/dispatcher.c
- src/core/connection_engine.c
+ src/core/connection_driver.c
src/core/engine.c
src/core/event.c
src/core/autodetect.c
@@ -440,7 +440,7 @@
include/proton/codec.h
include/proton/condition.h
include/proton/connection.h
- include/proton/connection_engine.h
+ include/proton/connection_driver.h
include/proton/delivery.h
include/proton/disposition.h
include/proton/engine.h
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index ed969eb..6af4319 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -48,7 +48,7 @@
src/error_condition.cpp
src/event_loop.cpp
src/handler.cpp
- src/io/connection_engine.cpp
+ src/io/connection_driver.cpp
src/io/link_namer.cpp
src/link.cpp
src/listener.cpp
diff --git a/proton-c/bindings/cpp/docs/io.md b/proton-c/bindings/cpp/docs/io.md
index a892e61..230e538 100644
--- a/proton-c/bindings/cpp/docs/io.md
+++ b/proton-c/bindings/cpp/docs/io.md
@@ -7,16 +7,16 @@
that allows you to implement the Proton API over alternative IO or
threading libraries.
-The `proton::io::connection_engine` class converts an AMQP-encoded
+The `proton::io::connection_driver` class converts an AMQP-encoded
byte stream, read from any IO source, into `proton::messaging_handler`
calls. It generates an AMQP-encoded byte stream as output that can be
written to any IO destination.
-The connection engine is deliberately very simple and low level. It
+The connection driver is deliberately very simple and low level. It
performs no IO of its own, no thread-related locking, and is written
in simple C++98-compatible code.
-The connection engine can be used standalone as an AMQP translator, or
+The connection dirver can be used standalone as an AMQP translator, or
you can implement the following two interfaces to provide a complete
implementation of the Proton API that can run any Proton application:
diff --git a/proton-c/bindings/cpp/docs/main.md b/proton-c/bindings/cpp/docs/main.md
index 011df29..93ba2c0 100644
--- a/proton-c/bindings/cpp/docs/main.md
+++ b/proton-c/bindings/cpp/docs/main.md
@@ -123,6 +123,6 @@
`proton::default_container`.
You can implement your own container to integrate proton with any IO
-provider using the `proton::io::connection_engine`.
+provider using the `proton::io::connection_driver`.
@see @ref io_page
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 9fbdbdc..d2deebf 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -40,7 +40,7 @@
class connection;
namespace io {
-class connection_engine;
+class connection_driver;
}
/// Options for creating a connection.
@@ -163,7 +163,7 @@
/// @cond INTERNAL
friend class container_impl;
friend class connector;
- friend class io::connection_engine;
+ friend class io::connection_driver;
friend class connection;
/// @endcond
};
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
similarity index 87%
rename from proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
rename to proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index d9825c2..d5da718 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -1,5 +1,5 @@
-#ifndef PROTON_IO_CONNECTION_ENGINE_HPP
-#define PROTON_IO_CONNECTION_ENGINE_HPP
+#ifndef PROTON_IO_CONNECTION_DRIVER_HPP
+#define PROTON_IO_CONNECTION_DRIVER_HPP
/*
*
@@ -32,7 +32,7 @@
#include "../transport.hpp"
#include "../types.hpp"
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <cstddef>
#include <utility>
@@ -63,13 +63,9 @@
const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
};
-/// **Experimental** - An AMQP protocol engine for a single
-/// connection.
+/// **Experimental** - An AMQP driver for a single connection.
///
-/// A connection_engine is a protocol engine that integrates AMQP into
-/// any IO or concurrency framework.
-///
-/// io::connection_engine manages a single proton::connection and dispatches
+/// io::connection_driver manages a single proton::connection and dispatches
/// events to a proton::messaging_handler. It does no IO of its own, but allows you to
/// integrate AMQP protocol handling into any IO or concurrency framework.
///
@@ -78,7 +74,7 @@
/// proton::messaging_handler to respond to transport, connection,
/// session, link, and message events. With a little care, the same
/// handler classes can be used for both container and
-/// connection_engine. the @ref broker.cpp example illustrates this.
+/// connection_driver. the @ref broker.cpp example illustrates this.
///
/// You need to write the IO code to read AMQP data to the
/// read_buffer(). The engine parses the AMQP frames. dispatch() calls
@@ -96,7 +92,7 @@
///
/// The engine never throws exceptions.
class
-PN_CPP_CLASS_EXTERN connection_engine {
+PN_CPP_CLASS_EXTERN connection_driver {
public:
/// An engine that is not associated with a proton::container or
/// proton::event_loop.
@@ -104,20 +100,20 @@
/// Accessing the container or event_loop for this connection in
/// a proton::messaging_handler will throw a proton::error exception.
///
- PN_CPP_EXTERN connection_engine();
+ PN_CPP_EXTERN connection_driver();
- /// Create a connection engine associated with a proton::container and
+ /// Create a connection driver associated with a proton::container and
/// optional event_loop. If the event_loop is not provided attempts to use
/// it will throw proton::error.
///
/// Takes ownership of the event_loop. Note the proton::connection created
- /// by this connection_engine can outlive the connection_engine itself if
+ /// by this connection_driver can outlive the connection_driver itself if
/// the user pins it in memory using the proton::thread_safe<> template.
/// The event_loop is deleted when, and only when, the proton::connection is.
///
- PN_CPP_EXTERN connection_engine(proton::container&, event_loop* loop = 0);
+ PN_CPP_EXTERN connection_driver(proton::container&, event_loop* loop = 0);
- PN_CPP_EXTERN ~connection_engine();
+ PN_CPP_EXTERN ~connection_driver();
/// Configure a connection by applying exactly the options in opts (including proton::messaging_handler)
/// Does not apply any default options, to apply container defaults use connect() or accept()
@@ -189,27 +185,27 @@
///
PN_CPP_EXTERN bool dispatch();
- /// Get the AMQP connection associated with this connection_engine.
+ /// Get the AMQP connection associated with this connection_driver.
/// The event_loop is availabe via proton::thread_safe<connection>(connection())
PN_CPP_EXTERN proton::connection connection() const;
- /// Get the transport associated with this connection_engine.
+ /// Get the transport associated with this connection_driver.
PN_CPP_EXTERN proton::transport transport() const;
- /// Get the container associated with this connection_engine, if there is one.
+ /// Get the container associated with this connection_driver, if there is one.
PN_CPP_EXTERN proton::container* container() const;
private:
void init();
- connection_engine(const connection_engine&);
- connection_engine& operator=(const connection_engine&);
+ connection_driver(const connection_driver&);
+ connection_driver& operator=(const connection_driver&);
messaging_handler* handler_;
proton::container* container_;
- pn_connection_engine_t engine_;
+ pn_connection_driver_t driver_;
};
} // io
} // proton
-#endif // PROTON_IO_CONNECTION_ENGINE_HPP
+#endif // PROTON_IO_CONNECTION_DRIVER_HPP
diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
index 2c5423f..acdcd30 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
@@ -40,7 +40,7 @@
class messaging_adapter;
namespace io {
-class connection_engine;
+class connection_driver;
}
/// A handler for Proton messaging events.
diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp b/proton-c/bindings/cpp/include/proton/transport.hpp
index bcd8a2f..10641e0 100644
--- a/proton-c/bindings/cpp/include/proton/transport.hpp
+++ b/proton-c/bindings/cpp/include/proton/transport.hpp
@@ -35,7 +35,7 @@
class sasl;
namespace io {
-class connection_engine;
+class connection_driver;
}
/// A network channel supporting an AMQP connection.
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index 6c3341f..991836d 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -24,7 +24,7 @@
#include "proton/container.hpp"
#include "proton/uuid.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
#include "proton/io/link_namer.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/types_fwd.hpp"
@@ -37,7 +37,7 @@
using namespace std;
using namespace proton;
-using proton::io::connection_engine;
+using proton::io::connection_driver;
using proton::io::const_buffer;
using proton::io::mutable_buffer;
@@ -45,14 +45,14 @@
typedef std::deque<char> byte_stream;
-/// In memory connection_engine that reads and writes from byte_streams
-struct in_memory_engine : public connection_engine {
+/// In memory connection_driver that reads and writes from byte_streams
+struct in_memory_engine : public connection_driver {
byte_stream& reads;
byte_stream& writes;
in_memory_engine(byte_stream& rd, byte_stream& wr, class container& cont) :
- connection_engine(cont), reads(rd), writes(wr) {}
+ connection_driver(cont), reads(rd), writes(wr) {}
void do_read() {
mutable_buffer rbuf = read_buffer();
@@ -247,7 +247,7 @@
void test_no_container() {
// An engine with no container should throw, not crash.
- connection_engine e;
+ connection_driver e;
try {
e.connection().container();
FAIL("expected error");
diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp
index 74a763c..1d4194e 100644
--- a/proton-c/bindings/cpp/src/include/contexts.hpp
+++ b/proton-c/bindings/cpp/src/include/contexts.hpp
@@ -24,7 +24,7 @@
#include "proton/connection.hpp"
#include "proton/container.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
#include "proton/event_loop.hpp"
#include "proton/listen_handler.hpp"
#include "proton/message.hpp"
diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp
new file mode 100644
index 0000000..06b01d8
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/io/connection_driver.hpp"
+
+#include "proton/event_loop.hpp"
+#include "proton/error.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/uuid.hpp"
+
+#include "contexts.hpp"
+#include "messaging_adapter.hpp"
+#include "msg.hpp"
+#include "proton_bits.hpp"
+#include "proton_event.hpp"
+
+#include <proton/connection.h>
+#include <proton/transport.h>
+#include <proton/event.h>
+
+#include <algorithm>
+
+
+namespace proton {
+namespace io {
+
+void connection_driver::init() {
+ if (pn_connection_driver_init(&driver_, pn_connection(), pn_transport()) != 0) {
+ this->~connection_driver(); // Dtor won't be called on throw from ctor.
+ throw proton::error(std::string("connection_driver allocation failed"));
+ }
+}
+
+connection_driver::connection_driver() : handler_(0), container_(0) { init(); }
+
+connection_driver::connection_driver(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
+ init();
+ connection_context& ctx = connection_context::get(connection());
+ ctx.container = container_;
+ ctx.event_loop.reset(loop);
+}
+
+connection_driver::~connection_driver() {
+ pn_connection_driver_destroy(&driver_);
+}
+
+// FIXME aconway 2016-11-16: rename _engine > _driver
+void connection_driver::configure(const connection_options& opts, bool server) {
+ proton::connection c(connection());
+ opts.apply_unbound(c);
+ if (server) pn_transport_set_server(driver_.transport);
+ pn_connection_driver_bind(&driver_);
+ opts.apply_bound(c);
+ handler_ = opts.handler();
+ connection_context::get(connection()).collector =
+ pn_connection_collector(driver_.connection);
+}
+
+void connection_driver::connect(const connection_options& opts) {
+ connection_options all;
+ if (container_) {
+ all.container_id(container_->id());
+ all.update(container_->client_connection_options());
+ }
+ all.update(opts);
+ configure(all, false);
+ connection().open();
+}
+
+void connection_driver::accept(const connection_options& opts) {
+ connection_options all;
+ if (container_) {
+ all.container_id(container_->id());
+ all.update(container_->server_connection_options());
+ }
+ all.update(opts);
+ configure(all, true);
+}
+
+bool connection_driver::dispatch() {
+ pn_event_t* c_event;
+ while ((c_event = pn_connection_driver_next_event(&driver_)) != NULL) {
+ proton_event cpp_event(c_event, container_);
+ try {
+ if (handler_ != 0) {
+ messaging_adapter adapter(*handler_);
+ cpp_event.dispatch(adapter);
+ }
+ } catch (const std::exception& e) {
+ pn_condition_t *cond = pn_transport_condition(driver_.transport);
+ if (!pn_condition_is_set(cond)) {
+ pn_condition_format(cond, "exception", "%s", e.what());
+ }
+ }
+ }
+ return !pn_connection_driver_finished(&driver_);
+}
+
+mutable_buffer connection_driver::read_buffer() {
+ pn_rwbytes_t buffer = pn_connection_driver_read_buffer(&driver_);
+ return mutable_buffer(buffer.start, buffer.size);
+}
+
+void connection_driver::read_done(size_t n) {
+ return pn_connection_driver_read_done(&driver_, n);
+}
+
+void connection_driver::read_close() {
+ pn_connection_driver_read_close(&driver_);
+}
+
+const_buffer connection_driver::write_buffer() {
+ pn_bytes_t buffer = pn_connection_driver_write_buffer(&driver_);
+ return const_buffer(buffer.start, buffer.size);
+}
+
+void connection_driver::write_done(size_t n) {
+ return pn_connection_driver_write_done(&driver_, n);
+}
+
+void connection_driver::write_close() {
+ pn_connection_driver_write_close(&driver_);
+}
+
+void connection_driver::disconnected(const proton::error_condition& err) {
+ pn_condition_t* condition = pn_transport_condition(driver_.transport);
+ if (!pn_condition_is_set(condition)) {
+ set_error_condition(err, condition);
+ }
+ pn_connection_driver_close(&driver_);
+}
+
+proton::connection connection_driver::connection() const {
+ return make_wrapper(driver_.connection);
+}
+
+proton::transport connection_driver::transport() const {
+ return make_wrapper(driver_.transport);
+}
+
+proton::container* connection_driver::container() const {
+ return container_;
+}
+
+}}
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
deleted file mode 100644
index 5e6483f..0000000
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/io/connection_engine.hpp"
-
-#include "proton/event_loop.hpp"
-#include "proton/error.hpp"
-#include "proton/messaging_handler.hpp"
-#include "proton/uuid.hpp"
-
-#include "contexts.hpp"
-#include "messaging_adapter.hpp"
-#include "msg.hpp"
-#include "proton_bits.hpp"
-#include "proton_event.hpp"
-
-#include <proton/connection.h>
-#include <proton/transport.h>
-#include <proton/event.h>
-
-#include <algorithm>
-
-
-namespace proton {
-namespace io {
-
-void connection_engine::init() {
- if (pn_connection_engine_init(&engine_, pn_connection(), pn_transport()) != 0) {
- this->~connection_engine(); // Dtor won't be called on throw from ctor.
- throw proton::error(std::string("connection_engine allocation failed"));
- }
-}
-
-connection_engine::connection_engine() : handler_(0), container_(0) { init(); }
-
-connection_engine::connection_engine(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
- init();
- connection_context& ctx = connection_context::get(connection());
- ctx.container = container_;
- ctx.event_loop.reset(loop);
-}
-
-connection_engine::~connection_engine() {
- pn_connection_engine_destroy(&engine_);
-}
-
-void connection_engine::configure(const connection_options& opts, bool server) {
- proton::connection c(connection());
- opts.apply_unbound(c);
- if (server) pn_transport_set_server(engine_.transport);
- pn_connection_engine_bind(&engine_);
- opts.apply_bound(c);
- handler_ = opts.handler();
- connection_context::get(connection()).collector = engine_.collector;
-}
-
-void connection_engine::connect(const connection_options& opts) {
- connection_options all;
- if (container_) {
- all.container_id(container_->id());
- all.update(container_->client_connection_options());
- }
- all.update(opts);
- configure(all, false);
- connection().open();
-}
-
-void connection_engine::accept(const connection_options& opts) {
- connection_options all;
- if (container_) {
- all.container_id(container_->id());
- all.update(container_->server_connection_options());
- }
- all.update(opts);
- configure(all, true);
-}
-
-bool connection_engine::dispatch() {
- pn_event_t* c_event;
- while ((c_event = pn_connection_engine_event(&engine_)) != NULL) {
- proton_event cpp_event(c_event, container_);
- try {
- if (handler_ != 0) {
- messaging_adapter adapter(*handler_);
- cpp_event.dispatch(adapter);
- }
- } catch (const std::exception& e) {
- pn_condition_t *cond = pn_transport_condition(engine_.transport);
- if (!pn_condition_is_set(cond)) {
- pn_condition_format(cond, "exception", "%s", e.what());
- }
- }
- pn_connection_engine_pop_event(&engine_);
- }
- return !pn_connection_engine_finished(&engine_);
-}
-
-mutable_buffer connection_engine::read_buffer() {
- pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&engine_);
- return mutable_buffer(buffer.start, buffer.size);
-}
-
-void connection_engine::read_done(size_t n) {
- return pn_connection_engine_read_done(&engine_, n);
-}
-
-void connection_engine::read_close() {
- pn_connection_engine_read_close(&engine_);
-}
-
-const_buffer connection_engine::write_buffer() {
- pn_bytes_t buffer = pn_connection_engine_write_buffer(&engine_);
- return const_buffer(buffer.start, buffer.size);
-}
-
-void connection_engine::write_done(size_t n) {
- return pn_connection_engine_write_done(&engine_, n);
-}
-
-void connection_engine::write_close() {
- pn_connection_engine_write_close(&engine_);
-}
-
-void connection_engine::disconnected(const proton::error_condition& err) {
- pn_condition_t* condition = pn_transport_condition(engine_.transport);
- if (!pn_condition_is_set(condition)) {
- set_error_condition(err, condition);
- }
- pn_connection_engine_close(&engine_);
-}
-
-proton::connection connection_engine::connection() const {
- return make_wrapper(engine_.connection);
-}
-
-proton::transport connection_engine::transport() const {
- return make_wrapper(engine_.transport);
-}
-
-proton::container* connection_engine::container() const {
- return container_;
-}
-
-}}
diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp
index b84722c..e5ec55a 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -74,7 +74,7 @@
// Create dummy flow event where "drain finish" can be detected.
pn_connection_t *pnc = pn_session_connection(pn_link_session(pn_object()));
connection_context& cctx = connection_context::get(pnc);
- // connection_engine collector is per connection. Reactor collector is global.
+ // connection_driver collector is per connection. Reactor collector is global.
pn_collector_t *coll = cctx.collector;
if (!coll)
coll = pn_reactor_collector(pn_object_reactor(pnc));
diff --git a/proton-c/bindings/cpp/src/thread_safe_test.cpp b/proton-c/bindings/cpp/src/thread_safe_test.cpp
index f8dc3d8..5b5d487 100644
--- a/proton-c/bindings/cpp/src/thread_safe_test.cpp
+++ b/proton-c/bindings/cpp/src/thread_safe_test.cpp
@@ -24,7 +24,7 @@
#include "proton_bits.hpp"
#include "proton/thread_safe.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
#include <proton/connection.h>
@@ -37,7 +37,7 @@
pn_connection_t* c = 0;
thread_safe<connection>* p = 0;
{
- io::connection_engine e;
+ io::connection_driver e;
c = unwrap(e.connection());
int r = pn_refcount(c);
ASSERT(r >= 1); // engine may have internal refs (transport, collector).
@@ -54,7 +54,7 @@
{
std::shared_ptr<thread_safe<connection> > sp;
{
- io::connection_engine e;
+ io::connection_driver e;
c = unwrap(e.connection());
sp = make_shared_thread_safe(e.connection());
}
@@ -63,7 +63,7 @@
{
std::unique_ptr<thread_safe<connection> > up;
{
- io::connection_engine e;
+ io::connection_driver e;
c = unwrap(e.connection());
up = make_unique_thread_safe(e.connection());
}
@@ -78,7 +78,7 @@
connection c;
pn_connection_t* pc = 0;
{
- io::connection_engine eng;
+ io::connection_driver eng;
c = eng.connection();
pc = unwrap(c); // Unwrap in separate scope to avoid confusion from temp values.
}
diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md
index ccd679d..9c6009f 100644
--- a/proton-c/docs/api/index.md
+++ b/proton-c/docs/api/index.md
@@ -5,35 +5,31 @@
The [Engine API](@ref engine) is a "pure AMQP" toolkit, it decodes AMQP bytes
into proton [events](@ref event) and generates AMQP bytes from application
-calls.
+calls. There is no IO or threading code in this part of the library.
-The [connection engine](@ref connection_engine) provides a simple bytes in/bytes
+## Proactive event-driven programming
+
+The [Proactor API](@ref proactor) is a pro-active, asynchronous framework to
+build single or multi-threaded Proton C applications. It manages the IO
+transport layer so you can write portable, event-driven AMQP code using the @ref
+engine API.
+
+## IO Integration
+
+The [connection driver](@ref connection_driver) provides a simple bytes in/bytes
out, event-driven interface so you can read AMQP data from any source, process
-the resulting [events](@ref event) and write AMQP output to any destination.
+the resulting [events](@ref event) and write AMQP output to any destination. It
+lets you use proton in in alternate event loops, or for specialized embedded
+applications.
-There is no IO or threading code in this part of the library, so it can be
-embedded in many different environments. The proton project provides language
-bindings (Python, Ruby, Go etc.) that embed it into the standard IO and
-threading facilities of the bound language.
-
-## Integrating with IO
-
-The [Proactor API](@ref proactor) is a pro-active, asynchronous framewokr to
-build single or multi-threaded Proton C applications.
-
-For advanced use-cases it is possible to write your own implementation of the
-proactor API for an unusual IO or threading framework. Any proton application
+It is also possible to write your own implementation of the @ref proactor if you
+are dealing with an unusual IO or threading framework. Any proton application
written to the proactor API will be able to use your implementation.
-## Messenger and Reactor APIs
+## Messenger and Reactor APIs (deprecated)
-The [Messenger](@ref messenger) [Reactor](@ref reactor) APIs were intended
-to be simple APIs that included IO support directly out of the box.
+The [Messenger](@ref messenger) [Reactor](@ref reactor) APIs are older APIs
+that were limited to single-threaded applications.
-They both had good points but were both based on the assumption of a single-threaded
-environment using a POSIX-like poll() call. This was a problem for performance on some
-platforms and did not support multi-threaded applications.
-
-Note however that application code which interacts with the AMQP @ref engine and
-processes AMQP @ref "events" event is the same for the proactor and reactor APIs,
-so is quite easy to convert. The main difference is in how connections are set up.
+Existing @ref reactor applications can be converted easily to use the @ref proactor,
+since they share the same @engine API and @ref event set.
diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h
index 0ed23b0..70fad73 100644
--- a/proton-c/include/proton/connection.h
+++ b/proton-c/include/proton/connection.h
@@ -38,7 +38,7 @@
/**
* @file
*
- * Connection API for the proton Engine.
+ * Connection API for the proton @ref engine
*
* @defgroup connection Connection
* @ingroup engine
diff --git a/proton-c/include/proton/connection_driver.h b/proton-c/include/proton/connection_driver.h
new file mode 100644
index 0000000..4fa3fb9
--- /dev/null
+++ b/proton-c/include/proton/connection_driver.h
@@ -0,0 +1,243 @@
+#ifndef PROTON_CONNECTION_DRIVER_H
+#define PROTON_CONNECTION_DRIVER_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * @file
+ *
+ * @defgroup connection_driver Connection Driver
+ *
+ * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
+ * transports. Provides a single point of control for an AMQP connection and
+ * a simple bytes-in/bytes-out interface that lets you:
+ *
+ * - process AMQP-encoded bytes from some input byte stream
+ * - generate ::pn_event_t events for your application to handle
+ * - encode resulting AMQP output bytes for some output byte stream
+ *
+ * The pn_connection_driver_() functions provide a simplified API and extra
+ * logic to use ::pn_connection_t and ::pn_transport_t as a unit. You can also
+ * access them directly for features that are not exposed via the @ref
+ * connection_driver API.
+ *
+ * The engine buffers events and data, you should run it until
+ * pn_connection_driver_finished() is true, to ensure all reading, writing and
+ * event handling (including ERROR and FINAL events) is finished.
+ *
+ * ## Error handling
+ *
+ * The pn_connection_driver_*() functions do not return an error code. IO errors set
+ * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration
+ * code can set errors using pn_connection_driver_errorf()
+ *
+ * ## IO patterns
+ *
+ * This API supports asynchronous, proactive, non-blocking and reactive IO. An
+ * integration does not have to follow the dispatch-read-write sequence above,
+ * but note that you should handle all available events before calling
+ * pn_connection_driver_read_buffer() and check that `size` is non-zero before
+ * starting a blocking or asynchronous read call. A `read` started while there
+ * are unprocessed CLOSE events in the buffer may never complete.
+ *
+ * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
+ * an AMQP connection can close separately
+ *
+ * ## Thread safety
+ *
+ * The @ref engine types are not thread safe, but each connection and its
+ * associated types forms an independent unit. Different connections can be
+ * processed concurrently by different threads.
+ *
+ * @{
+ */
+
+#include <proton/import_export.h>
+#include <proton/event.h>
+#include <proton/types.h>
+
+#include <stdarg.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Struct containing the 3 elements needed to driver AMQP IO and events, aggregated as a unit.
+ */
+typedef struct pn_connection_driver_t {
+ pn_connection_t *connection;
+ pn_transport_t *transport;
+ pn_event_batch_t batch;
+} pn_connection_driver_t;
+
+/**
+ * Set #connection and #transport to the provided values, or create a new
+ * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
+ * The provided values belong to the connection driver and will be freed by
+ * pn_connection_driver_destroy()
+ *
+ * The transport is bound automatically after the PN_CONNECTION_INIT has been is
+ * handled by the application. It can be bound earlier with
+ * pn_connection_driver_bind().
+ *
+ * The following functions must be called before the transport is
+ * bound to have effect: pn_connection_set_username(), pn_connection_set_password(),
+ * pn_transport_set_server()
+ *
+ * @return PN_OUT_OF_MEMORY if any allocation fails.
+ */
+PN_EXTERN int pn_connection_driver_init(pn_connection_driver_t*, pn_connection_t*, pn_transport_t*);
+
+/** Force binding of the transport.
+ * This happens automatically after the PN_CONNECTION_INIT is processed.
+ *
+ * @return PN_STATE_ERR if the transport is already bound.
+ */
+PN_EXTERN int pn_connection_driver_bind(pn_connection_driver_t *d);
+
+/**
+ * Unbind, release and free #connection and #transport. Set all pointers to
+ * NULL. Does not free the @ref pn_connection_driver_t struct itself.
+ */
+PN_EXTERN void pn_connection_driver_destroy(pn_connection_driver_t *);
+
+/**
+ * Get the read buffer.
+ *
+ * Copy data from your input byte source to buf.start, up to buf.size.
+ * Call pn_connection_driver_read_done() when reading is complete.
+ *
+ * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *);
+
+/**
+ * Process the first n bytes of data in pn_connection_driver_read_buffer() and
+ * reclaim the buffer space.
+ */
+PN_EXTERN void pn_connection_driver_read_done(pn_connection_driver_t *, size_t n);
+
+/**
+ * Close the read side. Call when the IO can no longer be read.
+ */
+PN_EXTERN void pn_connection_driver_read_close(pn_connection_driver_t *);
+
+/**
+ * True if read side is closed.
+ */
+PN_EXTERN bool pn_connection_driver_read_closed(pn_connection_driver_t *);
+
+/**
+ * Get the write buffer.
+ *
+ * Write data from buf.start to your IO destination, up to a max of buf.size.
+ * Call pn_connection_driver_write_done() when writing is complete.
+ *
+ * buf.size==0 means there is nothing to write.
+ */
+ PN_EXTERN pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *);
+
+/**
+ * Call when the first n bytes of pn_connection_driver_write_buffer() have been
+ * written to IO. Reclaims the buffer space and reset the write buffer.
+ */
+PN_EXTERN void pn_connection_driver_write_done(pn_connection_driver_t *, size_t n);
+
+/**
+ * Close the write side. Call when IO can no longer be written to.
+ */
+PN_EXTERN void pn_connection_driver_write_close(pn_connection_driver_t *);
+
+/**
+ * True if write side is closed.
+ */
+PN_EXTERN bool pn_connection_driver_write_closed(pn_connection_driver_t *);
+
+/**
+ * Close both sides side.
+ */
+PN_EXTERN void pn_connection_driver_close(pn_connection_driver_t * c);
+
+/**
+ * Get the next event to handle.
+ *
+ * @return pointer is valid till the next call of
+ * pn_connection_driver_next(). NULL if there are no more events available now,
+ * reading/writing may produce more.
+ */
+PN_EXTERN pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *);
+
+/**
+ * True if pn_connection_driver_next_event() will return a non-NULL event.
+ */
+PN_EXTERN bool pn_connection_driver_has_event(pn_connection_driver_t *);
+
+/**
+ * Return true if the the engine is closed for reading and writing and there are
+ * no more events.
+ *
+ * Call pn_connection_driver_free() to free all related memory.
+ */
+PN_EXTERN bool pn_connection_driver_finished(pn_connection_driver_t *);
+
+/**
+ * Set IO error information.
+ *
+ * The name and formatted description are set on the transport condition, and
+ * returned as a PN_TRANSPORT_ERROR event from pn_connection_driver_next_event().
+ *
+ * You must call this *before* pn_connection_driver_read_close() or
+ * pn_connection_driver_write_close() to ensure the error is processed.
+ */
+PN_EXTERN void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...);
+
+/**
+ * Set IO error information via a va_list, see pn_connection_driver_errorf()
+ */
+PN_EXTERN void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list);
+
+/**
+ * Log a string message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_logf(pn_connection_driver_t *d, char *fmt, ...);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap);
+
+/**
+ * If batch is part of a connection_driver, return the connection_driver address,
+ * else return NULL
+ */
+PN_EXTERN pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch);
+///@}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PROTON_CONNECTION_DRIVER_H
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
deleted file mode 100644
index b7022a9..0000000
--- a/proton-c/include/proton/connection_engine.h
+++ /dev/null
@@ -1,313 +0,0 @@
-#ifndef PROTON_CONNECTION_ENGINE_H
-#define PROTON_CONNECTION_ENGINE_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * @file
- *
- * **Experimental** The connection IO API is a set of functions to simplify
- * integrating proton with different IO and concurrency platforms. The portable
- * parts of a Proton application should use the @ref engine types. We will
- * use "application" to mean the portable part of the application and
- * "integration" to mean code that integrates with a particular IO platform.
- *
- * The connection_engine functions take a @ref pn_connection_t\*, and perform common
- * tasks involving the @ref pn_connection_t and it's @ref pn_transport_t and
- * @ref pn_collector_t so you can treat them as a unit. You can also work with
- * these types directly for features not available via @ref connection_engine API.
- *
- * @defgroup connection_engine Connection Engine
- *
- * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
- * transports. Provides a single point of control for an AMQP connection and
- * a simple bytes-in/bytes-out interface that lets you:
- *
- * - process AMQP-encoded bytes from some input byte stream
- * - generate @ref pn_event_t events for your application to handle
- * - encode resulting AMQP output bytes for some output byte stream
- *
- * The engine contains a @ref pn_connection_t, @ref pn_transport_t and @ref
- * pn_collector_t and provides functions to operate on all three as a unit for
- * IO integration. You can also use them directly for anything not covered by
- * this API
- *
- * For example a simple blocking IO integration with the imaginary "my_io" library:
- *
- * pn_connection_engine_t ce;
- * pn_connection_engine_init(&ce);
- * while (!pn_connection_engine_finished(&ce) {
- * // Dispatch events to be handled by the application.
- * pn_event_t *e;
- * while ((e = pn_connection_engine_event(&ce))!= NULL) {
- * my_app_handle(e); // Pass to the application handler
- * switch (pn_event_type(e)) {
- * case PN_CONNECTION_INIT: pn_connection_engine_bind(&ce);
- * // Only for full-duplex IO where read/write can shutdown separately.
- * case PN_TRANSPORT_CLOSE_READ: my_io_shutdown_read(...); break;
- * case PN_TRANSPORT_CLOSE_WRITE: my_io_shutdown_write(...); break;
- * default: break;
- * };
- * e = pn_connection_engine_pop_event(&ce);
- * }
- * // Read from my_io into the connection buffer
- * pn_rwbytes_t readbuf = pn_connection_engine_read_buffer(&ce);
- * if (readbuf.size) {
- * size_t n = my_io_read(readbuf.start, readbuf.size, ...);
- * if (n > 0) {
- * pn_connection_engine_read_done(&ce, n);
- * } else if (n < 0) {
- * pn_connection_engine_errorf(&ce, "read-err", "something-bad (%d): %s", n, ...);
- * pn_connection_engine_read_close(&ce);
- * }
- * }
- * // Write from connection buffer to my_io
- * pn_bytes_t writebuf = pn_connection_engine_write_buffer(&ce);
- * if (writebuf.size) {
- * size_t n = my_io_write_data(writebuf.start, writebuf.size, ...);
- * if (n < 0) {
- * pn_connection_engine_errorf(&ce, "write-err", "something-bad (%d): %s", d, ...);
- * pn_connection_engine_write_close(&ce);
- * } else {
- * pn_connection_engine_write_done(&ce, n);
- * }
- * }
- * }
- * // If my_io doesn't have separate read/write shutdown, then we should close it now.
- * my_io_close(...);
- *
- * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
- * an AMQP connection can close separately, the example shows how to handle this
- * for full-duplex IO or IO with a simple close.
- *
- * The engine buffers events, you must keep processing till
- * pn_connection_engine_finished() is true, to ensure all reading, writing and event
- * handling (including ERROR and FINAL events) is completely finished.
- *
- * ## Error handling
- *
- * The pn_connection_engine_*() functions do not return an error code. IO errors set
- * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration
- * code can set errors using pn_connection_engine_errorf()
- *
- * ## Other IO patterns
- *
- * This API supports asynchronous, proactive, non-blocking and reactive IO. An
- * integration does not have to follow the dispatch-read-write sequence above,
- * but note that you should handle all available events before calling
- * pn_connection_engine_read_buffer() and check that `size` is non-zero before
- * starting a blocking or asynchronous read call. A `read` started while there
- * are unprocessed CLOSE events in the buffer may never complete.
- *
- * ## Thread safety
- *
- * The @ref engine types are not thread safe, but each connection and its
- * associated types forms an independent unit. Different connections can be
- * processed concurrently by different threads.
- *
- * @defgroup connection_engine Connection IO
- * @{
- */
-
-#include <proton/import_export.h>
-#include <proton/event.h>
-#include <proton/types.h>
-
-#include <stdarg.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/**
- * Struct containing a connection, transport and collector. See
- * pn_connection_engine_init(), pn_connection_engine_destroy() and pn_connection_engine()
- */
-typedef struct pn_connection_engine_t {
- pn_connection_t *connection;
- pn_transport_t *transport;
- pn_collector_t *collector;
-} pn_connection_engine_t;
-
-/**
- * Set #connection and #transport to the provided values, or create a new
- * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
- * The provided values belong to the connection engine and will be freed by
- * pn_connection_engine_destroy()
- *
- * Create a new @ref pn_collector_t and set as #collector.
- *
- * The transport and connection are *not* bound at this point. You should
- * configure them as needed and let the application handle the
- * PN_CONNECTION_INIT from pn_connection_engine_event() before calling
- * pn_connection_engine_bind().
- *
- * @return if any allocation fails call pn_connection_engine_destroy() and return PN_OUT_OF_MEMORY
- */
-PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t*, pn_connection_t*, pn_transport_t*);
-
-/**
- * Bind the connection to the transport when the external IO is ready.
- *
- * The following functions (if called at all) must be called *before* bind:
- * pn_connection_set_username(), pn_connection_set_password(), pn_transport_set_server()
- *
- * If there is an external IO error during setup, set a transport error, close
- * the transport and then bind. The error events are reported to the application
- * via pn_connection_engine_event().
- *
- * @return an error code if the bind fails.
- */
-PN_EXTERN int pn_connection_engine_bind(pn_connection_engine_t *);
-
-/**
- * Unbind, release and free #connection, #transpot and #collector. Set all pointers to NULL.
- * Does not free the @ref pn_connection_engine_t struct itself.
- */
-PN_EXTERN void pn_connection_engine_destroy(pn_connection_engine_t *);
-
-/**
- * Get the read buffer.
- *
- * Copy data from your input byte source to buf.start, up to buf.size.
- * Call pn_connection_engine_read_done() when reading is complete.
- *
- * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
- */
-PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *);
-
-/**
- * Process the first n bytes of data in pn_connection_engine_read_buffer() and
- * reclaim the buffer space.
- */
-PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t *, size_t n);
-
-/**
- * Close the read side. Call when the IO can no longer be read.
- */
-PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t *);
-
-/**
- * True if read side is closed.
- */
-PN_EXTERN bool pn_connection_engine_read_closed(pn_connection_engine_t *);
-
-/**
- * Get the write buffer.
- *
- * Write data from buf.start to your IO destination, up to a max of buf.size.
- * Call pn_connection_engine_write_done() when writing is complete.
- *
- * buf.size==0 means there is nothing to write.
- */
- PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *);
-
-/**
- * Call when the first n bytes of pn_connection_engine_write_buffer() have been
- * written to IO. Reclaims the buffer space and reset the write buffer.
- */
-PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t *, size_t n);
-
-/**
- * Close the write side. Call when IO can no longer be written to.
- */
-PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t *);
-
-/**
- * True if write side is closed.
- */
-PN_EXTERN bool pn_connection_engine_write_closed(pn_connection_engine_t *);
-
-/**
- * Close both sides side.
- */
-PN_EXTERN void pn_connection_engine_close(pn_connection_engine_t * c);
-
-/**
- * Get the current event. Call pn_connection_engine_done() when done handling it.
- * Note that if PN_TRACE_EVT is enabled this will log the event, so you should
- * avoid calling it more than once per event. Use pn_connection_engine_has_event()
- * to silently test if any events are available.
- *
- * @return NULL if there are no more events ready. Reading/writing data may produce more.
- */
-PN_EXTERN pn_event_t* pn_connection_engine_event(pn_connection_engine_t *);
-
-/**
- * True if pn_connection_engine_event() will return a non-NULL event.
- */
-PN_EXTERN bool pn_connection_engine_has_event(pn_connection_engine_t *);
-
-/**
- * Drop the current event, advance pn_connection_engine_event() to the next event.
- */
-PN_EXTERN void pn_connection_engine_pop_event(pn_connection_engine_t *);
-
-/**
- * Return true if the the engine is closed for reading and writing and there are
- * no more events.
- *
- * Call pn_connection_engine_free() to free all related memory.
- */
-PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t *);
-
-/**
- * Set IO error information.
- *
- * The name and formatted description are set on the transport condition, and
- * returned as a PN_TRANSPORT_ERROR event from pn_connection_engine_event().
- *
- * You must call this *before* pn_connection_engine_read_close() or
- * pn_connection_engine_write_close() to ensure the error is processed.
- *
- * If there is already a transport condition set, this call does nothing. For
- * more complex cases, you can work with the transport condition directly using:
- *
- * pn_condition_t *cond = pn_transport_condition(pn_connection_transport(conn));
- */
-PN_EXTERN void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...);
-
-/**
- * Set IO error information via a va_list, see pn_connection_engine_errorf()
- */
-PN_EXTERN void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list);
-
-/**
- * Log a string message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg);
-
-/**
- * Log a printf formatted message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_logf(pn_connection_engine_t *ce, char *fmt, ...);
-
-/**
- * Log a printf formatted message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap);
-
-///@}
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif // PROTON_CONNECTION_ENGINE_H
diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i
index ffcf830..931437e 100644
--- a/proton-c/include/proton/cproton.i
+++ b/proton-c/include/proton/cproton.i
@@ -39,6 +39,9 @@
%ignore pn_bytes_t;
%ignore pn_rwbytes_t;
+/* pn_event_batch_t is not used directly by bindings */
+%ignore pn_event_batch_t;
+
/* There is no need to wrap pn_class_t aa it is an internal implementation detail and cannot be used outside the library */
%ignore pn_class_t;
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 4dca2d5..31d4bdd 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -428,6 +428,32 @@
PN_EXTERN bool pn_collector_pop(pn_collector_t *collector);
/**
+ * Return the next event to be handled.
+ *
+ * Returns the head event if it has not previously been returned by
+ * pn_collector_next(), otherwise does pn_collector_pop() and returns
+ * the new head event.
+ *
+ * The returned pointer is valid till the next call of pn_collector_pop(),
+ * pn_collector_next(), pn_collector_release() or pn_collector_free()
+ *
+ * @param[in] collector a collector object
+ * @return the next event.
+ */
+PN_EXTERN pn_event_t *pn_collector_next(pn_collector_t *collector);
+
+/**
+ * Return the same event as the previous call to pn_collector_next()
+ *
+ * The returned pointer is valid till the next call of pn_collector_pop(),
+ * pn_collector_next(), pn_collector_release() or pn_collector_free()
+ *
+ * @param[in] collector a collector object
+ * @return a pointer to the event returned by previous call to pn_collector_next()
+ */
+PN_EXTERN pn_event_t *pn_collector_prev(pn_collector_t *collector);
+
+/**
* Check if there are more events after the current event. If this
* returns true, then pn_collector_peek() will return an event even
* after pn_collector_pop() is called.
@@ -506,6 +532,36 @@
*/
PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event);
+/**
+ * **Experimental**: A batch of events to handle. Call pn_event_batch_next() in
+ * a loop until it returns NULL to handle them.
+ */
+typedef struct pn_event_batch_t pn_event_batch_t;
+
+/* NOTE: there is deliberately no peek(), more() or other look-ahead on an event
+ * batch. We want to know exactly which events have been handled, next() only
+ * allows the user to get each event exactly once, in order.
+ */
+
+/**
+ * **Experimental**: Remove the next event from the batch and return it. NULL
+ * means the batch is empty. The returned event pointer is valid until
+ * pn_event_batch_next() is called again on the same batch.
+ */
+PN_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch);
+
+/**
+ *@cond INTERNAL
+ * pn_event_batch_next() can be re-implemented for different behaviors in different contextxs.
+ */
+struct pn_event_batch_t {
+ pn_event_t *(*next_event)(pn_event_batch_t *batch);
+};
+
+/**
+ *@endcond
+ */
+
#ifdef __cplusplus
}
#endif
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 49d7b6a..e23a24f 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -33,30 +33,27 @@
/**
* @file
*
- * **Experimental**: Proactor API for the the proton @ref engine.
+ * **Experimental**: Proactor API for the the proton @ref engine
*
* @defgroup proactor Proactor
*
* **Experimental**: Proactor API for portable, multi-threaded, asynchronous applications.
*
- * The proactor establishes and listens for connections. It creates the @ref
- * "transport" transport that sends and receives data over the network and
- * delivers @ref "events" event to application threads for processing.
+ * The proactor establishes and listens for connections. It creates
+ * the @ref transport that sends and receives data over the network and
+ * delivers @ref event to application threads for handling.
*
- * ## Multi-threading
- *
- * The @ref proactor is thread-safe, but the @ref "protocol engine" is not. The
- * proactor ensures that each @ref "connection" connection and its associated
- * values (@ref session, @ref link etc.) is processed sequentially, even if there
- * are multiple application threads. See pn_proactor_wait()
+ * **Multi-threading**:
+ * The @ref proactor is thread-safe, but the @ref engine is not. The proactor
+ * ensures that each @ref connection and its associated values (@ref session,
+ * @ref link etc.) is handle sequentially, even if there are multiple
+ * application threads. See pn_proactor_wait()
*
* @{
*/
/**
- * The proactor creates and manage @ref "transports" transport and delivers @ref
- * "event" events to the application.
- *
+ * The proactor.
*/
typedef struct pn_proactor_t pn_proactor_t;
@@ -70,13 +67,6 @@
*/
void pn_proactor_free(pn_proactor_t*);
-/* FIXME aconway 2016-11-12: connect and listen need options to enable
- things like websockets, alternate encryption or other features.
- The "extra" parameter will be replaced by an "options" parameter
- that will include providing extra data and other manipulators
- to affect how the connection is processed.
-*/
-
/**
* Asynchronous connect: a connection and transport will be created, the
* relevant events will be returned by pn_proactor_wait()
@@ -104,13 +94,27 @@
pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra);
/**
- * Wait for an event. Can be called in multiple threads concurrently.
- * You must call pn_event_done() when the event has been handled.
+ * Wait for events to handle. Call pn_proactor_done() after handling events.
*
- * The proactor ensures that events that cannot be handled concurrently
- * (e.g. events for for the same connection) are never returned concurrently.
+ * Thread safe: pn_proactor_wait() can be called concurrently, but the events in
+ * the returned ::pn_event_batch_t must be handled sequentially.
+ *
+ * The proactor always returns events that must be handled sequentially in the
+ * same batch or sequentially in a later batch after pn_proactor_done(). Any
+ * events returned concurrently by pn_proactor_wait() are safe to handle
+ * concurrently.
*/
-pn_event_t *pn_proactor_wait(pn_proactor_t* d);
+pn_event_batch_t *pn_proactor_wait(pn_proactor_t* d);
+
+/**
+ * Call when done handling events.
+ *
+ * It is generally most efficient to handle the entire batch in the thread
+ * that calls pn_proactor_wait(), then call pn_proactor_done(). If you call
+ * pn_proactor_done() earlier, the remaining events will be returned again by
+ * pn_proactor_wait(), possibly to another thread.
+ */
+void pn_proactor_done(pn_proactor_t* d, pn_event_batch_t *events);
/**
* Cause PN_PROACTOR_INTERRUPT to be returned to exactly one thread calling wait()
@@ -146,14 +150,6 @@
pn_proactor_t *pn_connection_proactor(pn_connection_t *c);
/**
- * Call when a proactor event has been handled. Does nothing if not a proactor event.
- *
- * Thread safe: May be called from any thread but must be called exactly once
- * for each event returned by pn_proactor_wait()
- */
-void pn_event_done(pn_event_t *);
-
-/**
* Get the proactor that created the event or NULL.
*/
pn_proactor_t *pn_event_proactor(pn_event_t *);
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index f31ddb0..3393e64 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -20,144 +20,149 @@
#include "engine-internal.h"
#include <proton/condition.h>
#include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/transport.h>
#include <string.h>
-int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
- ce->connection = c ? c : pn_connection();
- ce->transport = t ? t : pn_transport();
- ce->collector = pn_collector();
- if (!ce->connection || !ce->transport || !ce->collector) {
- pn_connection_engine_destroy(ce);
+struct driver_batch {
+ pn_event_batch_t batch;
+};
+
+static pn_event_t *batch_next(pn_event_batch_t *batch) {
+ pn_connection_driver_t *d =
+ (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch));
+ pn_collector_t *collector = pn_connection_collector(d->connection);
+ pn_event_t *handled = pn_collector_prev(collector);
+ if (handled && pn_event_type(handled) == PN_CONNECTION_INIT) {
+ pn_transport_bind(d->transport, d->connection); /* Init event handled, auto-bind */
+ }
+ pn_event_t *next = pn_collector_next(collector);
+ if (next && d->transport->trace & PN_TRACE_EVT) {
+ pn_string_clear(d->transport->scratch);
+ pn_inspect(next, d->transport->scratch);
+ pn_transport_log(d->transport, pn_string_get(d->transport->scratch));
+ }
+ return next;
+}
+
+int pn_connection_driver_init(pn_connection_driver_t* d, pn_connection_t *c, pn_transport_t *t) {
+ memset(d, 0, sizeof(*d));
+ d->batch.next_event = &batch_next;
+ d->connection = c ? c : pn_connection();
+ d->transport = t ? t : pn_transport();
+ pn_collector_t *collector = pn_collector();
+ if (!d->connection || !d->transport || !collector) {
+ if (collector) pn_collector_free(collector);
+ pn_connection_driver_destroy(d);
return PN_OUT_OF_MEMORY;
}
- pn_connection_collect(ce->connection, ce->collector);
+ pn_connection_collect(d->connection, collector);
return 0;
}
-int pn_connection_engine_bind(pn_connection_engine_t *ce) {
- return pn_transport_bind(ce->transport, ce->connection);
+int pn_connection_driver_bind(pn_connection_driver_t *d) {
+ return pn_transport_bind(d->transport, d->connection);
}
-void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
- if (ce->transport) {
- pn_transport_unbind(ce->transport);
- pn_transport_free(ce->transport);
+void pn_connection_driver_destroy(pn_connection_driver_t *d) {
+ if (d->transport) {
+ pn_transport_unbind(d->transport);
+ pn_transport_free(d->transport);
}
- if (ce->collector) pn_collector_free(ce->collector);
- if (ce->connection) pn_connection_free(ce->connection);
- memset(ce, 0, sizeof(*ce));
+ if (d->connection) {
+ pn_collector_t *collector = pn_connection_collector(d->connection);
+ pn_connection_free(d->connection);
+ pn_collector_free(collector);
+ }
+ memset(d, 0, sizeof(*d));
}
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
- ssize_t cap = pn_transport_capacity(ce->transport);
- return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
+pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *d) {
+ ssize_t cap = pn_transport_capacity(d->transport);
+ return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0);
}
-void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
- if (n > 0) pn_transport_process(ce->transport, n);
+void pn_connection_driver_read_done(pn_connection_driver_t *d, size_t n) {
+ if (n > 0) pn_transport_process(d->transport, n);
}
-bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
- return pn_transport_capacity(ce->transport) < 0;
+bool pn_connection_driver_read_closed(pn_connection_driver_t *d) {
+ return pn_transport_capacity(d->transport) < 0;
}
-void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
- if (!pn_connection_engine_read_closed(ce)) {
- pn_transport_close_tail(ce->transport);
+void pn_connection_driver_read_close(pn_connection_driver_t *d) {
+ if (!pn_connection_driver_read_closed(d)) {
+ pn_transport_close_tail(d->transport);
}
}
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
- ssize_t pending = pn_transport_pending(ce->transport);
+pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *d) {
+ ssize_t pending = pn_transport_pending(d->transport);
return (pending > 0) ?
- pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
+ pn_bytes(pending, pn_transport_head(d->transport)) : pn_bytes_null;
}
-void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
+void pn_connection_driver_write_done(pn_connection_driver_t *d, size_t n) {
if (n > 0)
- pn_transport_pop(ce->transport, n);
+ pn_transport_pop(d->transport, n);
}
-bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
- return pn_transport_pending(ce->transport) < 0;
+bool pn_connection_driver_write_closed(pn_connection_driver_t *d) {
+ return pn_transport_pending(d->transport) < 0;
}
-void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
- if (!pn_connection_engine_write_closed(ce)) {
- pn_transport_close_head(ce->transport);
+void pn_connection_driver_write_close(pn_connection_driver_t *d) {
+ if (!pn_connection_driver_write_closed(d)) {
+ pn_transport_close_head(d->transport);
}
}
-void pn_connection_engine_close(pn_connection_engine_t *ce) {
- pn_connection_engine_read_close(ce);
- pn_connection_engine_write_close(ce);
+void pn_connection_driver_close(pn_connection_driver_t *d) {
+ pn_connection_driver_read_close(d);
+ pn_connection_driver_write_close(d);
}
-pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
- pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
- if (e) {
- pn_transport_t *t = ce->transport;
- if (t && t->trace & PN_TRACE_EVT) {
- /* This can log the same event twice if pn_connection_engine_event is called
- * twice but for debugging it is much better to log before handling than after.
- */
- pn_string_clear(t->scratch);
- pn_inspect(e, t->scratch);
- pn_transport_log(t, pn_string_get(t->scratch));
- }
- }
- return e;
+pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) {
+ return pn_event_batch_next(&d->batch);
}
-bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
- return ce->collector && pn_collector_peek(ce->collector);
+bool pn_connection_driver_has_event(pn_connection_driver_t *d) {
+ return pn_collector_peek(pn_connection_collector(d->connection));
}
-void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
- if (ce->collector) {
- pn_event_t *e = pn_collector_peek(ce->collector);
- if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
- /* Events can accumulate behind the TRANSPORT_CLOSED before the
- * PN_TRANSPORT_CLOSED event is handled. They can never be processed
- * so release them.
- */
- pn_collector_release(ce->collector);
- } else {
- pn_collector_pop(ce->collector);
- }
-
- }
+bool pn_connection_driver_finished(pn_connection_driver_t *d) {
+ return pn_transport_closed(d->transport) && !pn_connection_driver_has_event(d);
}
-bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
- return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
-}
-
-void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
- pn_transport_t *t = ce->transport;
+void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list ap) {
+ pn_transport_t *t = d->transport;
pn_condition_t *cond = pn_transport_condition(t);
pn_string_vformat(t->scratch, fmt, ap);
pn_condition_set_name(cond, name);
pn_condition_set_description(cond, pn_string_get(t->scratch));
}
-void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
+void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
- pn_connection_engine_verrorf(ce, name, fmt, ap);
+ pn_connection_driver_verrorf(d, name, fmt, ap);
va_end(ap);
}
-void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
- pn_transport_log(ce->transport, msg);
+void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg) {
+ pn_transport_log(d->transport, msg);
}
-void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
- pn_transport_vlogf(ce->transport, fmt, ap);
+void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap) {
+ pn_transport_vlogf(d->transport, fmt, ap);
}
-void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
- pn_transport_log(ce->transport, msg);
+void pn_connection_driver_vlog(pn_connection_driver_t *d, const char *msg) {
+ pn_transport_log(d->transport, msg);
+}
+
+pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch) {
+ return (batch->next_event == batch_next) ?
+ (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch)) :
+ NULL;
}
diff --git a/proton-c/src/core/connection_engine.c b/proton-c/src/core/connection_engine.c
deleted file mode 100644
index f31ddb0..0000000
--- a/proton-c/src/core/connection_engine.c
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "engine-internal.h"
-#include <proton/condition.h>
-#include <proton/connection.h>
-#include <proton/connection_engine.h>
-#include <proton/transport.h>
-#include <string.h>
-
-int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
- ce->connection = c ? c : pn_connection();
- ce->transport = t ? t : pn_transport();
- ce->collector = pn_collector();
- if (!ce->connection || !ce->transport || !ce->collector) {
- pn_connection_engine_destroy(ce);
- return PN_OUT_OF_MEMORY;
- }
- pn_connection_collect(ce->connection, ce->collector);
- return 0;
-}
-
-int pn_connection_engine_bind(pn_connection_engine_t *ce) {
- return pn_transport_bind(ce->transport, ce->connection);
-}
-
-void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
- if (ce->transport) {
- pn_transport_unbind(ce->transport);
- pn_transport_free(ce->transport);
- }
- if (ce->collector) pn_collector_free(ce->collector);
- if (ce->connection) pn_connection_free(ce->connection);
- memset(ce, 0, sizeof(*ce));
-}
-
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
- ssize_t cap = pn_transport_capacity(ce->transport);
- return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
-}
-
-void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
- if (n > 0) pn_transport_process(ce->transport, n);
-}
-
-bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
- return pn_transport_capacity(ce->transport) < 0;
-}
-
-void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
- if (!pn_connection_engine_read_closed(ce)) {
- pn_transport_close_tail(ce->transport);
- }
-}
-
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
- ssize_t pending = pn_transport_pending(ce->transport);
- return (pending > 0) ?
- pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
-}
-
-void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
- if (n > 0)
- pn_transport_pop(ce->transport, n);
-}
-
-bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
- return pn_transport_pending(ce->transport) < 0;
-}
-
-void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
- if (!pn_connection_engine_write_closed(ce)) {
- pn_transport_close_head(ce->transport);
- }
-}
-
-void pn_connection_engine_close(pn_connection_engine_t *ce) {
- pn_connection_engine_read_close(ce);
- pn_connection_engine_write_close(ce);
-}
-
-pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
- pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
- if (e) {
- pn_transport_t *t = ce->transport;
- if (t && t->trace & PN_TRACE_EVT) {
- /* This can log the same event twice if pn_connection_engine_event is called
- * twice but for debugging it is much better to log before handling than after.
- */
- pn_string_clear(t->scratch);
- pn_inspect(e, t->scratch);
- pn_transport_log(t, pn_string_get(t->scratch));
- }
- }
- return e;
-}
-
-bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
- return ce->collector && pn_collector_peek(ce->collector);
-}
-
-void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
- if (ce->collector) {
- pn_event_t *e = pn_collector_peek(ce->collector);
- if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
- /* Events can accumulate behind the TRANSPORT_CLOSED before the
- * PN_TRANSPORT_CLOSED event is handled. They can never be processed
- * so release them.
- */
- pn_collector_release(ce->collector);
- } else {
- pn_collector_pop(ce->collector);
- }
-
- }
-}
-
-bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
- return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
-}
-
-void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
- pn_transport_t *t = ce->transport;
- pn_condition_t *cond = pn_transport_condition(t);
- pn_string_vformat(t->scratch, fmt, ap);
- pn_condition_set_name(cond, name);
- pn_condition_set_description(cond, pn_string_get(t->scratch));
-}
-
-void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
- va_list ap;
- va_start(ap, fmt);
- pn_connection_engine_verrorf(ce, name, fmt, ap);
- va_end(ap);
-}
-
-void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
- pn_transport_log(ce->transport, msg);
-}
-
-void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
- pn_transport_vlogf(ce->transport, fmt, ap);
-}
-
-void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
- pn_transport_log(ce->transport, msg);
-}
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index 7882327..2a0a5cf 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -28,7 +28,8 @@
pn_list_t *pool;
pn_event_t *head;
pn_event_t *tail;
- bool freed;
+ bool freed:1;
+ bool head_returned:1; /* Head has been returned by pn_collector_next() */
};
struct pn_event_t {
@@ -51,11 +52,8 @@
static void pn_collector_drain(pn_collector_t *collector)
{
assert(collector);
-
- while (pn_collector_peek(collector)) {
- pn_collector_pop(collector);
- }
-
+ while (pn_collector_next(collector))
+ ;
assert(!collector->head);
assert(!collector->tail);
}
@@ -175,6 +173,7 @@
bool pn_collector_pop(pn_collector_t *collector)
{
+ collector->head_returned = false;
pn_event_t *event = collector->head;
if (event) {
collector->head = event->next;
@@ -190,6 +189,19 @@
return true;
}
+pn_event_t *pn_collector_next(pn_collector_t *collector)
+{
+ if (collector->head_returned) {
+ pn_collector_pop(collector);
+ }
+ collector->head_returned = collector->head;
+ return collector->head;
+}
+
+pn_event_t *pn_collector_prev(pn_collector_t *collector) {
+ return collector->head_returned ? collector->head : NULL;
+}
+
bool pn_collector_more(pn_collector_t *collector)
{
assert(collector);
@@ -386,3 +398,7 @@
}
return NULL;
}
+
+pn_event_t *pn_event_batch_next(pn_event_batch_t *batch) {
+ return batch->next_event(batch);
+}
diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c
index a36d01c..267c861 100644
--- a/proton-c/src/tests/refcount.c
+++ b/proton-c/src/tests/refcount.c
@@ -313,7 +313,8 @@
}
static void drain(pn_collector_t *collector) {
- while (pn_collector_peek(collector)) { pn_collector_pop(collector); }
+ while (pn_collector_next(collector))
+ ;
}
static void test_collector_connection_transport(void) {
diff --git a/qpid-proton-cpp.syms b/qpid-proton-cpp.syms
index 7a25651..c24e898 100644
--- a/qpid-proton-cpp.syms
+++ b/qpid-proton-cpp.syms
@@ -30,24 +30,24 @@
proton::connection::user(std::string const&)
proton::connection::~connection()
-proton::connection_engine::can_read() const
-proton::connection_engine::can_write() const
-proton::connection_engine::closed() const
-proton::connection_engine::connection() const
-proton::connection_engine::connection_engine(proton::handler&, proton::connection_options const&)
-proton::connection_engine::container::container(std::string const&)
-proton::connection_engine::container::id() const
-proton::connection_engine::container::make_options()
-proton::connection_engine::container::options(proton::connection_options const&)
-proton::connection_engine::container::~container()
-proton::connection_engine::dispatch()
-proton::connection_engine::io_error::io_error(std::string const&)
-proton::connection_engine::io_error::~io_error()
-proton::connection_engine::no_opts
-proton::connection_engine::process(int)
-proton::connection_engine::try_read()
-proton::connection_engine::try_write()
-proton::connection_engine::~connection_engine()
+proton::connection_driver::can_read() const
+proton::connection_driver::can_write() const
+proton::connection_driver::closed() const
+proton::connection_driver::connection() const
+proton::connection_driver::connection_driver(proton::handler&, proton::connection_options const&)
+proton::connection_driver::container::container(std::string const&)
+proton::connection_driver::container::id() const
+proton::connection_driver::container::make_options()
+proton::connection_driver::container::options(proton::connection_options const&)
+proton::connection_driver::container::~container()
+proton::connection_driver::dispatch()
+proton::connection_driver::io_error::io_error(std::string const&)
+proton::connection_driver::io_error::~io_error()
+proton::connection_driver::no_opts
+proton::connection_driver::process(int)
+proton::connection_driver::try_read()
+proton::connection_driver::try_write()
+proton::connection_driver::~connection_driver()
proton::connection_options::connection_options()
proton::connection_options::connection_options(proton::connection_options const&)
@@ -587,8 +587,8 @@
# Only types with the following info can be thrown across shared abject boundary
# Or correctly dynamically cast by user
typeinfo for proton::connection
-typeinfo for proton::connection_engine
-typeinfo for proton::connection_engine::io_error
+typeinfo for proton::connection_driver
+typeinfo for proton::connection_driver::io_error
typeinfo for proton::conversion_error
typeinfo for proton::endpoint
typeinfo for proton::error
@@ -600,8 +600,8 @@
typeinfo for proton::timeout_error
typeinfo for proton::url_error
typeinfo name for proton::connection
-typeinfo name for proton::connection_engine
-typeinfo name for proton::connection_engine::io_error
+typeinfo name for proton::connection_driver
+typeinfo name for proton::connection_driver::io_error
typeinfo name for proton::conversion_error
typeinfo name for proton::endpoint
typeinfo name for proton::error
@@ -613,8 +613,8 @@
typeinfo name for proton::timeout_error
typeinfo name for proton::url_error
vtable for proton::connection
-vtable for proton::connection_engine
-vtable for proton::connection_engine::io_error
+vtable for proton::connection_driver
+vtable for proton::connection_driver::io_error
vtable for proton::conversion_error
vtable for proton::endpoint
vtable for proton::error