PROTON-1344: proactor batch events, rename connection_driver

Renamed pn_connection_engine to pn_connection_driver.

pn_proactor_wait() returns a pn_event_batch_t* rather than an individual pn_event_t*
to reduce thread-context switching.

Added pn_collector_next() for simpler event looping.
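
The examples below all switch to the same loop shape: wait for a batch, drain it
with pn_event_batch_next(), then hand it back with pn_proactor_done() once every
event has been handled. A minimal sketch of that loop, pieced together from the
broker.c/receive.c/send.c changes in this patch (the app_data_t type, handle()
callback and finished flag are the examples' own and only stand in for
application code here):

    #include <proton/proactor.h>

    /* Sketch only; see broker_thread() and the main() loops below for the real versions. */
    static void run(app_data_t *app) {
      do {
        pn_event_batch_t *events = pn_proactor_wait(app->proactor); /* block until a batch is ready */
        pn_event_t *e;
        while ((e = pn_event_batch_next(events))) {
          handle(app, e);                        /* handle every event in the batch */
        }
        pn_proactor_done(app->proactor, events); /* return the batch when fully handled */
      } while (!app->finished);
    }

Handling the whole batch before calling pn_proactor_done() replaces the per-event
pn_event_done() calls of the old loop and is what reduces the context switching.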
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index e11a8bd..66381fc 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/proactor.h>
 #include <proton/engine.h>
 #include <proton/sasl.h>
@@ -220,9 +220,11 @@
   const char *container_id;     /* AMQP container-id */
   size_t threads;
   pn_millis_t heartbeat;
+  bool finished;
 } broker_t;
 
 void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
+  memset(b, 0, sizeof(*b));
   b->proactor = pn_proactor();
   b->listener = NULL;
   queues_init(&b->queues);
@@ -293,8 +295,7 @@
 
 const int WINDOW=10;            /* Incoming credit window */
 
-static bool handle(broker_t* b, pn_event_t* e) {
-  bool more = true;
+static void handle(broker_t* b, pn_event_t* e) {
   pn_connection_t *c = pn_event_connection(e);
 
   switch (pn_event_type(e)) {
@@ -398,20 +399,24 @@
     break;
 
    case PN_PROACTOR_INTERRUPT:
-    more = false;
+    b->finished = true;
     break;
 
    default:
     break;
   }
-  pn_event_done(e);
-  return more;
 }
 
 static void broker_thread(void *void_broker) {
   broker_t *b = (broker_t*)void_broker;
-  while (handle(b, pn_proactor_wait(b->proactor)))
-    ;
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(b->proactor);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      handle(b, e);
+    }
+    pn_proactor_done(b->proactor, events);
+  } while(!b->finished);
 }
 
 static void usage(const char *arg0) {
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
index 8dd2706..35afd5c 100644
--- a/examples/c/proactor/libuv_proactor.c
+++ b/examples/c/proactor/libuv_proactor.c
@@ -22,7 +22,7 @@
 #include <uv.h>
 
 #include <proton/condition.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/engine.h>
 #include <proton/extra.h>
 #include <proton/message.h>
@@ -44,11 +44,15 @@
   To provide concurrency the proactor uses a "leader-worker-follower" model,
   threads take turns at the roles:
 
-  - a single "leader" calls libuv functions and runs the uv_loop incrementally.
-  When there is work it hands over leadership and becomes a "worker"
+  - a single "leader" calls libuv functions and runs the uv_loop in short bursts
+    to generate work. When there is work available it gives up leadership and
+    becomes a "worker"
+
   - "workers" handle events concurrently for distinct connections/listeners
-  When the work is done they become "followers"
-  - "followers" wait for the leader to step aside, one takes over as new leader.
+    They do as much work as they can get; when none is left they become "followers"
+
+  - "followers" wait for the leader to generate work and become workers.
+    When the leader itself becomes a worker, one of the followers takes over.
 
   This model is symmetric: any thread can take on any role based on run-time
   requirements. It also allows the IO and non-IO work associated with an IO
@@ -77,7 +81,7 @@
 PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 
-/* common to connection engine and listeners */
+/* common to connection and listener */
 typedef struct psocket_t {
   /* Immutable */
   pn_proactor_t *proactor;
@@ -118,11 +122,11 @@
   return str[0] == '\001' ? NULL : str;
 }
 
-typedef struct pconn {
+typedef struct pconnection_t {
   psocket_t psocket;
 
   /* Only used by owner thread */
-  pn_connection_engine_t ceng;
+  pn_connection_driver_t driver;
 
   /* Only used by leader */
   uv_connect_t connect;
@@ -132,7 +136,7 @@
   size_t writing;
   bool reading:1;
   bool server:1;                /* accept, not connect */
-} pconn;
+} pconnection_t;
 
 struct pn_listener_t {
   psocket_t psocket;
@@ -140,6 +144,7 @@
   /* Only used by owner thread */
   pn_condition_t *condition;
   pn_collector_t *collector;
+  pn_event_batch_t batch;
   size_t backlog;
 };
 
@@ -153,6 +158,10 @@
   uv_loop_t loop;
   uv_async_t async;
 
+  /* Owner thread: proactor collector and batch can belong to leader or a worker */
+  pn_collector_t *collector;
+  pn_event_batch_t batch;
+
   /* Protected by lock */
   uv_mutex_t lock;
   queue start_q;
@@ -162,11 +171,7 @@
   size_t count;                 /* psocket count */
   bool inactive:1;
   bool has_leader:1;
-
-  /* Immutable collectors to hold fixed events */
-  pn_collector_t *interrupt_event;
-  pn_collector_t *timeout_event;
-  pn_collector_t *inactive_event;
+  bool batch_working:1;          /* batch belongs to a worker.  */
 };
 
 static bool push_lh(queue *q, psocket_t *ps) {
@@ -191,8 +196,8 @@
   return ps;
 }
 
-static inline pconn *as_pconn(psocket_t* ps) {
-  return ps->is_conn ? (pconn*)ps : NULL;
+static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
+  return ps->is_conn ? (pconnection_t*)ps : NULL;
 }
 
 static inline pn_listener_t *as_listener(psocket_t* ps) {
@@ -213,9 +218,9 @@
 
 /* Detach from IO and put ps on the worker queue */
 static void leader_to_worker(psocket_t *ps) {
-  pconn *pc = as_pconn(ps);
+  pconnection_t *pc = as_pconnection_t(ps);
   /* Don't detach if there are no events yet. */
-  if (pc && pn_connection_engine_has_event(&pc->ceng)) {
+  if (pc && pn_connection_driver_has_event(&pc->driver)) {
     if (pc->writing) {
       pc->writing  = 0;
       uv_cancel((uv_req_t*)&pc->write);
@@ -236,6 +241,28 @@
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
+/* Set a deferred action for leader, if not already set. */
+static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_lock(&ps->proactor->lock);
+  if (!ps->action) {
+    ps->action = action;
+  }
+  to_leader_lh(ps);
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Owner thread send to worker thread. Set deferred action if not already set. */
+static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_lock(&ps->proactor->lock);
+  if (!ps->action) {
+    ps->action = action;
+  }
+  push_lh(&ps->proactor->worker_q, ps);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+
 /* Re-queue for further work */
 static void worker_requeue(psocket_t* ps) {
   uv_mutex_lock(&ps->proactor->lock);
@@ -244,25 +271,43 @@
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
-static pconn *new_pconn(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
-  pconn *pc = (pconn*)calloc(1, sizeof(*pc));
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
+  pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
   if (!pc) return NULL;
-  if (pn_connection_engine_init(&pc->ceng, pn_connection_with_extra(extra.size), NULL) != 0) {
+  if (pn_connection_driver_init(&pc->driver, pn_connection_with_extra(extra.size), NULL) != 0) {
     return NULL;
   }
   if (extra.start && extra.size) {
-    memcpy(pn_connection_get_extra(pc->ceng.connection).start, extra.start, extra.size);
+    memcpy(pn_connection_get_extra(pc->driver.connection).start, extra.start, extra.size);
   }
   psocket_init(&pc->psocket, p,  true, host, port);
   if (server) {
-    pn_transport_set_server(pc->ceng.transport);
+    pn_transport_set_server(pc->driver.transport);
   }
-  pn_record_t *r = pn_connection_attachments(pc->ceng.connection);
+  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
   pn_record_def(r, PN_PROACTOR, PN_VOID);
   pn_record_set(r, PN_PROACTOR, pc);
   return pc;
 }
 
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+  return (batch->next_event == proactor_batch_next) ?
+    (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+}
+
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+  return (batch->next_event == listener_batch_next) ?
+    (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+}
+
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+  pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
+  return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
+}
+
 pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
   pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size));
   if (!l) {
@@ -278,6 +323,7 @@
   }
   psocket_init(&l->psocket, p, false, host, port);
   l->condition = pn_condition();
+  l->batch.next_event = listener_batch_next;
   l->backlog = backlog;
   return l;
 }
@@ -290,11 +336,12 @@
 }
 
 /* Free if there are no uv callbacks pending and no events */
-static void leader_pconn_maybe_free(pconn *pc) {
-    if (pn_connection_engine_has_event(&pc->ceng)) {
+static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
+    if (pn_connection_driver_has_event(&pc->driver)) {
       leader_to_worker(&pc->psocket);         /* Return to worker */
-    } else if (!(pc->psocket.tcp.data || pc->shutdown.data || pc->timer.data)) {
-      pn_connection_engine_destroy(&pc->ceng);
+    } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
+      /* All UV requests are finished */
+      pn_connection_driver_destroy(&pc->driver);
       leader_count(pc->psocket.proactor, -1);
       free(pc);
     }
@@ -314,7 +361,7 @@
 /* Free if there are no uv callbacks pending and no events */
 static void leader_maybe_free(psocket_t *ps) {
   if (ps->is_conn) {
-    leader_pconn_maybe_free(as_pconn(ps));
+    leader_pconnection_t_maybe_free(as_pconnection_t(ps));
   } else {
     leader_listener_maybe_free(as_listener(ps));
   }
@@ -336,9 +383,9 @@
   if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
     uv_close((uv_handle_t*)&ps->tcp, on_close);
   }
-  pconn *pc = as_pconn(ps);
+  pconnection_t *pc = as_pconnection_t(ps);
   if (pc) {
-    pn_connection_engine_close(&pc->ceng);
+    pn_connection_driver_close(&pc->driver);
     if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
       uv_timer_stop(&pc->timer);
       uv_close((uv_handle_t*)&pc->timer, on_close);
@@ -347,20 +394,20 @@
   leader_maybe_free(ps);
 }
 
-static pconn *get_pconn(pn_connection_t* c) {
+static pconnection_t *get_pconnection_t(pn_connection_t* c) {
   if (!c) return NULL;
   pn_record_t *r = pn_connection_attachments(c);
-  return (pconn*) pn_record_get(r, PN_PROACTOR);
+  return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
 }
 
 static void leader_error(psocket_t *ps, int err, const char* what) {
   if (ps->is_conn) {
-    pn_connection_engine_t *ceng = &as_pconn(ps)->ceng;
-    pn_connection_engine_errorf(ceng, COND_NAME, "%s %s:%s: %s",
+    pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
+    pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+    pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
                                 what, fixstr(ps->host), fixstr(ps->port),
                                 uv_strerror(err));
-    pn_connection_engine_bind(ceng);
-    pn_connection_engine_close(ceng);
+    pn_connection_driver_close(driver);
   } else {
     pn_listener_t *l = as_listener(ps);
     pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
@@ -376,9 +423,9 @@
   leader_count(ps->proactor, +1);
   int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
   if (!err) {
-    pconn *pc = as_pconn(ps);
+    pconnection_t *pc = as_pconnection_t(ps);
     if (pc) {
-      pc->connect.data = pc->write.data = pc->shutdown.data = ps;
+      pc->connect.data = ps;
       int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
       if (!err) {
         pc->timer.data = pc;
@@ -392,7 +439,7 @@
 }
 
 /* Common logic for on_connect and on_accept */
-static void leader_connect_accept(pconn *pc, int err, const char *what) {
+static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
   if (!err) {
     leader_to_worker(&pc->psocket);
   } else {
@@ -401,14 +448,14 @@
 }
 
 static void on_connect(uv_connect_t *connect, int err) {
-  leader_connect_accept((pconn*)connect->data, err, "on connect");
+  leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
 }
 
 static void on_accept(uv_stream_t* server, int err) {
   pn_listener_t* l = (pn_listener_t*)server->data;
   if (!err) {
     pn_rwbytes_t v =  pn_listener_get_extra(l);
-    pconn *pc = new_pconn(l->psocket.proactor, true,
+    pconnection_t *pc = new_pconnection_t(l->psocket.proactor, true,
                           fixstr(l->psocket.host),
                           fixstr(l->psocket.port),
                           pn_bytes(v.size, v.start));
@@ -436,7 +483,7 @@
 }
 
 static void leader_connect(psocket_t *ps) {
-  pconn *pc = as_pconn(ps);
+  pconnection_t *pc = as_pconnection_t(ps);
   uv_getaddrinfo_t info;
   int err = leader_resolve(ps, &info, false);
   if (!err) {
@@ -450,7 +497,7 @@
 
 static void leader_listen(psocket_t *ps) {
   pn_listener_t *l = as_listener(ps);
-  uv_getaddrinfo_t info;
+   uv_getaddrinfo_t info;
   int err = leader_resolve(ps, &info, true);
   if (!err) {
     err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
@@ -463,8 +510,8 @@
 }
 
 static void on_tick(uv_timer_t *timer) {
-  pconn *pc = (pconn*)timer->data;
-  pn_transport_t *t = pc->ceng.transport;
+  pconnection_t *pc = (pconnection_t*)timer->data;
+  pn_transport_t *t = pc->driver.transport;
   if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
     uv_timer_stop(&pc->timer);
     uint64_t now = uv_now(pc->timer.loop);
@@ -476,24 +523,25 @@
 }
 
 static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
-  pconn *pc = (pconn*)stream->data;
+  pconnection_t *pc = (pconnection_t*)stream->data;
   if (nread >= 0) {
-    pn_connection_engine_read_done(&pc->ceng, nread);
+    pn_connection_driver_read_done(&pc->driver, nread);
     on_tick(&pc->timer);         /* check for tick changes. */
     leader_to_worker(&pc->psocket);
     /* Reading continues automatically until stopped. */
   } else if (nread == UV_EOF) { /* hangup */
-    pn_connection_engine_read_close(&pc->ceng);
+    pn_connection_driver_read_close(&pc->driver);
     leader_maybe_free(&pc->psocket);
   } else {
     leader_error(&pc->psocket, nread, "on read from");
   }
 }
 
-static void on_write(uv_write_t* request, int err) {
-  pconn *pc = (pconn*)request->data;
+static void on_write(uv_write_t* write, int err) {
+  pconnection_t *pc = (pconnection_t*)write->data;
+  write->data = NULL;
   if (err == 0) {
-    pn_connection_engine_write_done(&pc->ceng, pc->writing);
+    pn_connection_driver_write_done(&pc->driver, pc->writing);
     leader_to_worker(&pc->psocket);
   } else if (err == UV_ECANCELED) {
     leader_maybe_free(&pc->psocket);
@@ -505,29 +553,31 @@
 
 // Read buffer allocation function for uv, just returns the transports read buffer.
 static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
-  pconn *pc = (pconn*)stream->data;
-  pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng);
+  pconnection_t *pc = (pconnection_t*)stream->data;
+  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
   *buf = uv_buf_init(rbuf.start, rbuf.size);
 }
 
 static void leader_rewatch(psocket_t *ps) {
-  pconn *pc = as_pconn(ps);
+  pconnection_t *pc = as_pconnection_t(ps);
 
   if (pc->timer.data) {         /* uv-initialized */
     on_tick(&pc->timer);        /* Re-enable ticks if required */
   }
-  pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng);
-  pn_bytes_t wbuf = pn_connection_engine_write_buffer(&pc->ceng);
+  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
 
   /* Ticks and checking buffers can generate events, process before proceeding */
-  if (pn_connection_engine_has_event(&pc->ceng)) {
+  if (pn_connection_driver_has_event(&pc->driver)) {
     leader_to_worker(ps);
   } else {                      /* Re-watch for IO */
     if (wbuf.size > 0 && !pc->writing) {
       pc->writing = wbuf.size;
       uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+      pc->write.data = ps;
       uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
-    } else if (wbuf.size == 0 && pn_connection_engine_write_closed(&pc->ceng)) {
+    } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+      pc->shutdown.data = ps;
       uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
     }
     if (rbuf.size > 0 && !pc->reading) {
@@ -537,23 +587,31 @@
   }
 }
 
-/* Return the next worker event or { 0 } if no events are ready */
-static pn_event_t* get_event_lh(pn_proactor_t *p) {
-  if (p->inactive) {
-    p->inactive = false;
-    return pn_collector_peek(p->inactive_event);
-  }
-  if (p->interrupt > 0) {
-    --p->interrupt;
-    return pn_collector_peek(p->interrupt_event);
+static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
+  pn_collector_put(p->collector, pn_proactor__class(), p, t);
+  p->batch_working = true;
+  return &p->batch;
+}
+
+/* Return the next event batch or 0 if no events are ready */
+static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
+  if (!p->batch_working) {       /* Can generate proactor events */
+    if (p->inactive) {
+      p->inactive = false;
+      return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
+    }
+    if (p->interrupt > 0) {
+      --p->interrupt;
+      return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
+    }
   }
   for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
     if (ps->is_conn) {
-      pconn *pc = as_pconn(ps);
-      return pn_connection_engine_event(&pc->ceng);
+      pconnection_t *pc = as_pconnection_t(ps);
+      return &pc->driver.batch;
     } else {                    /* Listener */
       pn_listener_t *l = as_listener(ps);
-      return pn_collector_peek(l->collector);
+      return &l->batch;
     }
     to_leader(ps);      /* No event, back to leader */
   }
@@ -568,15 +626,6 @@
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
-/* Defer an action to the leader thread. Only from non-leader threads. */
-static void owner_defer(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  assert(!ps->action);
-  ps->action = action;
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
 pn_listener_t *pn_event_listener(pn_event_t *e) {
   return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
 }
@@ -590,57 +639,47 @@
   return NULL;
 }
 
-void pn_event_done(pn_event_t *e) {
-  pn_event_type_t etype = pn_event_type(e);
-  pconn *pc = get_pconn(pn_event_connection(e));
-  if (pc && e == pn_collector_peek(pc->ceng.collector)) {
-    pn_connection_engine_pop_event(&pc->ceng);
-    if (etype == PN_CONNECTION_INIT) {
-      /* Bind after user has handled CONNECTION_INIT */
-      pn_connection_engine_bind(&pc->ceng);
-    }
-    if (pn_connection_engine_has_event(&pc->ceng)) {
-      /* Process all events before going back to IO.
-         Put it back on the worker queue and wake the leader.
-      */
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  if (pc) {
+    if (pn_connection_driver_has_event(&pc->driver)) {
+      /* Process all events before going back to IO. */
       worker_requeue(&pc->psocket);
-    } else if (pn_connection_engine_finished(&pc->ceng)) {
-      owner_defer(&pc->psocket, leader_close);
+    } else if (pn_connection_driver_finished(&pc->driver)) {
+      owner_to_leader(&pc->psocket, leader_close);
     } else {
-      owner_defer(&pc->psocket, leader_rewatch);
+      owner_to_leader(&pc->psocket, leader_rewatch);
     }
-  } else {
-    pn_listener_t *l = pn_event_listener(e);
-    if (l && e == pn_collector_peek(l->collector)) {
-      pn_collector_pop(l->collector);
-      if (etype == PN_LISTENER_CLOSE) {
-        owner_defer(&l->psocket, leader_close);
-      }
-    }
+    return;
   }
+  pn_proactor_t *bp = batch_proactor(batch);
+  if (bp == p) {
+    uv_mutex_lock(&p->lock);
+    p->batch_working = false;
+    uv_async_send(&p->async); /* Wake leader */
+    uv_mutex_unlock(&p->lock);
+    return;
+  }
+  /* Nothing extra to do for a listener; it is always in the UV loop. */
 }
 
 /* Run follower/leader loop till we can return an event and be a worker */
-pn_event_t *pn_proactor_wait(struct pn_proactor_t* p) {
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   uv_mutex_lock(&p->lock);
   /* Try to grab work immediately. */
-  pn_event_t *e = get_event_lh(p);
-  if (e == NULL) {
+  pn_event_batch_t *batch = get_batch_lh(p);
+  if (batch == NULL) {
     /* No work available, follow the leader */
-    while (p->has_leader)
+    while (p->has_leader) {
       uv_cond_wait(&p->cond, &p->lock);
+    }
     /* Lead till there is work to do. */
     p->has_leader = true;
-    for (e = get_event_lh(p); e == NULL; e = get_event_lh(p)) {
-      /* Run uv_loop outside the lock */
-      uv_mutex_unlock(&p->lock);
-      uv_run(&p->loop, UV_RUN_ONCE);
-      uv_mutex_lock(&p->lock);
-      /* Process leader work queue outside the lock */
+    while (batch == NULL) {
       for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
         void (*action)(psocket_t*) = ps->action;
-        ps->action = NULL;
         void (*wakeup)(psocket_t*) = ps->wakeup;
+        ps->action = NULL;
         ps->wakeup = NULL;
         if (action || wakeup) {
           uv_mutex_unlock(&p->lock);
@@ -649,13 +688,19 @@
           uv_mutex_lock(&p->lock);
         }
       }
+      batch = get_batch_lh(p);
+      if (batch == NULL) {
+        uv_mutex_unlock(&p->lock);
+        uv_run(&p->loop, UV_RUN_ONCE);
+        uv_mutex_lock(&p->lock);
+      }
     }
     /* Signal the next leader and return to work */
     p->has_leader = false;
     uv_cond_signal(&p->cond);
   }
   uv_mutex_unlock(&p->lock);
-  return e;
+  return batch;
 }
 
 void pn_proactor_interrupt(pn_proactor_t *p) {
@@ -666,11 +711,12 @@
 }
 
 int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
-  pconn *pc = new_pconn(p, false, host, port, extra);
+  pconnection_t *pc = new_pconnection_t(p, false, host, port, extra);
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
-  owner_defer(&pc->psocket, leader_connect); /* Process PN_CONNECTION_INIT before binding */
+  /* Process PN_CONNECTION_INIT before binding */
+  owner_to_worker(&pc->psocket, leader_connect);
   return 0;
 }
 
@@ -678,12 +724,12 @@
 
 pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
   pn_listener_t *l = new_listener(p, host, port, backlog, extra);
-  if (l)  owner_defer(&l->psocket, leader_listen);
+  if (l)  owner_to_leader(&l->psocket, leader_listen);
   return l;
 }
 
 pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
-  pconn *pc = get_pconn(c);
+  pconnection_t *pc = get_pconnection_t(c);
   return pc ? pc->psocket.proactor : NULL;
 }
 
@@ -692,13 +738,14 @@
 }
 
 void leader_wake_connection(psocket_t *ps) {
-  pconn *pc = as_pconn(ps);
-  pn_collector_put(pc->ceng.collector, PN_OBJECT, pc->ceng.connection, PN_CONNECTION_WAKE);
+  pconnection_t *pc = as_pconnection_t(ps);
+  pn_connection_t *c = pc->driver.connection;
+  pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
   leader_to_worker(ps);
 }
 
 void pn_connection_wake(pn_connection_t* c) {
-  wakeup(&get_pconn(c)->psocket, leader_wake_connection);
+  wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
 }
 
 void pn_listener_close(pn_listener_t* l) {
@@ -710,22 +757,15 @@
   return l->condition;
 }
 
-/* Collector to hold for a single fixed event that is never popped. */
-static pn_collector_t *event_holder(pn_proactor_t *p, pn_event_type_t t) {
-  pn_collector_t *c = pn_collector();
-  pn_collector_put(c, pn_proactor__class(), p, t);
-  return c;
-}
-
 pn_proactor_t *pn_proactor() {
   pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+  p->collector = pn_collector();
+  p->batch.next_event = &proactor_batch_next;
+  if (!p->collector) return NULL;
   uv_loop_init(&p->loop);
   uv_mutex_init(&p->lock);
   uv_cond_init(&p->cond);
   uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */
-  p->interrupt_event = event_holder(p, PN_PROACTOR_INTERRUPT);
-  p->inactive_event = event_holder(p, PN_PROACTOR_INACTIVE);
-  p->timeout_event = event_holder(p, PN_PROACTOR_TIMEOUT);
   return p;
 }
 
@@ -741,8 +781,19 @@
   uv_loop_close(&p->loop);
   uv_mutex_destroy(&p->lock);
   uv_cond_destroy(&p->cond);
-  pn_collector_free(p->interrupt_event);
-  pn_collector_free(p->inactive_event);
-  pn_collector_free(p->timeout_event);
+  pn_collector_free(p->collector);
   free(p);
 }
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+  pn_listener_t *l = batch_listener(batch);
+  pn_event_t *handled = pn_collector_prev(l->collector);
+  if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
+    owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
+  }
+  return pn_collector_next(l->collector);
+}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+  return pn_collector_next(batch_proactor(batch)->collector);
+}
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index acdae0c..88e3456 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -20,7 +20,7 @@
  */
 
 #include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/delivery.h>
 #include <proton/proactor.h>
 #include <proton/link.h>
@@ -37,12 +37,13 @@
 typedef char str[1024];
 
 typedef struct app_data_t {
-    str address;
-    str container_id;
-    pn_rwbytes_t message_buffer;
-    int message_count;
-    int received;
-    pn_proactor_t *proactor;
+  str address;
+  str container_id;
+  pn_rwbytes_t message_buffer;
+  int message_count;
+  int received;
+  pn_proactor_t *proactor;
+  bool finished;
 } app_data_t;
 
 static const int BATCH = 100; /* Batch size for unlimited receive */
@@ -80,9 +81,7 @@
   }
 }
 
-/* Handle event, return true of we should handle more */
-static bool handle(app_data_t* app, pn_event_t* event) {
-  bool more = true;
+static void handle(app_data_t* app, pn_event_t* event) {
   switch (pn_event_type(event)) {
 
    case PN_CONNECTION_INIT: {
@@ -149,53 +148,58 @@
     break;
 
    case PN_PROACTOR_INACTIVE:
-    more = false;
+    app->finished = true;
     break;
 
    default: break;
   }
-  pn_event_done(event);
-  return more;
 }
 
 static void usage(const char *arg0) {
-    fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
-    exit(1);
+  fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+  exit(1);
 }
 
 int main(int argc, char **argv) {
-    /* Default values for application and connection. */
-    app_data_t app = {{0}};
-    app.message_count = 100;
-    const char* urlstr = NULL;
+  /* Default values for application and connection. */
+  app_data_t app = {{0}};
+  app.message_count = 100;
+  const char* urlstr = NULL;
 
-    int opt;
-    while((opt = getopt(argc, argv, "a:m:")) != -1) {
-        switch(opt) {
-          case 'a': urlstr = optarg; break;
-          case 'm': app.message_count = atoi(optarg); break;
-          default: usage(argv[0]); break;
-        }
+  int opt;
+  while((opt = getopt(argc, argv, "a:m:")) != -1) {
+    switch(opt) {
+     case 'a': urlstr = optarg; break;
+     case 'm': app.message_count = atoi(optarg); break;
+     default: usage(argv[0]); break;
     }
-    if (optind < argc)
-        usage(argv[0]);
+  }
+  if (optind < argc)
+    usage(argv[0]);
 
-    snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+  snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
 
-    /* Parse the URL or use default values */
-    pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-    const char *host = url ? pn_url_get_host(url) : NULL;
-    const char *port = url ? pn_url_get_port(url) : "amqp";
-    strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+  /* Parse the URL or use default values */
+  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+  const char *host = url ? pn_url_get_host(url) : NULL;
+  const char *port = url ? pn_url_get_port(url) : "amqp";
+  strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
 
-    /* Create the proactor and connect */
-    app.proactor = pn_proactor();
-    pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
-    if (url) pn_url_free(url);
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+  if (url) pn_url_free(url);
 
-    while (handle(&app, pn_proactor_wait(app.proactor)))
-           ;
-    pn_proactor_free(app.proactor);
-    free(app.message_buffer.start);
-    return exit_code;
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      handle(&app, e);
+    }
+    pn_proactor_done(app.proactor, events);
+  } while(!app.finished);
+
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
 }
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 5d58895..d64ea2d 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -20,7 +20,7 @@
  */
 
 #include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/delivery.h>
 #include <proton/proactor.h>
 #include <proton/link.h>
@@ -37,13 +37,14 @@
 typedef char str[1024];
 
 typedef struct app_data_t {
-    str address;
-    str container_id;
-    pn_rwbytes_t message_buffer;
-    int message_count;
-    int sent;
-    int acknowledged;
-    pn_proactor_t *proactor;
+  str address;
+  str container_id;
+  pn_rwbytes_t message_buffer;
+  int message_count;
+  int sent;
+  int acknowledged;
+  pn_proactor_t *proactor;
+  bool finished;
 } app_data_t;
 
 int exit_code = 0;
@@ -58,41 +59,39 @@
 
 /* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */
 static pn_bytes_t encode_message(app_data_t* app) {
-    /* Construct a message with the map { "sequence": app.sent } */
-    pn_message_t* message = pn_message();
-    pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
-    pn_data_t* body = pn_message_body(message);
-    pn_data_put_map(body);
-    pn_data_enter(body);
-    pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
-    pn_data_put_int(body, app->sent); /* The sequence number */
-    pn_data_exit(body);
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
 
-    /* encode the message, expanding the encode buffer as needed */
-    if (app->message_buffer.start == NULL) {
-        static const size_t initial_size = 128;
-        app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
-    }
-    /* app->message_buffer is the total buffer space available. */
-    /* mbuf wil point at just the portion used by the encoded message */
-    pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
-    int status = 0;
-    while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-        app->message_buffer.size *= 2;
-        app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
-        mbuf.size = app->message_buffer.size;
-    }
-    if (status != 0) {
-        fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
-        exit(1);
-    }
-    pn_message_free(message);
-    return pn_bytes(mbuf.size, mbuf.start);
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf will point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    app->message_buffer.size *= 2;
+    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+    mbuf.size = app->message_buffer.size;
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
 }
 
-/* Handle event, return true of we should handle more */
-static bool handle(app_data_t* app, pn_event_t* event) {
-  bool more = true;
+static void handle(app_data_t* app, pn_event_t* event) {
   switch (pn_event_type(event)) {
 
    case PN_CONNECTION_INIT: {
@@ -130,7 +129,7 @@
      }
    } break;
 
-   case PN_TRANSPORT_ERROR:
+   case PN_TRANSPORT_CLOSED:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
     break;
 
@@ -151,53 +150,58 @@
     break;
 
    case PN_PROACTOR_INACTIVE:
-    more = false;
+    app->finished = true;
     break;
 
    default: break;
   }
-  pn_event_done(event);
-  return more;
 }
 
 static void usage(const char *arg0) {
-    fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
-    exit(1);
+  fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+  exit(1);
 }
 
 int main(int argc, char **argv) {
-    /* Default values for application and connection. */
-    app_data_t app = {{0}};
-    app.message_count = 100;
-    const char* urlstr = NULL;
+  /* Default values for application and connection. */
+  app_data_t app = {{0}};
+  app.message_count = 100;
+  const char* urlstr = NULL;
 
-    int opt;
-    while((opt = getopt(argc, argv, "a:m:")) != -1) {
-        switch(opt) {
-          case 'a': urlstr = optarg; break;
-          case 'm': app.message_count = atoi(optarg); break;
-          default: usage(argv[0]); break;
-        }
+  int opt;
+  while((opt = getopt(argc, argv, "a:m:")) != -1) {
+    switch(opt) {
+     case 'a': urlstr = optarg; break;
+     case 'm': app.message_count = atoi(optarg); break;
+     default: usage(argv[0]); break;
     }
-    if (optind < argc)
-        usage(argv[0]);
+  }
+  if (optind < argc)
+    usage(argv[0]);
 
-    snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+  snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
 
-    /* Parse the URL or use default values */
-    pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-    const char *host = url ? pn_url_get_host(url) : NULL;
-    const char *port = url ? pn_url_get_port(url) : "amqp";
-    strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+  /* Parse the URL or use default values */
+  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+  const char *host = url ? pn_url_get_host(url) : NULL;
+  const char *port = url ? pn_url_get_port(url) : "amqp";
+  strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
 
-    /* Create the proactor and connect */
-    app.proactor = pn_proactor();
-    pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
-    if (url) pn_url_free(url);
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+  if (url) pn_url_free(url);
 
-    while (handle(&app, pn_proactor_wait(app.proactor)))
-           ;
-    pn_proactor_free(app.proactor);
-    free(app.message_buffer.start);
-    return exit_code;
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      handle(&app, e);
+    }
+    pn_proactor_done(app.proactor, events);
+  } while(!app.finished);
+
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
 }
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 1d46ec8..447d3ad 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -123,7 +123,7 @@
 /** @example mt/epoll_container.cpp
 
 An example implementation of the proton::container API that shows how
-to use the proton::io::connection_engine SPI to adapt the proton API
+to use the proton::io::connection_driver SPI to adapt the proton API
 to native IO, in this case using a multithreaded Linux epoll poller as
 the implementation.
 
diff --git a/examples/cpp/mt/epoll_container.cpp b/examples/cpp/mt/epoll_container.cpp
index d9b9f08..7646673 100644
--- a/examples/cpp/mt/epoll_container.cpp
+++ b/examples/cpp/mt/epoll_container.cpp
@@ -25,7 +25,7 @@
 #include <proton/url.hpp>
 
 #include <proton/io/container_impl_base.hpp>
-#include <proton/io/connection_engine.hpp>
+#include <proton/io/connection_driver.hpp>
 #include <proton/io/link_namer.hpp>
 
 #include <atomic>
@@ -97,7 +97,7 @@
 };
 
 class pollable;
-class pollable_engine;
+class pollable_driver;
 class pollable_listener;
 
 class epoll_container : public proton::io::container_impl_base {
@@ -124,7 +124,7 @@
     std::string id() const OVERRIDE { return id_; }
 
     // Functions used internally.
-    proton::connection add_engine(proton::connection_options opts, int fd, bool server);
+    proton::connection add_driver(proton::connection_options opts, int fd, bool server);
     void erase(pollable*);
 
     // Link names must be unique per container.
@@ -160,7 +160,7 @@
 
     proton::connection_options options_;
     std::map<std::string, std::unique_ptr<pollable_listener> > listeners_;
-    std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;
+    std::map<pollable*, std::unique_ptr<pollable_driver> > drivers_;
 
     std::condition_variable stopped_;
     bool stopping_;
@@ -274,21 +274,21 @@
     bool closed_;
 };
 
-// Handle epoll wakeups for a connection_engine.
-class pollable_engine : public pollable {
+// Handle epoll wakeups for a connection_driver.
+class pollable_driver : public pollable {
   public:
-    pollable_engine(epoll_container& c, int fd, int epoll_fd) :
+    pollable_driver(epoll_container& c, int fd, int epoll_fd) :
         pollable(fd, epoll_fd),
         loop_(new epoll_event_loop(*this)),
-        engine_(c, loop_)
+        driver_(c, loop_)
     {
-        proton::connection conn = engine_.connection();
+        proton::connection conn = driver_.connection();
         proton::io::set_link_namer(conn, c.link_namer);
     }
 
-    ~pollable_engine() {
+    ~pollable_driver() {
         loop_->close();                // No calls to notify() after this.
-        engine_.dispatch();            // Run any final events.
+        driver_.dispatch();            // Run any final events.
         try { write(); } catch(...) {} // Write connection close if we can.
         for (auto f : loop_->pop_all()) {// Run final queued work for side-effects.
             try { f(); } catch(...) {}
@@ -303,17 +303,17 @@
                 can_read = can_read && read();
                 for (auto f : loop_->pop_all()) // Run queued work
                     f();
-                engine_.dispatch();
+                driver_.dispatch();
             } while (can_read || can_write);
-            return (engine_.read_buffer().size ? EPOLLIN:0) |
-                (engine_.write_buffer().size ? EPOLLOUT:0);
+            return (driver_.read_buffer().size ? EPOLLIN:0) |
+                (driver_.write_buffer().size ? EPOLLOUT:0);
         } catch (const std::exception& e) {
-            engine_.disconnected(proton::error_condition("exception", e.what()));
+            driver_.disconnected(proton::error_condition("exception", e.what()));
         }
         return 0;               // Ending
     }
 
-    proton::io::connection_engine& engine() { return engine_; }
+    proton::io::connection_driver& driver() { return driver_; }
 
   private:
     static bool try_again(int e) {
@@ -322,11 +322,11 @@
     }
 
     bool write() {
-        proton::io::const_buffer wbuf(engine_.write_buffer());
+        proton::io::const_buffer wbuf(driver_.write_buffer());
         if (wbuf.size) {
             ssize_t n = ::write(fd_, wbuf.data, wbuf.size);
             if (n > 0) {
-                engine_.write_done(n);
+                driver_.write_done(n);
                 return true;
             } else if (n < 0 && !try_again(errno)) {
                 check(n, "write");
@@ -336,15 +336,15 @@
     }
 
     bool read() {
-        proton::io::mutable_buffer rbuf(engine_.read_buffer());
+        proton::io::mutable_buffer rbuf(driver_.read_buffer());
         if (rbuf.size) {
             ssize_t n = ::read(fd_, rbuf.data, rbuf.size);
             if (n > 0) {
-                engine_.read_done(n);
+                driver_.read_done(n);
                 return true;
             }
             else if (n == 0)
-                engine_.read_close();
+                driver_.read_close();
             else if (!try_again(errno))
                 check(n, "read");
         }
@@ -352,13 +352,13 @@
     }
 
     // Lifecycle note: loop_ belongs to the proton::connection, which can live
-    // longer than the engine if the application holds a reference to it, we
-    // disconnect ourselves with loop_->close() in ~connection_engine()
+    // longer than the driver if the application holds a reference to it, we
+    // disconnect ourselves with loop_->close() in ~connection_driver()
     epoll_event_loop* loop_;
-    proton::io::connection_engine engine_;
+    proton::io::connection_driver driver_;
 };
 
-// A pollable listener fd that creates pollable_engine for incoming connections.
+// A pollable listener fd that creates pollable_driver for incoming connections.
 class pollable_listener : public pollable {
   public:
     pollable_listener(
@@ -380,7 +380,7 @@
         }
         try {
             int accepted = check(::accept(fd_, NULL, 0), "accept");
-            container_.add_engine(listener_.on_accept(), accepted, true);
+            container_.add_driver(listener_.on_accept(), accepted, true);
             return EPOLLIN;
         } catch (const std::exception& e) {
             listener_.on_error(e.what());
@@ -424,25 +424,25 @@
     } catch (...) {}
 }
 
-proton::connection epoll_container::add_engine(proton::connection_options opts, int fd, bool server)
+proton::connection epoll_container::add_driver(proton::connection_options opts, int fd, bool server)
 {
     lock_guard g(lock_);
     if (stopping_)
         throw proton::error("container is stopping");
-    std::unique_ptr<pollable_engine> eng(new pollable_engine(*this, fd, epoll_fd_));
+    std::unique_ptr<pollable_driver> eng(new pollable_driver(*this, fd, epoll_fd_));
     if (server)
-        eng->engine().accept(opts);
+        eng->driver().accept(opts);
     else
-        eng->engine().connect(opts);
-    proton::connection c = eng->engine().connection();
+        eng->driver().connect(opts);
+    proton::connection c = eng->driver().connection();
     eng->notify();
-    engines_[eng.get()] = std::move(eng);
+    drivers_[eng.get()] = std::move(eng);
     return c;
 }
 
 void epoll_container::erase(pollable* e) {
     lock_guard g(lock_);
-    if (!engines_.erase(e)) {
+    if (!drivers_.erase(e)) {
         pollable_listener* l = dynamic_cast<pollable_listener*>(e);
         if (l)
             listeners_.erase(l->addr());
@@ -451,7 +451,7 @@
 }
 
 void epoll_container::idle_check(const lock_guard&) {
-    if (stopping_  && engines_.empty() && listeners_.empty())
+    if (stopping_  && drivers_.empty() && listeners_.empty())
         interrupt();
 }
 
@@ -462,7 +462,7 @@
     unique_addrinfo ainfo(addr);
     unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
     check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
-    return make_thread_safe(add_engine(opts, fd.release(), false));
+    return make_thread_safe(add_driver(opts, fd.release(), false));
 }
 
 proton::listener epoll_container::listen(const std::string& addr, proton::listen_handler& lh) {
@@ -520,10 +520,10 @@
 void epoll_container::wait() {
     std::unique_lock<std::mutex> l(lock_);
     stopped_.wait(l, [this]() { return this->threads_ == 0; } );
-    for (auto& eng : engines_)
-        eng.second->engine().disconnected(stop_err_);
+    for (auto& eng : drivers_)
+        eng.second->driver().disconnected(stop_err_);
     listeners_.clear();
-    engines_.clear();
+    drivers_.clear();
 }
 
 void epoll_container::interrupt() {
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index ddab147..ffc6e10 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -369,7 +369,7 @@
   src/core/encoder.c
 
   src/core/dispatcher.c
-  src/core/connection_engine.c
+  src/core/connection_driver.c
   src/core/engine.c
   src/core/event.c
   src/core/autodetect.c
@@ -440,7 +440,7 @@
   include/proton/codec.h
   include/proton/condition.h
   include/proton/connection.h
-  include/proton/connection_engine.h
+  include/proton/connection_driver.h
   include/proton/delivery.h
   include/proton/disposition.h
   include/proton/engine.h
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index ed969eb..6af4319 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -48,7 +48,7 @@
   src/error_condition.cpp
   src/event_loop.cpp
   src/handler.cpp
-  src/io/connection_engine.cpp
+  src/io/connection_driver.cpp
   src/io/link_namer.cpp
   src/link.cpp
   src/listener.cpp
diff --git a/proton-c/bindings/cpp/docs/io.md b/proton-c/bindings/cpp/docs/io.md
index a892e61..230e538 100644
--- a/proton-c/bindings/cpp/docs/io.md
+++ b/proton-c/bindings/cpp/docs/io.md
@@ -7,16 +7,16 @@
 that allows you to implement the Proton API over alternative IO or
 threading libraries.
 
-The `proton::io::connection_engine` class converts an AMQP-encoded
+The `proton::io::connection_driver` class converts an AMQP-encoded
 byte stream, read from any IO source, into `proton::messaging_handler`
 calls. It generates an AMQP-encoded byte stream as output that can be
 written to any IO destination.
 
-The connection engine is deliberately very simple and low level. It
+The connection driver is deliberately very simple and low level. It
 performs no IO of its own, no thread-related locking, and is written
 in simple C++98-compatible code.
 
-The connection engine can be used standalone as an AMQP translator, or
+The connection driver can be used standalone as an AMQP translator, or
 you can implement the following two interfaces to provide a complete
 implementation of the Proton API that can run any Proton application:
 
diff --git a/proton-c/bindings/cpp/docs/main.md b/proton-c/bindings/cpp/docs/main.md
index 011df29..93ba2c0 100644
--- a/proton-c/bindings/cpp/docs/main.md
+++ b/proton-c/bindings/cpp/docs/main.md
@@ -123,6 +123,6 @@
 `proton::default_container`.
 
 You can implement your own container to integrate proton with any IO
-provider using the `proton::io::connection_engine`.
+provider using the `proton::io::connection_driver`.
 
 @see @ref io_page
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 9fbdbdc..d2deebf 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -40,7 +40,7 @@
 class connection;
 
 namespace io {
-class connection_engine;
+class connection_driver;
 }
 
 /// Options for creating a connection.
@@ -163,7 +163,7 @@
     /// @cond INTERNAL
   friend class container_impl;
   friend class connector;
-  friend class io::connection_engine;
+  friend class io::connection_driver;
   friend class connection;
     /// @endcond
 };
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
similarity index 87%
rename from proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
rename to proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index d9825c2..d5da718 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -1,5 +1,5 @@
-#ifndef PROTON_IO_CONNECTION_ENGINE_HPP
-#define PROTON_IO_CONNECTION_ENGINE_HPP
+#ifndef PROTON_IO_CONNECTION_DRIVER_HPP
+#define PROTON_IO_CONNECTION_DRIVER_HPP
 
 /*
  *
@@ -32,7 +32,7 @@
 #include "../transport.hpp"
 #include "../types.hpp"
 
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 
 #include <cstddef>
 #include <utility>
@@ -63,13 +63,9 @@
     const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
 };
 
-/// **Experimental** - An AMQP protocol engine for a single
-/// connection.
+/// **Experimental** - An AMQP driver for a single connection.
 ///
-/// A connection_engine is a protocol engine that integrates AMQP into
-/// any IO or concurrency framework.
-///
-/// io::connection_engine manages a single proton::connection and dispatches
+/// io::connection_driver manages a single proton::connection and dispatches
 /// events to a proton::messaging_handler. It does no IO of its own, but allows you to
 /// integrate AMQP protocol handling into any IO or concurrency framework.
 ///
@@ -78,7 +74,7 @@
 /// proton::messaging_handler to respond to transport, connection,
 /// session, link, and message events. With a little care, the same
 /// handler classes can be used for both container and
-/// connection_engine. the @ref broker.cpp example illustrates this.
+/// connection_driver. The @ref broker.cpp example illustrates this.
 ///
 /// You need to write the IO code to read AMQP data to the
 /// read_buffer(). The engine parses the AMQP frames. dispatch() calls
@@ -96,7 +92,7 @@
 ///
 /// The engine never throws exceptions.
 class
-PN_CPP_CLASS_EXTERN connection_engine {
+PN_CPP_CLASS_EXTERN connection_driver {
   public:
     /// An engine that is not associated with a proton::container or
     /// proton::event_loop.
@@ -104,20 +100,20 @@
     /// Accessing the container or event_loop for this connection in
     /// a proton::messaging_handler will throw a proton::error exception.
     ///
-    PN_CPP_EXTERN connection_engine();
+    PN_CPP_EXTERN connection_driver();
 
-    /// Create a connection engine associated with a proton::container and
+    /// Create a connection driver associated with a proton::container and
     /// optional event_loop. If the event_loop is not provided attempts to use
     /// it will throw proton::error.
     ///
     /// Takes ownership of the event_loop. Note the proton::connection created
-    /// by this connection_engine can outlive the connection_engine itself if
+    /// by this connection_driver can outlive the connection_driver itself if
     /// the user pins it in memory using the proton::thread_safe<> template.
     /// The event_loop is deleted when, and only when, the proton::connection is.
     ///
-    PN_CPP_EXTERN connection_engine(proton::container&, event_loop* loop = 0);
+    PN_CPP_EXTERN connection_driver(proton::container&, event_loop* loop = 0);
 
-    PN_CPP_EXTERN ~connection_engine();
+    PN_CPP_EXTERN ~connection_driver();
 
     /// Configure a connection by applying exactly the options in opts (including proton::messaging_handler)
     /// Does not apply any default options, to apply container defaults use connect() or accept()
@@ -189,27 +185,27 @@
     ///
     PN_CPP_EXTERN bool dispatch();
 
-    /// Get the AMQP connection associated with this connection_engine.
+    /// Get the AMQP connection associated with this connection_driver.
     /// The event_loop is availabe via proton::thread_safe<connection>(connection())
     PN_CPP_EXTERN proton::connection connection() const;
 
-    /// Get the transport associated with this connection_engine.
+    /// Get the transport associated with this connection_driver.
     PN_CPP_EXTERN proton::transport transport() const;
 
-    /// Get the container associated with this connection_engine, if there is one.
+    /// Get the container associated with this connection_driver, if there is one.
     PN_CPP_EXTERN proton::container* container() const;
 
  private:
     void init();
-    connection_engine(const connection_engine&);
-    connection_engine& operator=(const connection_engine&);
+    connection_driver(const connection_driver&);
+    connection_driver& operator=(const connection_driver&);
 
     messaging_handler* handler_;
     proton::container* container_;
-    pn_connection_engine_t engine_;
+    pn_connection_driver_t driver_;
 };
 
 } // io
 } // proton
 
-#endif // PROTON_IO_CONNECTION_ENGINE_HPP
+#endif // PROTON_IO_CONNECTION_DRIVER_HPP
diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
index 2c5423f..acdcd30 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
@@ -40,7 +40,7 @@
 class messaging_adapter;
 
 namespace io {
-class connection_engine;
+class connection_driver;
 }
 
 /// A handler for Proton messaging events.
diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp b/proton-c/bindings/cpp/include/proton/transport.hpp
index bcd8a2f..10641e0 100644
--- a/proton-c/bindings/cpp/include/proton/transport.hpp
+++ b/proton-c/bindings/cpp/include/proton/transport.hpp
@@ -35,7 +35,7 @@
 class sasl;
 
 namespace io {
-class connection_engine;
+class connection_driver;
 }
 
 /// A network channel supporting an AMQP connection.
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index 6c3341f..991836d 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -24,7 +24,7 @@
 
 #include "proton/container.hpp"
 #include "proton/uuid.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
 #include "proton/io/link_namer.hpp"
 #include "proton/messaging_handler.hpp"
 #include "proton/types_fwd.hpp"
@@ -37,7 +37,7 @@
 using namespace std;
 using namespace proton;
 
-using proton::io::connection_engine;
+using proton::io::connection_driver;
 using proton::io::const_buffer;
 using proton::io::mutable_buffer;
 
@@ -45,14 +45,14 @@
 
 typedef std::deque<char> byte_stream;
 
-/// In memory connection_engine that reads and writes from byte_streams
-struct in_memory_engine : public connection_engine {
+/// In memory connection_driver that reads and writes from byte_streams
+struct in_memory_engine : public connection_driver {
 
     byte_stream& reads;
     byte_stream& writes;
 
     in_memory_engine(byte_stream& rd, byte_stream& wr, class container& cont) :
-        connection_engine(cont), reads(rd), writes(wr) {}
+        connection_driver(cont), reads(rd), writes(wr) {}
 
     void do_read() {
         mutable_buffer rbuf = read_buffer();
@@ -247,7 +247,7 @@
 
 void test_no_container() {
     // An engine with no container should throw, not crash.
-    connection_engine e;
+    connection_driver e;
     try {
         e.connection().container();
         FAIL("expected error");
diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp
index 74a763c..1d4194e 100644
--- a/proton-c/bindings/cpp/src/include/contexts.hpp
+++ b/proton-c/bindings/cpp/src/include/contexts.hpp
@@ -24,7 +24,7 @@
 
 #include "proton/connection.hpp"
 #include "proton/container.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
 #include "proton/event_loop.hpp"
 #include "proton/listen_handler.hpp"
 #include "proton/message.hpp"
diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp
new file mode 100644
index 0000000..06b01d8
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/io/connection_driver.hpp"
+
+#include "proton/event_loop.hpp"
+#include "proton/error.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/uuid.hpp"
+
+#include "contexts.hpp"
+#include "messaging_adapter.hpp"
+#include "msg.hpp"
+#include "proton_bits.hpp"
+#include "proton_event.hpp"
+
+#include <proton/connection.h>
+#include <proton/transport.h>
+#include <proton/event.h>
+
+#include <algorithm>
+
+
+namespace proton {
+namespace io {
+
+void connection_driver::init() {
+    if (pn_connection_driver_init(&driver_, pn_connection(), pn_transport()) != 0) {
+        this->~connection_driver(); // Dtor won't be called on throw from ctor.
+        throw proton::error(std::string("connection_driver allocation failed"));
+    }
+}
+
+connection_driver::connection_driver() : handler_(0), container_(0) { init(); }
+
+connection_driver::connection_driver(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
+    init();
+    connection_context& ctx = connection_context::get(connection());
+    ctx.container = container_;
+    ctx.event_loop.reset(loop);
+}
+
+connection_driver::~connection_driver() {
+    pn_connection_driver_destroy(&driver_);
+}
+
+// FIXME aconway 2016-11-16: rename _engine > _driver
+void connection_driver::configure(const connection_options& opts, bool server) {
+    proton::connection c(connection());
+    opts.apply_unbound(c);
+    if (server) pn_transport_set_server(driver_.transport);
+    pn_connection_driver_bind(&driver_);
+    opts.apply_bound(c);
+    handler_ =  opts.handler();
+    connection_context::get(connection()).collector =
+      pn_connection_collector(driver_.connection);
+}
+
+void connection_driver::connect(const connection_options& opts) {
+    connection_options all;
+    if (container_) {
+        all.container_id(container_->id());
+        all.update(container_->client_connection_options());
+    }
+    all.update(opts);
+    configure(all, false);
+    connection().open();
+}
+
+void connection_driver::accept(const connection_options& opts) {
+    connection_options all;
+    if (container_) {
+        all.container_id(container_->id());
+        all.update(container_->server_connection_options());
+    }
+    all.update(opts);
+    configure(all, true);
+}
+
+bool connection_driver::dispatch() {
+    pn_event_t* c_event;
+    while ((c_event = pn_connection_driver_next_event(&driver_)) != NULL) {
+        proton_event cpp_event(c_event, container_);
+        try {
+            if (handler_ != 0) {
+                messaging_adapter adapter(*handler_);
+                cpp_event.dispatch(adapter);
+            }
+        } catch (const std::exception& e) {
+            pn_condition_t *cond = pn_transport_condition(driver_.transport);
+            if (!pn_condition_is_set(cond)) {
+                pn_condition_format(cond, "exception", "%s", e.what());
+            }
+        }
+    }
+    return !pn_connection_driver_finished(&driver_);
+}
+
+mutable_buffer connection_driver::read_buffer() {
+    pn_rwbytes_t buffer = pn_connection_driver_read_buffer(&driver_);
+    return mutable_buffer(buffer.start, buffer.size);
+}
+
+void connection_driver::read_done(size_t n) {
+    return pn_connection_driver_read_done(&driver_, n);
+}
+
+void connection_driver::read_close() {
+    pn_connection_driver_read_close(&driver_);
+}
+
+const_buffer connection_driver::write_buffer() {
+    pn_bytes_t buffer = pn_connection_driver_write_buffer(&driver_);
+    return const_buffer(buffer.start, buffer.size);
+}
+
+void connection_driver::write_done(size_t n) {
+    return pn_connection_driver_write_done(&driver_, n);
+}
+
+void connection_driver::write_close() {
+    pn_connection_driver_write_close(&driver_);
+}
+
+void connection_driver::disconnected(const proton::error_condition& err) {
+    pn_condition_t* condition = pn_transport_condition(driver_.transport);
+    if (!pn_condition_is_set(condition))  {
+        set_error_condition(err, condition);
+    }
+    pn_connection_driver_close(&driver_);
+}
+
+proton::connection connection_driver::connection() const {
+    return make_wrapper(driver_.connection);
+}
+
+proton::transport connection_driver::transport() const {
+    return make_wrapper(driver_.transport);
+}
+
+proton::container* connection_driver::container() const {
+    return container_;
+}
+
+}}
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
deleted file mode 100644
index 5e6483f..0000000
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/io/connection_engine.hpp"
-
-#include "proton/event_loop.hpp"
-#include "proton/error.hpp"
-#include "proton/messaging_handler.hpp"
-#include "proton/uuid.hpp"
-
-#include "contexts.hpp"
-#include "messaging_adapter.hpp"
-#include "msg.hpp"
-#include "proton_bits.hpp"
-#include "proton_event.hpp"
-
-#include <proton/connection.h>
-#include <proton/transport.h>
-#include <proton/event.h>
-
-#include <algorithm>
-
-
-namespace proton {
-namespace io {
-
-void connection_engine::init() {
-    if (pn_connection_engine_init(&engine_, pn_connection(), pn_transport()) != 0) {
-        this->~connection_engine(); // Dtor won't be called on throw from ctor.
-        throw proton::error(std::string("connection_engine allocation failed"));
-    }
-}
-
-connection_engine::connection_engine() : handler_(0), container_(0) { init(); }
-
-connection_engine::connection_engine(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
-    init();
-    connection_context& ctx = connection_context::get(connection());
-    ctx.container = container_;
-    ctx.event_loop.reset(loop);
-}
-
-connection_engine::~connection_engine() {
-    pn_connection_engine_destroy(&engine_);
-}
-
-void connection_engine::configure(const connection_options& opts, bool server) {
-    proton::connection c(connection());
-    opts.apply_unbound(c);
-    if (server) pn_transport_set_server(engine_.transport);
-    pn_connection_engine_bind(&engine_);
-    opts.apply_bound(c);
-    handler_ =  opts.handler();
-    connection_context::get(connection()).collector = engine_.collector;
-}
-
-void connection_engine::connect(const connection_options& opts) {
-    connection_options all;
-    if (container_) {
-        all.container_id(container_->id());
-        all.update(container_->client_connection_options());
-    }
-    all.update(opts);
-    configure(all, false);
-    connection().open();
-}
-
-void connection_engine::accept(const connection_options& opts) {
-    connection_options all;
-    if (container_) {
-        all.container_id(container_->id());
-        all.update(container_->server_connection_options());
-    }
-    all.update(opts);
-    configure(all, true);
-}
-
-bool connection_engine::dispatch() {
-    pn_event_t* c_event;
-    while ((c_event = pn_connection_engine_event(&engine_)) != NULL) {
-        proton_event cpp_event(c_event, container_);
-        try {
-            if (handler_ != 0) {
-                messaging_adapter adapter(*handler_);
-                cpp_event.dispatch(adapter);
-            }
-        } catch (const std::exception& e) {
-            pn_condition_t *cond = pn_transport_condition(engine_.transport);
-            if (!pn_condition_is_set(cond)) {
-                pn_condition_format(cond, "exception", "%s", e.what());
-            }
-        }
-        pn_connection_engine_pop_event(&engine_);
-    }
-    return !pn_connection_engine_finished(&engine_);
-}
-
-mutable_buffer connection_engine::read_buffer() {
-    pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&engine_);
-    return mutable_buffer(buffer.start, buffer.size);
-}
-
-void connection_engine::read_done(size_t n) {
-    return pn_connection_engine_read_done(&engine_, n);
-}
-
-void connection_engine::read_close() {
-    pn_connection_engine_read_close(&engine_);
-}
-
-const_buffer connection_engine::write_buffer() {
-    pn_bytes_t buffer = pn_connection_engine_write_buffer(&engine_);
-    return const_buffer(buffer.start, buffer.size);
-}
-
-void connection_engine::write_done(size_t n) {
-    return pn_connection_engine_write_done(&engine_, n);
-}
-
-void connection_engine::write_close() {
-    pn_connection_engine_write_close(&engine_);
-}
-
-void connection_engine::disconnected(const proton::error_condition& err) {
-    pn_condition_t* condition = pn_transport_condition(engine_.transport);
-    if (!pn_condition_is_set(condition))  {
-        set_error_condition(err, condition);
-    }
-    pn_connection_engine_close(&engine_);
-}
-
-proton::connection connection_engine::connection() const {
-    return make_wrapper(engine_.connection);
-}
-
-proton::transport connection_engine::transport() const {
-    return make_wrapper(engine_.transport);
-}
-
-proton::container* connection_engine::container() const {
-    return container_;
-}
-
-}}
diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp
index b84722c..e5ec55a 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -74,7 +74,7 @@
             // Create dummy flow event where "drain finish" can be detected.
             pn_connection_t *pnc = pn_session_connection(pn_link_session(pn_object()));
             connection_context& cctx = connection_context::get(pnc);
-            // connection_engine collector is per connection.  Reactor collector is global.
+            // connection_driver collector is per connection.  Reactor collector is global.
             pn_collector_t *coll = cctx.collector;
             if (!coll)
                 coll = pn_reactor_collector(pn_object_reactor(pnc));
diff --git a/proton-c/bindings/cpp/src/thread_safe_test.cpp b/proton-c/bindings/cpp/src/thread_safe_test.cpp
index f8dc3d8..5b5d487 100644
--- a/proton-c/bindings/cpp/src/thread_safe_test.cpp
+++ b/proton-c/bindings/cpp/src/thread_safe_test.cpp
@@ -24,7 +24,7 @@
 #include "proton_bits.hpp"
 
 #include "proton/thread_safe.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
 
 #include <proton/connection.h>
 
@@ -37,7 +37,7 @@
     pn_connection_t* c = 0;
     thread_safe<connection>* p = 0;
     {
-        io::connection_engine e;
+        io::connection_driver e;
         c = unwrap(e.connection());
         int r = pn_refcount(c);
         ASSERT(r >= 1); // engine may have internal refs (transport, collector).
@@ -54,7 +54,7 @@
     {
         std::shared_ptr<thread_safe<connection> > sp;
         {
-            io::connection_engine e;
+            io::connection_driver e;
             c = unwrap(e.connection());
             sp = make_shared_thread_safe(e.connection());
         }
@@ -63,7 +63,7 @@
     {
         std::unique_ptr<thread_safe<connection> > up;
         {
-            io::connection_engine e;
+            io::connection_driver e;
             c = unwrap(e.connection());
             up = make_unique_thread_safe(e.connection());
         }
@@ -78,7 +78,7 @@
     connection c;
     pn_connection_t* pc = 0;
     {
-        io::connection_engine eng;
+        io::connection_driver eng;
         c = eng.connection();
         pc = unwrap(c);         // Unwrap in separate scope to avoid confusion from temp values.
     }
diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md
index ccd679d..9c6009f 100644
--- a/proton-c/docs/api/index.md
+++ b/proton-c/docs/api/index.md
@@ -5,35 +5,31 @@
 
 The [Engine API](@ref engine) is a "pure AMQP" toolkit, it decodes AMQP bytes
 into proton [events](@ref event) and generates AMQP bytes from application
-calls.
+calls. There is no IO or threading code in this part of the library.
 
-The [connection engine](@ref connection_engine) provides a simple bytes in/bytes
+## Proactive event-driven programming
+
+The [Proactor API](@ref proactor) is a proactive, asynchronous framework for
+building single- or multi-threaded Proton C applications. It manages the IO
+transport layer so you can write portable, event-driven AMQP code using the @ref
+engine API.
+
+## IO Integration
+
+The [connection driver](@ref connection_driver) provides a simple bytes in/bytes
 out, event-driven interface so you can read AMQP data from any source, process
-the resulting [events](@ref event) and write AMQP output to any destination.
+the resulting [events](@ref event) and write AMQP output to any destination. It
+lets you use proton in alternate event loops, or for specialized embedded
+applications.
 
-There is no IO or threading code in this part of the library, so it can be
-embedded in many different environments. The proton project provides language
-bindings (Python, Ruby, Go etc.) that embed it into the standard IO and
-threading facilities of the bound language.
-
-## Integrating with IO
-
-The [Proactor API](@ref proactor) is a pro-active, asynchronous framewokr to
-build single or multi-threaded Proton C applications.
-
-For advanced use-cases it is possible to write your own implementation of the
-proactor API for an unusual IO or threading framework. Any proton application
+It is also possible to write your own implementation of the @ref proactor if you
+are dealing with an unusual IO or threading framework. Any proton application
 written to the proactor API will be able to use your implementation.
 
-## Messenger and Reactor APIs
+## Messenger and Reactor APIs (deprecated)
 
-The [Messenger](@ref messenger) [Reactor](@ref reactor) APIs were intended
-to be simple APIs that included IO support directly out of the box.
+The [Messenger](@ref messenger) and [Reactor](@ref reactor) APIs are older APIs
+that are limited to single-threaded applications.
 
-They both had good points but were both based on the assumption of a single-threaded
-environment using a POSIX-like poll() call. This was a problem for performance on some
-platforms and did not support multi-threaded applications.
-
-Note however that application code which interacts with the AMQP @ref engine and
-processes AMQP @ref "events" event is the same for the proactor and reactor APIs,
-so is quite easy to convert. The main difference is in how connections are set up.
+Existing @ref reactor applications can easily be converted to use the @ref proactor,
+since they share the same @ref engine API and @ref event set.
diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h
index 0ed23b0..70fad73 100644
--- a/proton-c/include/proton/connection.h
+++ b/proton-c/include/proton/connection.h
@@ -38,7 +38,7 @@
 /**
  * @file
  *
- * Connection API for the proton Engine.
+ * Connection API for the proton @ref engine
  *
  * @defgroup connection Connection
  * @ingroup engine
diff --git a/proton-c/include/proton/connection_driver.h b/proton-c/include/proton/connection_driver.h
new file mode 100644
index 0000000..4fa3fb9
--- /dev/null
+++ b/proton-c/include/proton/connection_driver.h
@@ -0,0 +1,243 @@
+#ifndef PROTON_CONNECTION_DRIVER_H
+#define PROTON_CONNECTION_DRIVER_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * @file
+ *
+ * @defgroup connection_driver Connection Driver
+ *
+ * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
+ * transports. Provides a single point of control for an AMQP connection and
+ * a simple bytes-in/bytes-out interface that lets you:
+ *
+ * - process AMQP-encoded bytes from some input byte stream
+ * - generate ::pn_event_t events for your application to handle
+ * - encode resulting AMQP output bytes for some output byte stream
+ *
+ * The pn_connection_driver_*() functions provide a simplified API and extra
+ * logic to use ::pn_connection_t and ::pn_transport_t as a unit.  You can also
+ * access them directly for features that are not exposed via the @ref
+ * connection_driver API.
+ *
+ * The driver buffers events and data; you should run it until
+ * pn_connection_driver_finished() is true, to ensure all reading, writing and
+ * event handling (including ERROR and FINAL events) is finished.
+ *
+ * ## Error handling
+ *
+ * The pn_connection_driver_*() functions do not return an error code. IO errors set
+ * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration
+ * code can set errors using pn_connection_driver_errorf()
+ *
+ * ## IO patterns
+ *
+ * This API supports asynchronous, proactive, non-blocking and reactive IO. An
+ * integration does not have to follow a strict dispatch-read-write sequence,
+ * but note that you should handle all available events before calling
+ * pn_connection_driver_read_buffer() and check that `size` is non-zero before
+ * starting a blocking or asynchronous read call. A `read` started while there
+ * are unprocessed CLOSE events in the buffer may never complete.
+ *
+ * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
+ * an AMQP connection can close separately.
+ *
+ * ## Thread safety
+ *
+ * The @ref engine types are not thread safe, but each connection and its
+ * associated types forms an independent unit. Different connections can be
+ * processed concurrently by different threads.
+ *
+ * @{
+ */
+
+#include <proton/import_export.h>
+#include <proton/event.h>
+#include <proton/types.h>
+
+#include <stdarg.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Struct containing the three elements needed to drive AMQP IO and events, aggregated as a unit.
+ */
+typedef struct pn_connection_driver_t {
+  pn_connection_t *connection;
+  pn_transport_t *transport;
+  pn_event_batch_t batch;
+} pn_connection_driver_t;
+
+/**
+ * Set #connection and #transport to the provided values, or create a new
+ * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
+ * The provided values belong to the connection driver and will be freed by
+ * pn_connection_driver_destroy()
+ *
+ * The transport is bound automatically after the PN_CONNECTION_INIT event has
+ * been handled by the application. It can be bound earlier with
+ * pn_connection_driver_bind().
+ *
+ * The following functions must be called before the transport is
+ * bound to have effect: pn_connection_set_username(), pn_connection_set_password(),
+ * pn_transport_set_server()
+ *
+ * @return PN_OUT_OF_MEMORY if any allocation fails.
+ */
+PN_EXTERN int pn_connection_driver_init(pn_connection_driver_t*, pn_connection_t*, pn_transport_t*);
+
+/** Force binding of the transport.
+ * This happens automatically after the PN_CONNECTION_INIT is processed.
+ *
+ * @return PN_STATE_ERR if the transport is already bound.
+ */
+PN_EXTERN int pn_connection_driver_bind(pn_connection_driver_t *d);
+
+/**
+ * Unbind, release and free #connection and #transport. Set all pointers to
+ * NULL.  Does not free the @ref pn_connection_driver_t struct itself.
+ */
+PN_EXTERN void pn_connection_driver_destroy(pn_connection_driver_t *);
+
+/**
+ * Get the read buffer.
+ *
+ * Copy data from your input byte source to buf.start, up to buf.size.
+ * Call pn_connection_driver_read_done() when reading is complete.
+ *
+ * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *);
+
+/**
+ * Process the first n bytes of data in pn_connection_driver_read_buffer() and
+ * reclaim the buffer space.
+ */
+PN_EXTERN void pn_connection_driver_read_done(pn_connection_driver_t *, size_t n);
+
+/**
+ * Close the read side. Call when the IO can no longer be read.
+ */
+PN_EXTERN void pn_connection_driver_read_close(pn_connection_driver_t *);
+
+/**
+ * True if read side is closed.
+ */
+PN_EXTERN bool pn_connection_driver_read_closed(pn_connection_driver_t *);
+
+/**
+ * Get the write buffer.
+ *
+ * Write data from buf.start to your IO destination, up to a max of buf.size.
+ * Call pn_connection_driver_write_done() when writing is complete.
+ *
+ * buf.size==0 means there is nothing to write.
+ */
+ PN_EXTERN pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *);
+
+/**
+ * Call when the first n bytes of pn_connection_driver_write_buffer() have been
+ * written to IO. Reclaims the buffer space and reset the write buffer.
+ */
+PN_EXTERN void pn_connection_driver_write_done(pn_connection_driver_t *, size_t n);
+
+/**
+ * Close the write side. Call when IO can no longer be written to.
+ */
+PN_EXTERN void pn_connection_driver_write_close(pn_connection_driver_t *);
+
+/**
+ * True if write side is closed.
+ */
+PN_EXTERN bool pn_connection_driver_write_closed(pn_connection_driver_t *);
+
+/**
+ * Close both the read and write sides.
+ */
+PN_EXTERN void pn_connection_driver_close(pn_connection_driver_t * c);
+
+/**
+ * Get the next event to handle.
+ *
+ * @return a pointer that is valid until the next call of
+ * pn_connection_driver_next_event(). NULL if there are no more events available
+ * now; reading or writing data may produce more.
+ */
+PN_EXTERN pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *);
+
+/**
+ * True if pn_connection_driver_next_event() will return a non-NULL event.
+ */
+PN_EXTERN bool pn_connection_driver_has_event(pn_connection_driver_t *);
+
+/**
+ * Return true if the driver is closed for reading and writing and there are
+ * no more events.
+ *
+ * Call pn_connection_driver_destroy() to free all related memory.
+ */
+PN_EXTERN bool pn_connection_driver_finished(pn_connection_driver_t *);
+
+/**
+ * Set IO error information.
+ *
+ * The name and formatted description are set on the transport condition, and
+ * returned as a PN_TRANSPORT_ERROR event from pn_connection_driver_next_event().
+ *
+ * You must call this *before* pn_connection_driver_read_close() or
+ * pn_connection_driver_write_close() to ensure the error is processed.
+ */
+PN_EXTERN void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...);
+
+/**
+ * Set IO error information via a va_list, see pn_connection_driver_errorf()
+ */
+PN_EXTERN void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list);
+
+/**
+ * Log a string message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_logf(pn_connection_driver_t *d, char *fmt, ...);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap);
+
+/**
+ * If the batch is part of a connection_driver, return the connection_driver
+ * address, otherwise return NULL.
+ */
+PN_EXTERN pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch);
+///@}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PROTON_CONNECTION_DRIVER_H
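The usage example from the old connection_engine.h header (removed below) carries over
directly to the renamed API. A minimal sketch under the same assumptions as that example:
my_io_read(), my_io_write() and my_app_handle() are placeholders for an imaginary blocking
IO library and application handler. Note that, unlike the old engine, the driver binds the
transport automatically once PN_CONNECTION_INIT has been handled, so no explicit bind call
is needed here:

    /* Sketch only: blocking IO loop with the renamed driver API.
     * my_io_* and my_app_handle() are placeholders, not real library calls.
     * A real application should also check the return of _init for PN_OUT_OF_MEMORY. */
    pn_connection_driver_t d;
    pn_connection_driver_init(&d, NULL, NULL);
    while (!pn_connection_driver_finished(&d)) {
        /* Dispatch events to the application handler. */
        pn_event_t *e;
        while ((e = pn_connection_driver_next_event(&d)) != NULL) {
            my_app_handle(e);
        }
        /* Read from my_io into the driver's read buffer. */
        pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&d);
        if (rbuf.size) {
            ssize_t n = my_io_read(rbuf.start, rbuf.size);
            if (n > 0) {
                pn_connection_driver_read_done(&d, n);
            } else if (n < 0) {
                pn_connection_driver_errorf(&d, "read-err", "read failed (%d)", (int)n);
                pn_connection_driver_read_close(&d);
            }
        }
        /* Write from the driver's write buffer to my_io. */
        pn_bytes_t wbuf = pn_connection_driver_write_buffer(&d);
        if (wbuf.size) {
            ssize_t n = my_io_write(wbuf.start, wbuf.size);
            if (n > 0) {
                pn_connection_driver_write_done(&d, n);
            } else if (n < 0) {
                pn_connection_driver_errorf(&d, "write-err", "write failed (%d)", (int)n);
                pn_connection_driver_write_close(&d);
            }
        }
    }
    pn_connection_driver_destroy(&d);
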
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
deleted file mode 100644
index b7022a9..0000000
--- a/proton-c/include/proton/connection_engine.h
+++ /dev/null
@@ -1,313 +0,0 @@
-#ifndef PROTON_CONNECTION_ENGINE_H
-#define PROTON_CONNECTION_ENGINE_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * @file
- *
- * **Experimental** The connection IO API is a set of functions to simplify
- * integrating proton with different IO and concurrency platforms. The portable
- * parts of a Proton application should use the @ref engine types.  We will
- * use "application" to mean the portable part of the application and
- * "integration" to mean code that integrates with a particular IO platform.
- *
- * The connection_engine functions take a @ref pn_connection_t\*, and perform common
- * tasks involving the @ref pn_connection_t and it's @ref pn_transport_t and
- * @ref pn_collector_t so you can treat them as a unit. You can also work with
- * these types directly for features not available via @ref connection_engine API.
- *
- * @defgroup connection_engine Connection Engine
- *
- * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
- * transports. Provides a single point of control for an AMQP connection and
- * a simple bytes-in/bytes-out interface that lets you:
- *
- * - process AMQP-encoded bytes from some input byte stream
- * - generate @ref pn_event_t events for your application to handle
- * - encode resulting AMQP output bytes for some output byte stream
- *
- * The engine contains a @ref pn_connection_t, @ref pn_transport_t and @ref
- * pn_collector_t and provides functions to operate on all three as a unit for
- * IO integration. You can also use them directly for anything not covered by
- * this API
- *
- * For example a simple blocking IO integration with the imaginary "my_io" library:
- *
- *     pn_connection_engine_t ce;
- *     pn_connection_engine_init(&ce);
- *     while (!pn_connection_engine_finished(&ce) {
- *         // Dispatch events to be handled by the application.
- *         pn_event_t *e;
- *         while ((e = pn_connection_engine_event(&ce))!= NULL) {
- *             my_app_handle(e); // Pass to the application handler
- *             switch (pn_event_type(e)) {
- *                 case PN_CONNECTION_INIT: pn_connection_engine_bind(&ce);
- *                 // Only for full-duplex IO where read/write can shutdown separately.
- *                 case PN_TRANSPORT_CLOSE_READ: my_io_shutdown_read(...); break;
- *                 case PN_TRANSPORT_CLOSE_WRITE: my_io_shutdown_write(...); break;
- *                 default: break;
- *             };
- *             e = pn_connection_engine_pop_event(&ce);
- *         }
- *         // Read from my_io into the connection buffer
- *         pn_rwbytes_t readbuf = pn_connection_engine_read_buffer(&ce);
- *         if (readbuf.size) {
- *             size_t n = my_io_read(readbuf.start, readbuf.size, ...);
- *             if (n > 0) {
- *                 pn_connection_engine_read_done(&ce, n);
- *             } else if (n < 0) {
- *                 pn_connection_engine_errorf(&ce, "read-err", "something-bad (%d): %s", n, ...);
- *                 pn_connection_engine_read_close(&ce);
- *             }
- *         }
- *         // Write from connection buffer to my_io
- *         pn_bytes_t writebuf = pn_connection_engine_write_buffer(&ce);
- *         if (writebuf.size) {
- *             size_t n = my_io_write_data(writebuf.start, writebuf.size, ...);
- *             if (n < 0) {
- *                 pn_connection_engine_errorf(&ce, "write-err", "something-bad (%d): %s", d, ...);
- *                 pn_connection_engine_write_close(&ce);
- *             } else {
- *                 pn_connection_engine_write_done(&ce, n);
- *             }
- *         }
- *     }
- *     // If my_io doesn't have separate read/write shutdown, then we should close it now.
- *     my_io_close(...);
- *
- * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
- * an AMQP connection can close separately, the example shows how to handle this
- * for full-duplex IO or IO with a simple close.
- *
- * The engine buffers events, you must keep processing till
- * pn_connection_engine_finished() is true, to ensure all reading, writing and event
- * handling (including ERROR and FINAL events) is completely finished.
- *
- * ## Error handling
- *
- * The pn_connection_engine_*() functions do not return an error code. IO errors set
- * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration
- * code can set errors using pn_connection_engine_errorf()
- *
- * ## Other IO patterns
- *
- * This API supports asynchronous, proactive, non-blocking and reactive IO. An
- * integration does not have to follow the dispatch-read-write sequence above,
- * but note that you should handle all available events before calling
- * pn_connection_engine_read_buffer() and check that `size` is non-zero before
- * starting a blocking or asynchronous read call. A `read` started while there
- * are unprocessed CLOSE events in the buffer may never complete.
- *
- * ## Thread safety
- *
- * The @ref engine types are not thread safe, but each connection and its
- * associated types forms an independent unit. Different connections can be
- * processed concurrently by different threads.
- *
- * @defgroup connection_engine Connection IO
- * @{
- */
-
-#include <proton/import_export.h>
-#include <proton/event.h>
-#include <proton/types.h>
-
-#include <stdarg.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/**
- * Struct containing a connection, transport and collector. See
- * pn_connection_engine_init(), pn_connection_engine_destroy() and pn_connection_engine()
- */
-typedef struct pn_connection_engine_t {
-  pn_connection_t *connection;
-  pn_transport_t *transport;
-  pn_collector_t *collector;
-} pn_connection_engine_t;
-
-/**
- * Set #connection and #transport to the provided values, or create a new
- * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
- * The provided values belong to the connection engine and will be freed by
- * pn_connection_engine_destroy()
- *
- * Create a new @ref pn_collector_t and set as #collector.
- *
- * The transport and connection are *not* bound at this point. You should
- * configure them as needed and let the application handle the
- * PN_CONNECTION_INIT from pn_connection_engine_event() before calling
- * pn_connection_engine_bind().
- *
- * @return if any allocation fails call pn_connection_engine_destroy() and return PN_OUT_OF_MEMORY
- */
-PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t*, pn_connection_t*, pn_transport_t*);
-
-/**
- * Bind the connection to the transport when the external IO is ready.
- *
- * The following functions (if called at all) must be called *before* bind:
- * pn_connection_set_username(), pn_connection_set_password(),  pn_transport_set_server()
- *
- * If there is an external IO error during setup, set a transport error, close
- * the transport and then bind. The error events are reported to the application
- * via pn_connection_engine_event().
- *
- * @return an error code if the bind fails.
- */
-PN_EXTERN int pn_connection_engine_bind(pn_connection_engine_t *);
-
-/**
- * Unbind, release and free #connection, #transpot and #collector. Set all pointers to NULL.
- * Does not free the @ref pn_connection_engine_t struct itself.
- */
-PN_EXTERN void pn_connection_engine_destroy(pn_connection_engine_t *);
-
-/**
- * Get the read buffer.
- *
- * Copy data from your input byte source to buf.start, up to buf.size.
- * Call pn_connection_engine_read_done() when reading is complete.
- *
- * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
- */
-PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *);
-
-/**
- * Process the first n bytes of data in pn_connection_engine_read_buffer() and
- * reclaim the buffer space.
- */
-PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t *, size_t n);
-
-/**
- * Close the read side. Call when the IO can no longer be read.
- */
-PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t *);
-
-/**
- * True if read side is closed.
- */
-PN_EXTERN bool pn_connection_engine_read_closed(pn_connection_engine_t *);
-
-/**
- * Get the write buffer.
- *
- * Write data from buf.start to your IO destination, up to a max of buf.size.
- * Call pn_connection_engine_write_done() when writing is complete.
- *
- * buf.size==0 means there is nothing to write.
- */
- PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *);
-
-/**
- * Call when the first n bytes of pn_connection_engine_write_buffer() have been
- * written to IO. Reclaims the buffer space and reset the write buffer.
- */
-PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t *, size_t n);
-
-/**
- * Close the write side. Call when IO can no longer be written to.
- */
-PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t *);
-
-/**
- * True if write side is closed.
- */
-PN_EXTERN bool pn_connection_engine_write_closed(pn_connection_engine_t *);
-
-/**
- * Close both sides side.
- */
-PN_EXTERN void pn_connection_engine_close(pn_connection_engine_t * c);
-
-/**
- * Get the current event. Call pn_connection_engine_done() when done handling it.
- * Note that if PN_TRACE_EVT is enabled this will log the event, so you should
- * avoid calling it more than once per event. Use pn_connection_engine_has_event()
- * to silently test if any events are available.
- *
- * @return NULL if there are no more events ready. Reading/writing data may produce more.
- */
-PN_EXTERN pn_event_t* pn_connection_engine_event(pn_connection_engine_t *);
-
-/**
- * True if  pn_connection_engine_event() will return a non-NULL event.
- */
-PN_EXTERN bool pn_connection_engine_has_event(pn_connection_engine_t *);
-
-/**
- * Drop the current event, advance pn_connection_engine_event() to the next event.
- */
-PN_EXTERN void pn_connection_engine_pop_event(pn_connection_engine_t *);
-
-/**
- * Return true if the the engine is closed for reading and writing and there are
- * no more events.
- *
- * Call pn_connection_engine_free() to free all related memory.
- */
-PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t *);
-
-/**
- * Set IO error information.
- *
- * The name and formatted description are set on the transport condition, and
- * returned as a PN_TRANSPORT_ERROR event from pn_connection_engine_event().
- *
- * You must call this *before* pn_connection_engine_read_close() or
- * pn_connection_engine_write_close() to ensure the error is processed.
- *
- * If there is already a transport condition set, this call does nothing.  For
- * more complex cases, you can work with the transport condition directly using:
- *
- *     pn_condition_t *cond = pn_transport_condition(pn_connection_transport(conn));
- */
-PN_EXTERN void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...);
-
-/**
- * Set IO error information via a va_list, see pn_connection_engine_errorf()
- */
-PN_EXTERN void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list);
-
-/**
- * Log a string message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg);
-
-/**
- * Log a printf formatted message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_logf(pn_connection_engine_t *ce, char *fmt, ...);
-
-/**
- * Log a printf formatted message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap);
-
-///@}
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif // PROTON_CONNECTION_ENGINE_H
diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i
index ffcf830..931437e 100644
--- a/proton-c/include/proton/cproton.i
+++ b/proton-c/include/proton/cproton.i
@@ -39,6 +39,9 @@
 %ignore pn_bytes_t;
 %ignore pn_rwbytes_t;
 
+/* pn_event_batch_t is not used directly by bindings */
+%ignore pn_event_batch_t;
+
 /* There is no need to wrap pn_class_t aa it is an internal implementation detail and cannot be used outside the library */
 %ignore pn_class_t;
 
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 4dca2d5..31d4bdd 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -428,6 +428,32 @@
 PN_EXTERN bool pn_collector_pop(pn_collector_t *collector);
 
 /**
+ * Return the next event to be handled.
+ *
+ * Returns the head event if it has not previously been returned by
+ * pn_collector_next(), otherwise calls pn_collector_pop() and returns
+ * the new head event.
+ *
+ * The returned pointer is valid until the next call of pn_collector_pop(),
+ * pn_collector_next(), pn_collector_release() or pn_collector_free().
+ *
+ * @param[in] collector a collector object
+ * @return the next event.
+ */
+PN_EXTERN pn_event_t *pn_collector_next(pn_collector_t *collector);
+
+/**
+ * Return the same event as the previous call to pn_collector_next().
+ *
+ * The returned pointer is valid until the next call of pn_collector_pop(),
+ * pn_collector_next(), pn_collector_release() or pn_collector_free().
+ *
+ * @param[in] collector a collector object
+ * @return a pointer to the event returned by previous call to pn_collector_next()
+ */
+PN_EXTERN pn_event_t *pn_collector_prev(pn_collector_t *collector);
+
+/**
  * Check if there are more events after the current event. If this
  * returns true, then pn_collector_peek() will return an event even
  * after pn_collector_pop() is called.
@@ -506,6 +532,36 @@
  */
 PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event);
 
+/**
+ * **Experimental**: A batch of events to handle. Call pn_event_batch_next() in
+ * a loop, handling each returned event, until it returns NULL.
+ */
+typedef struct pn_event_batch_t pn_event_batch_t;
+
+/* NOTE: there is deliberately no peek(), more() or other look-ahead on an event
+ * batch. We want to know exactly which events have been handled; next() only
+ * allows the user to get each event exactly once, in order.
+ */
+
+/**
+ * **Experimental**: Remove the next event from the batch and return it. NULL
+ *  means the batch is empty. The returned event pointer is valid until
+ *  pn_event_batch_next() is called again on the same batch.
+ */
+PN_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch);
+
+/**
+ *@cond INTERNAL
+ * pn_event_batch_next() can be re-implemented for different behaviors in different contexts.
+ */
+struct pn_event_batch_t {
+  pn_event_t *(*next_event)(pn_event_batch_t *batch);
+};
+
+/**
+ *@endcond
+ */
+
 #ifdef __cplusplus
 }
 #endif
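A sketch of the simpler event loop that pn_collector_next() allows, replacing the
peek/pop pattern; handle() is a placeholder for application code and collector is
an existing pn_collector_t:

    /* Drain a collector with pn_collector_next(); each event is returned once, in order. */
    pn_event_t *e;
    while ((e = pn_collector_next(collector)) != NULL) {
        handle(e);   /* placeholder application handler */
    }
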
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 49d7b6a..e23a24f 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -33,30 +33,27 @@
 /**
  * @file
  *
- * **Experimental**: Proactor API for the the proton @ref engine.
+ * **Experimental**: Proactor API for the proton @ref engine
  *
  * @defgroup proactor Proactor
  *
  * **Experimental**: Proactor API for portable, multi-threaded, asynchronous applications.
  *
- * The proactor establishes and listens for connections. It creates the @ref
- * "transport" transport that sends and receives data over the network and
- * delivers @ref "events" event to application threads for processing.
+ * The proactor establishes and listens for connections. It creates
+ * the @ref transport that sends and receives data over the network and
+ * delivers @ref event objects to application threads for handling.
  *
- * ## Multi-threading
- *
- * The @ref proactor is thread-safe, but the @ref "protocol engine" is not.  The
- * proactor ensures that each @ref "connection" connection and its associated
- * values (@ref session, @ref link etc.) is processed sequentially, even if there
- * are multiple application threads. See pn_proactor_wait()
+ * **Multi-threading**:
+ * The @ref proactor is thread-safe, but the @ref engine is not.  The proactor
+ * ensures that each @ref connection and its associated values (@ref session,
+ * @ref link etc.) is handled sequentially, even if there are multiple
+ * application threads. See pn_proactor_wait().
  *
  * @{
  */
 
 /**
- * The proactor creates and manage @ref "transports" transport and delivers @ref
- * "event" events to the application.
- *
+ * The proactor.
  */
 typedef struct pn_proactor_t pn_proactor_t;
 
@@ -70,13 +67,6 @@
  */
 void pn_proactor_free(pn_proactor_t*);
 
-/* FIXME aconway 2016-11-12: connect and listen need options to enable
-   things like websockets, alternate encryption or other features.
-   The "extra" parameter will be replaced by an "options" parameter
-   that will include providing extra data and other manipulators
-   to affect how the connection is processed.
-*/
-
 /**
  * Asynchronous connect: a connection and transport will be created, the
  * relevant events will be returned by pn_proactor_wait()
@@ -104,13 +94,27 @@
 pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra);
 
 /**
- * Wait for an event. Can be called in multiple threads concurrently.
- * You must call pn_event_done() when the event has been handled.
+ * Wait for events to handle. Call pn_proactor_done() after handling events.
  *
- * The proactor ensures that events that cannot be handled concurrently
- * (e.g. events for for the same connection) are never returned concurrently.
+ * Thread safe: pn_proactor_wait() can be called concurrently, but the events in
+ * the returned ::pn_event_batch_t must be handled sequentially.
+ *
+ * Events that must be handled sequentially are always returned in the same
+ * batch, or in a later batch after pn_proactor_done(). Any
+ * events returned concurrently by pn_proactor_wait() are safe to handle
+ * concurrently.
  */
-pn_event_t *pn_proactor_wait(pn_proactor_t* d);
+pn_event_batch_t *pn_proactor_wait(pn_proactor_t* d);
+
+/**
+ * Call when done handling events.
+ *
+ * It is generally most efficient to handle the entire batch in the thread
+ * that calls pn_proactor_wait(), then call pn_proactor_done(). If you call
+ * pn_proactor_done() earlier, the remaining events will be returned again by
+ * pn_proactor_wait(), possibly to another thread.
+ */
+void pn_proactor_done(pn_proactor_t* d, pn_event_batch_t *events);
 
 /**
  * Cause PN_PROACTOR_INTERRUPT to be returned to exactly one thread calling wait()
@@ -146,14 +150,6 @@
 pn_proactor_t *pn_connection_proactor(pn_connection_t *c);
 
 /**
- * Call when a proactor event has been handled. Does nothing if not a proactor event.
- *
- * Thread safe: May be called from any thread but must be called exactly once
- * for each event returned by pn_proactor_wait()
- */
-void pn_event_done(pn_event_t *);
-
-/**
  * Get the proactor that created the event or NULL.
  */
 pn_proactor_t *pn_event_proactor(pn_event_t *);
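A minimal sketch of the wait/done cycle described above, assuming a hypothetical
handle_event() and a single worker thread; pn_event_batch_connection_driver() is shown
only to illustrate how a batch can be tied back to its connection driver:

    /* One iteration of a worker thread, using placeholder handle_event(). */
    pn_event_batch_t *batch = pn_proactor_wait(proactor);
    pn_event_t *e;
    while ((e = pn_event_batch_next(batch)) != NULL) {
        handle_event(e);        /* events within a batch are handled sequentially */
    }
    pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
    if (d) {
        /* the batch belongs to a connection; d->connection and d->transport are available */
    }
    pn_proactor_done(proactor, batch);  /* hand the batch back to the proactor */
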
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index f31ddb0..3393e64 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -20,144 +20,149 @@
 #include "engine-internal.h"
 #include <proton/condition.h>
 #include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/transport.h>
 #include <string.h>
 
-int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
-  ce->connection = c ? c : pn_connection();
-  ce->transport = t ? t : pn_transport();
-  ce->collector = pn_collector();
-  if (!ce->connection || !ce->transport || !ce->collector) {
-    pn_connection_engine_destroy(ce);
+struct driver_batch {
+  pn_event_batch_t batch;
+};
+
+static pn_event_t *batch_next(pn_event_batch_t *batch) {
+  pn_connection_driver_t *d =
+    (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch));
+  pn_collector_t *collector = pn_connection_collector(d->connection);
+  pn_event_t *handled = pn_collector_prev(collector);
+  if (handled && pn_event_type(handled) == PN_CONNECTION_INIT) {
+      pn_transport_bind(d->transport, d->connection); /* Init event handled, auto-bind */
+  }
+  pn_event_t *next = pn_collector_next(collector);
+  if (next && d->transport->trace & PN_TRACE_EVT) {
+    pn_string_clear(d->transport->scratch);
+    pn_inspect(next, d->transport->scratch);
+    pn_transport_log(d->transport, pn_string_get(d->transport->scratch));
+  }
+  return next;
+}
+
+int pn_connection_driver_init(pn_connection_driver_t* d, pn_connection_t *c, pn_transport_t *t) {
+  memset(d, 0, sizeof(*d));
+  d->batch.next_event = &batch_next;
+  d->connection = c ? c : pn_connection();
+  d->transport = t ? t : pn_transport();
+  pn_collector_t *collector = pn_collector();
+  if (!d->connection || !d->transport || !collector) {
+    if (collector) pn_collector_free(collector);
+    pn_connection_driver_destroy(d);
     return PN_OUT_OF_MEMORY;
   }
-  pn_connection_collect(ce->connection, ce->collector);
+  pn_connection_collect(d->connection, collector);
   return 0;
 }
 
-int pn_connection_engine_bind(pn_connection_engine_t *ce) {
-  return pn_transport_bind(ce->transport, ce->connection);
+int pn_connection_driver_bind(pn_connection_driver_t *d) {
+  return pn_transport_bind(d->transport, d->connection);
 }
 
-void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
-  if (ce->transport) {
-    pn_transport_unbind(ce->transport);
-    pn_transport_free(ce->transport);
+void pn_connection_driver_destroy(pn_connection_driver_t *d) {
+  if (d->transport) {
+    pn_transport_unbind(d->transport);
+    pn_transport_free(d->transport);
   }
-  if (ce->collector) pn_collector_free(ce->collector);
-  if (ce->connection) pn_connection_free(ce->connection);
-  memset(ce, 0, sizeof(*ce));
+  if (d->connection) {
+    pn_collector_t *collector = pn_connection_collector(d->connection);
+    pn_connection_free(d->connection);
+    pn_collector_free(collector);
+  }
+  memset(d, 0, sizeof(*d));
 }
 
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
-  ssize_t cap = pn_transport_capacity(ce->transport);
-  return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
+pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *d) {
+  ssize_t cap = pn_transport_capacity(d->transport);
+  return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0);
 }
 
-void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
-  if (n > 0) pn_transport_process(ce->transport, n);
+void pn_connection_driver_read_done(pn_connection_driver_t *d, size_t n) {
+  if (n > 0) pn_transport_process(d->transport, n);
 }
 
-bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
-  return pn_transport_capacity(ce->transport) < 0;
+bool pn_connection_driver_read_closed(pn_connection_driver_t *d) {
+  return pn_transport_capacity(d->transport) < 0;
 }
 
-void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
-  if (!pn_connection_engine_read_closed(ce)) {
-    pn_transport_close_tail(ce->transport);
+void pn_connection_driver_read_close(pn_connection_driver_t *d) {
+  if (!pn_connection_driver_read_closed(d)) {
+    pn_transport_close_tail(d->transport);
   }
 }
 
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
-  ssize_t pending = pn_transport_pending(ce->transport);
+pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *d) {
+  ssize_t pending = pn_transport_pending(d->transport);
   return (pending > 0) ?
-    pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
+    pn_bytes(pending, pn_transport_head(d->transport)) : pn_bytes_null;
 }
 
-void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
+void pn_connection_driver_write_done(pn_connection_driver_t *d, size_t n) {
   if (n > 0)
-    pn_transport_pop(ce->transport, n);
+    pn_transport_pop(d->transport, n);
 }
 
-bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
-  return pn_transport_pending(ce->transport) < 0;
+bool pn_connection_driver_write_closed(pn_connection_driver_t *d) {
+  return pn_transport_pending(d->transport) < 0;
 }
 
-void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
-  if (!pn_connection_engine_write_closed(ce)) {
-    pn_transport_close_head(ce->transport);
+void pn_connection_driver_write_close(pn_connection_driver_t *d) {
+  if (!pn_connection_driver_write_closed(d)) {
+    pn_transport_close_head(d->transport);
   }
 }
 
-void pn_connection_engine_close(pn_connection_engine_t *ce) {
-  pn_connection_engine_read_close(ce);
-  pn_connection_engine_write_close(ce);
+void pn_connection_driver_close(pn_connection_driver_t *d) {
+  pn_connection_driver_read_close(d);
+  pn_connection_driver_write_close(d);
 }
 
-pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
-  pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
-  if (e) {
-    pn_transport_t *t = ce->transport;
-    if (t && t->trace & PN_TRACE_EVT) {
-      /* This can log the same event twice if pn_connection_engine_event is called
-       * twice but for debugging it is much better to log before handling than after.
-       */
-      pn_string_clear(t->scratch);
-      pn_inspect(e, t->scratch);
-      pn_transport_log(t, pn_string_get(t->scratch));
-    }
-  }
-  return e;
+pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) {
+  return pn_event_batch_next(&d->batch);
 }
 
-bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
-  return ce->collector && pn_collector_peek(ce->collector);
+bool pn_connection_driver_has_event(pn_connection_driver_t *d) {
+  return pn_collector_peek(pn_connection_collector(d->connection));
 }
 
-void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
-  if (ce->collector) {
-    pn_event_t *e = pn_collector_peek(ce->collector);
-    if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
-      /* Events can accumulate behind the TRANSPORT_CLOSED before the
-       * PN_TRANSPORT_CLOSED event is handled. They can never be processed
-       * so release them.
-       */
-      pn_collector_release(ce->collector);
-    } else {
-      pn_collector_pop(ce->collector);
-    }
-
-  }
+bool pn_connection_driver_finished(pn_connection_driver_t *d) {
+  return pn_transport_closed(d->transport) && !pn_connection_driver_has_event(d);
 }
 
-bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
-  return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
-}
-
-void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
-  pn_transport_t *t = ce->transport;
+void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list ap) {
+  pn_transport_t *t = d->transport;
   pn_condition_t *cond = pn_transport_condition(t);
   pn_string_vformat(t->scratch, fmt, ap);
   pn_condition_set_name(cond, name);
   pn_condition_set_description(cond, pn_string_get(t->scratch));
 }
 
-void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
+void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...) {
   va_list ap;
   va_start(ap, fmt);
-  pn_connection_engine_verrorf(ce, name, fmt, ap);
+  pn_connection_driver_verrorf(d, name, fmt, ap);
   va_end(ap);
 }
 
-void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
-  pn_transport_log(ce->transport, msg);
+void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg) {
+  pn_transport_log(d->transport, msg);
 }
 
-void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
-  pn_transport_vlogf(ce->transport, fmt, ap);
+void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap) {
+  pn_transport_vlogf(d->transport, fmt, ap);
 }
 
-void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
-  pn_transport_log(ce->transport, msg);
+void pn_connection_driver_vlog(pn_connection_driver_t *d, const char *msg) {
+  pn_transport_log(d->transport, msg);
+}
+
+pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch) {
+  return (batch->next_event == batch_next) ?
+    (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch)) :
+    NULL;
 }
diff --git a/proton-c/src/core/connection_engine.c b/proton-c/src/core/connection_engine.c
deleted file mode 100644
index f31ddb0..0000000
--- a/proton-c/src/core/connection_engine.c
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "engine-internal.h"
-#include <proton/condition.h>
-#include <proton/connection.h>
-#include <proton/connection_engine.h>
-#include <proton/transport.h>
-#include <string.h>
-
-int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
-  ce->connection = c ? c : pn_connection();
-  ce->transport = t ? t : pn_transport();
-  ce->collector = pn_collector();
-  if (!ce->connection || !ce->transport || !ce->collector) {
-    pn_connection_engine_destroy(ce);
-    return PN_OUT_OF_MEMORY;
-  }
-  pn_connection_collect(ce->connection, ce->collector);
-  return 0;
-}
-
-int pn_connection_engine_bind(pn_connection_engine_t *ce) {
-  return pn_transport_bind(ce->transport, ce->connection);
-}
-
-void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
-  if (ce->transport) {
-    pn_transport_unbind(ce->transport);
-    pn_transport_free(ce->transport);
-  }
-  if (ce->collector) pn_collector_free(ce->collector);
-  if (ce->connection) pn_connection_free(ce->connection);
-  memset(ce, 0, sizeof(*ce));
-}
-
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
-  ssize_t cap = pn_transport_capacity(ce->transport);
-  return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
-}
-
-void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
-  if (n > 0) pn_transport_process(ce->transport, n);
-}
-
-bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
-  return pn_transport_capacity(ce->transport) < 0;
-}
-
-void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
-  if (!pn_connection_engine_read_closed(ce)) {
-    pn_transport_close_tail(ce->transport);
-  }
-}
-
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
-  ssize_t pending = pn_transport_pending(ce->transport);
-  return (pending > 0) ?
-    pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
-}
-
-void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
-  if (n > 0)
-    pn_transport_pop(ce->transport, n);
-}
-
-bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
-  return pn_transport_pending(ce->transport) < 0;
-}
-
-void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
-  if (!pn_connection_engine_write_closed(ce)) {
-    pn_transport_close_head(ce->transport);
-  }
-}
-
-void pn_connection_engine_close(pn_connection_engine_t *ce) {
-  pn_connection_engine_read_close(ce);
-  pn_connection_engine_write_close(ce);
-}
-
-pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
-  pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
-  if (e) {
-    pn_transport_t *t = ce->transport;
-    if (t && t->trace & PN_TRACE_EVT) {
-      /* This can log the same event twice if pn_connection_engine_event is called
-       * twice but for debugging it is much better to log before handling than after.
-       */
-      pn_string_clear(t->scratch);
-      pn_inspect(e, t->scratch);
-      pn_transport_log(t, pn_string_get(t->scratch));
-    }
-  }
-  return e;
-}
-
-bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
-  return ce->collector && pn_collector_peek(ce->collector);
-}
-
-void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
-  if (ce->collector) {
-    pn_event_t *e = pn_collector_peek(ce->collector);
-    if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
-      /* Events can accumulate behind the TRANSPORT_CLOSED before the
-       * PN_TRANSPORT_CLOSED event is handled. They can never be processed
-       * so release them.
-       */
-      pn_collector_release(ce->collector);
-    } else {
-      pn_collector_pop(ce->collector);
-    }
-
-  }
-}
-
-bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
-  return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
-}
-
-void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
-  pn_transport_t *t = ce->transport;
-  pn_condition_t *cond = pn_transport_condition(t);
-  pn_string_vformat(t->scratch, fmt, ap);
-  pn_condition_set_name(cond, name);
-  pn_condition_set_description(cond, pn_string_get(t->scratch));
-}
-
-void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
-  va_list ap;
-  va_start(ap, fmt);
-  pn_connection_engine_verrorf(ce, name, fmt, ap);
-  va_end(ap);
-}
-
-void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
-  pn_transport_log(ce->transport, msg);
-}
-
-void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
-  pn_transport_vlogf(ce->transport, fmt, ap);
-}
-
-void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
-  pn_transport_log(ce->transport, msg);
-}
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index 7882327..2a0a5cf 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -28,7 +28,8 @@
   pn_list_t *pool;
   pn_event_t *head;
   pn_event_t *tail;
-  bool freed;
+  bool freed:1;
+  bool head_returned:1;         /* Head has been returned by pn_collector_next() */
 };
 
 struct pn_event_t {
@@ -51,11 +52,8 @@
 static void pn_collector_drain(pn_collector_t *collector)
 {
   assert(collector);
-
-  while (pn_collector_peek(collector)) {
-    pn_collector_pop(collector);
-  }
-
+  while (pn_collector_next(collector))
+    ;
   assert(!collector->head);
   assert(!collector->tail);
 }
@@ -175,6 +173,7 @@
 
 bool pn_collector_pop(pn_collector_t *collector)
 {
+  collector->head_returned = false;
   pn_event_t *event = collector->head;
   if (event) {
     collector->head = event->next;
@@ -190,6 +189,19 @@
   return true;
 }
 
+pn_event_t *pn_collector_next(pn_collector_t *collector)
+{
+  if (collector->head_returned) {
+    pn_collector_pop(collector);
+  }
+  collector->head_returned = collector->head;
+  return collector->head;
+}
+
+pn_event_t *pn_collector_prev(pn_collector_t *collector) {
+  return collector->head_returned ? collector->head : NULL;
+}
+
 bool pn_collector_more(pn_collector_t *collector)
 {
   assert(collector);
@@ -386,3 +398,7 @@
   }
   return NULL;
 }
+
+pn_event_t *pn_event_batch_next(pn_event_batch_t *batch) {
+  return batch->next_event(batch);
+}
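
pn_event_batch_next() is deliberately a one-line dispatcher: each event source supplies its own next_event callback, so the consumer never needs to know which source produced the batch. As a sketch of what a driver-owned callback could look like (an assumption based on pn_event_batch_connection_driver() above, not the actual batch_next in connection_driver.c), it can be little more than a pull from the connection's collector through the new pn_collector_next():

#include <proton/connection.h>
#include <proton/connection_driver.h>
#include <proton/event.h>

/* Hypothetical callback, for illustration only; the real driver callback may
 * also do event tracing or other bookkeeping. */
static pn_event_t *sketch_batch_next(pn_event_batch_t *batch) {
  pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
  return d ? pn_collector_next(pn_connection_collector(d->connection)) : NULL;
}

Whatever the callback does internally, the consumer-side contract is the same for every source: keep calling pn_event_batch_next() until it returns NULL.
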
diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c
index a36d01c..267c861 100644
--- a/proton-c/src/tests/refcount.c
+++ b/proton-c/src/tests/refcount.c
@@ -313,7 +313,8 @@
 }
 
 static void drain(pn_collector_t *collector) {
-  while (pn_collector_peek(collector)) { pn_collector_pop(collector); }
+  while (pn_collector_next(collector))
+    ;
 }
 
 static void test_collector_connection_transport(void) {
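
The rewritten drain() relies on the pn_collector_next() contract introduced above: each call first pops the event returned by the previous call, then returns the new head, which stays valid until the following call. That makes a bare while loop both visit and release every queued event. A small usage sketch (handle_event is an illustrative name, not a proton function):

#include <proton/event.h>

/* Illustrative handler; not part of proton. */
static void handle_event(pn_event_t *e) { (void)e; }

static void visit_and_drain(pn_collector_t *c) {
  pn_event_t *e;
  while ((e = pn_collector_next(c))) {   /* pops the previous event, returns the next */
    handle_event(e);
    /* Until the next pn_collector_next() or pn_collector_pop() call,
     * pn_collector_prev(c) returns this same e without advancing. */
  }
}
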
diff --git a/qpid-proton-cpp.syms b/qpid-proton-cpp.syms
index 7a25651..c24e898 100644
--- a/qpid-proton-cpp.syms
+++ b/qpid-proton-cpp.syms
@@ -30,24 +30,24 @@
 proton::connection::user(std::string const&)
 proton::connection::~connection()
 
-proton::connection_engine::can_read() const
-proton::connection_engine::can_write() const
-proton::connection_engine::closed() const
-proton::connection_engine::connection() const
-proton::connection_engine::connection_engine(proton::handler&, proton::connection_options const&)
-proton::connection_engine::container::container(std::string const&)
-proton::connection_engine::container::id() const
-proton::connection_engine::container::make_options()
-proton::connection_engine::container::options(proton::connection_options const&)
-proton::connection_engine::container::~container()
-proton::connection_engine::dispatch()
-proton::connection_engine::io_error::io_error(std::string const&)
-proton::connection_engine::io_error::~io_error()
-proton::connection_engine::no_opts
-proton::connection_engine::process(int)
-proton::connection_engine::try_read()
-proton::connection_engine::try_write()
-proton::connection_engine::~connection_engine()
+proton::connection_driver::can_read() const
+proton::connection_driver::can_write() const
+proton::connection_driver::closed() const
+proton::connection_driver::connection() const
+proton::connection_driver::connection_driver(proton::handler&, proton::connection_options const&)
+proton::connection_driver::container::container(std::string const&)
+proton::connection_driver::container::id() const
+proton::connection_driver::container::make_options()
+proton::connection_driver::container::options(proton::connection_options const&)
+proton::connection_driver::container::~container()
+proton::connection_driver::dispatch()
+proton::connection_driver::io_error::io_error(std::string const&)
+proton::connection_driver::io_error::~io_error()
+proton::connection_driver::no_opts
+proton::connection_driver::process(int)
+proton::connection_driver::try_read()
+proton::connection_driver::try_write()
+proton::connection_driver::~connection_driver()
 
 proton::connection_options::connection_options()
 proton::connection_options::connection_options(proton::connection_options const&)
@@ -587,8 +587,8 @@
 # Only types with the following info can be thrown across a shared object boundary
 # or correctly dynamically cast by the user
 typeinfo for proton::connection
-typeinfo for proton::connection_engine
-typeinfo for proton::connection_engine::io_error
+typeinfo for proton::connection_driver
+typeinfo for proton::connection_driver::io_error
 typeinfo for proton::conversion_error
 typeinfo for proton::endpoint
 typeinfo for proton::error
@@ -600,8 +600,8 @@
 typeinfo for proton::timeout_error
 typeinfo for proton::url_error
 typeinfo name for proton::connection
-typeinfo name for proton::connection_engine
-typeinfo name for proton::connection_engine::io_error
+typeinfo name for proton::connection_driver
+typeinfo name for proton::connection_driver::io_error
 typeinfo name for proton::conversion_error
 typeinfo name for proton::endpoint
 typeinfo name for proton::error
@@ -613,8 +613,8 @@
 typeinfo name for proton::timeout_error
 typeinfo name for proton::url_error
 vtable for proton::connection
-vtable for proton::connection_engine
-vtable for proton::connection_engine::io_error
+vtable for proton::connection_driver
+vtable for proton::connection_driver::io_error
 vtable for proton::conversion_error
 vtable for proton::endpoint
 vtable for proton::error