PROTON-2: merge latest trunk into branch

git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/driver_abstraction@1387437 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/proton-c/bindings/python/CMakeLists.txt b/proton-c/bindings/python/CMakeLists.txt
index ae6c645..fcdcd02 100644
--- a/proton-c/bindings/python/CMakeLists.txt
+++ b/proton-c/bindings/python/CMakeLists.txt
@@ -15,9 +15,16 @@
                               WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
 install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile cproton.py
                               WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
+# Byte-compile a copy of proton.py inside the build tree so that installing never writes into the source tree.
+install(CODE "execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/proton.py ${CMAKE_CURRENT_BINARY_DIR}/proton.py)")
+install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -m py_compile proton.py WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
+install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile proton.py WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
 install(FILES ${CMAKE_CURRENT_BINARY_DIR}/cproton.py
               ${CMAKE_CURRENT_BINARY_DIR}/cproton.pyc
               ${CMAKE_CURRENT_BINARY_DIR}/cproton.pyo
+              ${CMAKE_CURRENT_SOURCE_DIR}/proton.py
+              ${CMAKE_CURRENT_BINARY_DIR}/proton.pyc
+              ${CMAKE_CURRENT_BINARY_DIR}/proton.pyo
         DESTINATION ${PYTHON_SITEARCH_PACKAGES}
         COMPONENT Python)
 install(FILES ${CMAKE_CURRENT_BINARY_DIR}/_cproton.so
diff --git a/proton-c/bindings/python/proton.py b/proton-c/bindings/python/proton.py
new file mode 100644
index 0000000..e40040c
--- /dev/null
+++ b/proton-c/bindings/python/proton.py
@@ -0,0 +1,272 @@
+from xproton import *
+
+class ProtonException(Exception):
+  """Common base class for every exception defined by this module."""
+
+class Timeout(ProtonException):
+  """Raised by the _check() helpers when a proton call returns PN_TIMEOUT."""
+
+class MessengerException(ProtonException):
+  """Default exception for negative return codes seen by Messenger._check()."""
+
+class MessageException(ProtonException):
+  """Default exception for negative return codes seen by Message._check()."""
+
+EXCEPTIONS = {  # error code -> exception class; codes not listed fall back to the caller's default class
+  PN_TIMEOUT: Timeout
+  }
+
+class Messenger(object):
+  """Wraps a pn_messenger handle; negative proton return codes are raised as exceptions."""
+
+  def __init__(self, name=None):
+    self._mng = pn_messenger(name)
+
+  def __del__(self):
+    if hasattr(self, "_mng"):  # attribute is missing if pn_messenger() raised in __init__
+      pn_messenger_free(self._mng)
+      del self._mng
+
+  def _check(self, err):
+    if err >= 0:  # non-negative results are handed back unchanged
+      return err
+    error_cls = EXCEPTIONS.get(err, MessengerException)
+    raise error_cls("[%s]: %s" % (err, pn_messenger_error(self._mng)))
+
+  @property
+  def name(self):
+    return pn_messenger_name(self._mng)
+
+  @property
+  def timeout(self):
+    return pn_messenger_get_timeout(self._mng)
+
+  @timeout.setter
+  def timeout(self, value):
+    self._check(pn_messenger_set_timeout(self._mng, value))
+
+  def start(self):
+    self._check(pn_messenger_start(self._mng))
+
+  def stop(self):
+    self._check(pn_messenger_stop(self._mng))
+
+  def subscribe(self, source):
+    self._check(pn_messenger_subscribe(self._mng, source))
+
+  def put(self, msg):  # msg must expose a _msg handle (e.g. a Message instance)
+    self._check(pn_messenger_put(self._mng, msg._msg))
+
+  def send(self):
+    self._check(pn_messenger_send(self._mng))
+
+  def recv(self, n):
+    self._check(pn_messenger_recv(self._mng, n))
+
+  def get(self, msg):  # msg must expose a _msg handle (e.g. a Message instance)
+    self._check(pn_messenger_get(self._mng, msg._msg))
+
+  @property
+  def outgoing(self):
+    return pn_messenger_outgoing(self._mng)
+
+  @property
+  def incoming(self):
+    return pn_messenger_incoming(self._mng)
+
+class Message(object):
+  """Wraps a pn_message handle and exposes the message fields as Python properties."""
+
+  DATA = PN_DATA
+  TEXT = PN_TEXT
+  AMQP = PN_AMQP
+  JSON = PN_JSON
+
+  def __init__(self):
+    self._msg = pn_message()
+
+  def __del__(self):
+    if hasattr(self, "_msg"):
+      pn_message_free(self._msg)
+      del self._msg
+
+  def _check(self, err):
+    if err >= 0:  # non-negative results are handed back unchanged
+      return err
+    error_cls = EXCEPTIONS.get(err, MessageException)
+    raise error_cls("[%s]: %s" % (err, pn_message_error(self._msg)))
+
+  def clear(self):
+    pn_message_clear(self._msg)
+
+  @property
+  def durable(self):
+    return pn_message_is_durable(self._msg)
+
+  @durable.setter
+  def durable(self, value):
+    self._check(pn_message_set_durable(self._msg, bool(value)))
+
+  @property
+  def priority(self):
+    return pn_message_get_priority(self._msg)
+
+  @priority.setter
+  def priority(self, value):
+    self._check(pn_message_set_priority(self._msg, value))
+
+  @property
+  def ttl(self):
+    return pn_message_get_ttl(self._msg)
+
+  @ttl.setter
+  def ttl(self, value):
+    self._check(pn_message_set_ttl(self._msg, value))
+
+  @property
+  def first_acquirer(self):
+    return pn_message_is_first_acquirer(self._msg)
+
+  @first_acquirer.setter
+  def first_acquirer(self, value):
+    self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
+
+  @property
+  def delivery_count(self):
+    return pn_message_get_delivery_count(self._msg)
+
+  @delivery_count.setter
+  def delivery_count(self, value):
+    self._check(pn_message_set_delivery_count(self._msg, value))
+
+  # XXX
+  @property
+  def id(self):
+    return pn_message_get_id(self._msg)
+
+  @id.setter
+  def id(self, value):
+    self._check(pn_message_set_id(self._msg, value))
+
+  @property
+  def user_id(self):
+    return pn_message_get_user_id(self._msg)
+
+  @user_id.setter
+  def user_id(self, value):
+    self._check(pn_message_set_user_id(self._msg, value))
+
+  @property
+  def address(self):
+    return pn_message_get_address(self._msg)
+
+  @address.setter
+  def address(self, value):
+    self._check(pn_message_set_address(self._msg, value))
+
+  @property
+  def subject(self):
+    return pn_message_get_subject(self._msg)
+
+  @subject.setter
+  def subject(self, value):
+    self._check(pn_message_set_subject(self._msg, value))
+
+  @property
+  def reply_to(self):
+    return pn_message_get_reply_to(self._msg)
+
+  @reply_to.setter
+  def reply_to(self, value):
+    self._check(pn_message_set_reply_to(self._msg, value))
+
+  # XXX
+  @property
+  def correlation_id(self):
+    return pn_message_get_correlation_id(self._msg)
+
+  @correlation_id.setter
+  def correlation_id(self, value):
+    self._check(pn_message_set_correlation_id(self._msg, value))
+
+  @property
+  def content_type(self):
+    return pn_message_get_content_type(self._msg)
+
+  @content_type.setter
+  def content_type(self, value):
+    self._check(pn_message_set_content_type(self._msg, value))
+
+  @property
+  def content_encoding(self):
+    return pn_message_get_content_encoding(self._msg)
+
+  @content_encoding.setter
+  def content_encoding(self, value):
+    self._check(pn_message_set_content_encoding(self._msg, value))
+
+  @property
+  def expiry_time(self):
+    return pn_message_get_expiry_time(self._msg)
+
+  @expiry_time.setter
+  def expiry_time(self, value):
+    self._check(pn_message_set_expiry_time(self._msg, value))
+
+  @property
+  def creation_time(self):
+    return pn_message_get_creation_time(self._msg)
+
+  @creation_time.setter
+  def creation_time(self, value):
+    self._check(pn_message_set_creation_time(self._msg, value))
+
+  @property
+  def group_id(self):
+    return pn_message_get_group_id(self._msg)
+
+  @group_id.setter
+  def group_id(self, value):
+    self._check(pn_message_set_group_id(self._msg, value))
+
+  @property
+  def group_sequence(self):
+    return pn_message_get_group_sequence(self._msg)
+
+  @group_sequence.setter
+  def group_sequence(self, value):
+    self._check(pn_message_set_group_sequence(self._msg, value))
+
+  @property
+  def reply_to_group_id(self):
+    return pn_message_get_reply_to_group_id(self._msg)
+
+  @reply_to_group_id.setter
+  def reply_to_group_id(self, value):
+    self._check(pn_message_set_reply_to_group_id(self._msg, value))
+
+  # XXX
+  @property
+  def format(self):
+    return pn_message_get_format(self._msg)
+
+  @format.setter
+  def format(self, value):
+    self._check(pn_message_set_format(self._msg, value))
+
+  def load(self, data):
+    """Hand `data` to pn_message_load(); a negative return code is raised as an exception."""
+    self._check(pn_message_load(self._msg, data))
+
+  def save(self):
+    """Return the data from pn_message_save(), doubling the requested size on PN_OVERFLOW."""
+    capacity = 16
+    while True:
+      err, data = pn_message_save(self._msg, capacity)
+      if err != PN_OVERFLOW:
+        self._check(err)
+        return data
+      capacity *= 2
+
+__all__ = ["Messenger", "Message", "ProtonException", "MessengerException",  # names exported by "from proton import *"
+           "MessageException", "Timeout"]
diff --git a/proton-c/include/proton/engine.h b/proton-c/include/proton/engine.h
index cd80cdf..40f1c73 100644
--- a/proton-c/include/proton/engine.h
+++ b/proton-c/include/proton/engine.h
@@ -264,6 +264,8 @@
 void pn_session_open(pn_session_t *session);
 void pn_session_close(pn_session_t *session);
 void pn_session_free(pn_session_t *session);
+void *pn_session_context(pn_session_t *session);  // pointer stored by pn_session_set_context; NULL for a NULL session
+void pn_session_set_context(pn_session_t *session, void *context);  // stores an opaque pointer; no-op for a NULL session
 
 // link
 const char *pn_link_name(pn_link_t *link);
@@ -291,6 +293,8 @@
 void pn_link_open(pn_link_t *sender);
 void pn_link_close(pn_link_t *sender);
 void pn_link_free(pn_link_t *sender);
+void *pn_link_context(pn_link_t *link);  // pointer stored by pn_link_set_context; NULL for a NULL link
+void pn_link_set_context(pn_link_t *link, void *context);  // stores an opaque pointer; no-op for a NULL link
 
 // sender
 //void pn_offer(pn_sender_t *sender, int credits);
@@ -319,6 +323,8 @@
 //int pn_format(pn_delivery_t *delivery);
 void pn_settle(pn_delivery_t *delivery);
 void pn_delivery_dump(pn_delivery_t *delivery);
+void *pn_delivery_context(pn_delivery_t *delivery);  // pointer stored by pn_delivery_set_context; NULL for a NULL delivery
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context);  // stores an opaque pointer; no-op for a NULL delivery
 
 #ifdef __cplusplus
 }
diff --git a/proton-c/src/driver-internal.h b/proton-c/src/driver-internal.h
index d4d794c..a96f06f 100644
--- a/proton-c/src/driver-internal.h
+++ b/proton-c/src/driver-internal.h
@@ -103,7 +103,10 @@
 
 int pn_connector_poller_init( struct pn_connector_t *);
 void pn_connector_poller_destroy( struct pn_connector_t *);
-void pn_driver_poller_wait(struct pn_driver_t *, int timeout_ms);
+// pn_driver_poller_wait(driver, timeout_ms) is split into the three phases below; only phase 2 takes the timeout.
+void pn_driver_poller_wait_1(pn_driver_t *);       // phase 1
+void pn_driver_poller_wait_2(pn_driver_t *, int);  // phase 2; the int is the timeout forwarded by pn_driver_wait_2
+void pn_driver_poller_wait_3(pn_driver_t *);       // phase 3
 int pn_io_handler(pn_connector_t *);
 int pn_null_io_handler(pn_connector_t *);
 void pn_connector_process_output(pn_connector_t *);
diff --git a/proton-c/src/driver.c b/proton-c/src/driver.c
index 7ec0258..91e24af 100644
--- a/proton-c/src/driver.c
+++ b/proton-c/src/driver.c
@@ -587,11 +587,36 @@
 }
 
 
+//
+// XXX - pn_driver_wait has been divided into three internal functions as a
+//       temporary workaround for a multi-threading problem.  A multi-threaded
+//       application must hold a lock on parts 1 and 3, but not on part 2.
+//       This temporary change, which is not reflected in the driver's API, allows
+//       a multi-threaded application to use the three parts separately.
+//
+//       This workaround will eventually be replaced by a more elegant solution
+//       to the problem.
+//
+void pn_driver_wait_1(pn_driver_t *d)  // phase 1 of pn_driver_wait: forwards to the poller backend
+{
+  pn_driver_poller_wait_1(d);
+}
+
+void pn_driver_wait_2(pn_driver_t *d, int timeout)  // phase 2 of pn_driver_wait: forwards driver and timeout to the backend
+{
+  pn_driver_poller_wait_2(d, timeout);
+}
+
+void pn_driver_wait_3(pn_driver_t *d)  // phase 3 of pn_driver_wait: forwards to the poller backend
+{
+  pn_driver_poller_wait_3(d);
+}
+
 void pn_driver_wait(pn_driver_t *d, int timeout)
 {
-  pn_driver_poller_wait(d, timeout);
-  d->listener_next = d->listener_head;
-  d->connector_next = d->connector_head;
+  pn_driver_wait_1(d);  // single-call path: run the three phases back to back
+  pn_driver_wait_2(d, timeout);  // the only phase that receives the timeout
+  pn_driver_wait_3(d);  // the listener_next/connector_next resets now happen inside the backend's phase 3
 }
 
 pn_listener_t *pn_driver_listener(pn_driver_t *d) {
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index 35fbf34..89ceaea 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -138,6 +138,7 @@
   size_t link_capacity;
   size_t link_count;
   size_t id;
+  void *context;
 };
 
 struct pn_link_t {
@@ -159,6 +160,7 @@
   bool drain;
   bool drained; // sender only
   size_t id;
+  void *context;
 };
 
 struct pn_delivery_t {
@@ -182,6 +184,7 @@
   bool tpwork;
   pn_buffer_t *bytes;
   bool done;
+  void *transport_context;  // transport's pn_delivery_state_t for this delivery; kept apart from the settable `context`
   void *context;
 };
 
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index 22146ff..85e7941 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -296,6 +296,18 @@
   free(session);
 }
 
+// Returns the opaque context pointer stored on the session, or NULL when session is NULL.
+void *pn_session_context(pn_session_t *session)
+{
+  return session ? session->context : NULL;
+}
+
+// Stores an opaque context pointer on the session; does nothing when session is NULL.
+void pn_session_set_context(pn_session_t *session, void *context)
+{
+  if (session) session->context = context;
+}
+
 void pn_add_link(pn_session_t *ssn, pn_link_t *link)
 {
   PN_ENSURE(ssn->links, ssn->link_capacity, ssn->link_count + 1);
@@ -361,6 +373,17 @@
   free(link);
 }
 
+// Accessors for the opaque context pointer stored on a link; both tolerate a NULL link.
+void *pn_link_context(pn_link_t *link)
+{
+  return link ? link->context : NULL;
+}
+
+void pn_link_set_context(pn_link_t *link, void *context)
+{
+  if (link) link->context = context;
+}
+
 void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
 {
   endpoint->type = type;
@@ -641,6 +664,7 @@
   ssn->links = NULL;
   ssn->link_capacity = 0;
   ssn->link_count = 0;
+  ssn->context = 0;
 
   return ssn;
 }
@@ -793,6 +817,7 @@
   link->queued = 0;
   link->drain = false;
   link->drained = false;
+  link->context = 0;
 }
 
 const char *pn_source(pn_link_t *link)
@@ -933,6 +958,7 @@
   delivery->tpwork = false;
   pn_buffer_clear(delivery->bytes);
   delivery->done = false;
+  delivery->transport_context = NULL;
   delivery->context = NULL;
 
   if (!link->current)
@@ -979,6 +1005,17 @@
          pn_readable(d), d->work);
 }
 
+// Accessors for the opaque context pointer stored on a delivery; both tolerate a NULL delivery.
+void *pn_delivery_context(pn_delivery_t *delivery)
+{
+  return delivery ? delivery->context : NULL;
+}
+
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context)
+{
+  if (delivery) delivery->context = context;
+}
+
 pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
 {
   if (delivery) {
@@ -1052,8 +1089,8 @@
 
 void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
 {
-  pn_delivery_state_t *state = delivery->context;
-  delivery->context = NULL;
+  pn_delivery_state_t *state = delivery->transport_context;
+  delivery->transport_context = NULL;
   if (state) state->delivery = NULL;
   pn_real_settle(delivery);
   if (state) pn_delivery_buffer_gc(db);
@@ -1247,7 +1284,7 @@
 
     delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
     pn_delivery_state_t *state = pn_delivery_buffer_push(incoming, delivery);
-    delivery->context = state;
+    delivery->transport_context = state;
     if (id != state->id) {
       int err = pn_do_error(transport, "amqp:session:invalid-field",
                             "sequencing error, expected delivery-id %u, got %u",
@@ -1540,7 +1577,7 @@
 {
   if (delivery->settled) return false;
   if (pn_is_sender(delivery->link)) {
-    pn_delivery_state_t *state = delivery->context;
+    pn_delivery_state_t *state = delivery->transport_context;
     if (state) {
       return (delivery->done && !state->sent) || pn_buffer_size(delivery->bytes) > 0;
     } else {
@@ -1656,7 +1693,7 @@
   pn_link_t *link = delivery->link;
   pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
   // XXX: check for null state
-  pn_delivery_state_t *state = delivery->context;
+  pn_delivery_state_t *state = delivery->transport_context;
   uint64_t code;
   switch(delivery->local_state) {
   case PN_ACCEPTED:
@@ -1684,10 +1721,10 @@
   pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
   pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
   if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
-    pn_delivery_state_t *state = delivery->context;
+    pn_delivery_state_t *state = delivery->transport_context;
     if (!state && pn_delivery_buffer_available(&ssn_state->outgoing)) {
       state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
-      delivery->context = state;
+      delivery->transport_context = state;
     }
 
     if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
@@ -1712,7 +1749,7 @@
     }
   }
 
-  pn_delivery_state_t *state = delivery->context;
+  pn_delivery_state_t *state = delivery->transport_context;
   // XXX: need to prevent duplicate disposition sending
   if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled
       && state && state->sent) {
@@ -1732,7 +1769,7 @@
   pn_link_t *link = delivery->link;
   // XXX: need to prevent duplicate disposition sending
   pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
-  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && delivery->context) {
+  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && delivery->transport_context) {
     int err = pn_post_disp(transport, delivery);
     if (err) return err;
   }
diff --git a/proton-c/src/message/message.c b/proton-c/src/message/message.c
index df9e4f1..ee52fd2 100644
--- a/proton-c/src/message/message.c
+++ b/proton-c/src/message/message.c
@@ -445,7 +445,7 @@
       {
         pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
           group_id, reply_to_group_id;
-        err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSiS]", &user_id, &address,
+        err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSIS]", &user_id, &address,
                            &subject, &reply_to, &ctype, &cencoding,
                            &msg->expiry_time, &msg->creation_time, &group_id,
                            &msg->group_sequence, &reply_to_group_id);
diff --git a/proton-c/src/pollers/poll.c b/proton-c/src/pollers/poll.c
index bb9815a..e9b2b3e 100644
--- a/proton-c/src/pollers/poll.c
+++ b/proton-c/src/pollers/poll.c
@@ -96,7 +96,7 @@
     c->poller = NULL;
 }
 
-
+#if 0   // save for now
 void pn_driver_poller_wait(pn_driver_t *d, int timeout)
 {
   pn_driver_poller_t *poller = d->poller;
@@ -166,3 +166,102 @@
     c = c->connector_next;
   }
 }
+#endif
+
+//
+// XXX - pn_driver_wait has been divided into three internal functions as a
+//       temporary workaround for a multi-threading problem.  A multi-threaded
+//       application must hold a lock on parts 1 and 3, but not on part 2.
+//       This temporary change, which is not reflected in the driver's API, allows
+//       a multi-threaded application to use the three parts separately.
+//
+//       This workaround will eventually be replaced by a more elegant solution
+//       to the problem.
+//
+
+static void pn_driver_poller_rebuild(pn_driver_t *d)  // refills poller->fds from the driver's listeners and connectors
+{
+  pn_driver_poller_t *poller = d->poller;
+  size_t size = d->listener_count + d->connector_count;
+  while (poller->capacity < size + 1) {  // the extra slot is for d->ctrl[0]
+    poller->capacity = poller->capacity ? 2*poller->capacity : 16;
+    poller->fds = realloc(poller->fds, poller->capacity*sizeof(struct pollfd));  // NOTE(review): result is not checked for NULL
+  }
+
+  poller->nfds = 0;
+
+  poller->fds[poller->nfds].fd = d->ctrl[0];  // slot 0 always holds the control descriptor
+  poller->fds[poller->nfds].events = POLLIN;
+  poller->fds[poller->nfds].revents = 0;
+  poller->nfds++;
+
+  pn_listener_t *l = d->listener_head;
+  for (int i = 0; i < d->listener_count; i++) {  // walks listener_count entries; assumes the list is at least that long
+    poller->fds[poller->nfds].fd = l->fd;
+    poller->fds[poller->nfds].events = POLLIN;
+    poller->fds[poller->nfds].revents = 0;
+    l->poller->idx = poller->nfds;  // remember the slot so phase 3 can look up revents
+    poller->nfds++;
+    l = l->listener_next;
+  }
+
+  pn_connector_t *c = d->connector_head;
+  for (int i = 0; i < d->connector_count; i++)
+  {
+    if (!c->closed) {  // closed connectors get no slot and keep whatever idx they had before
+      poller->fds[poller->nfds].fd = c->fd;
+      poller->fds[poller->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) |
+        (c->status & PN_SEL_WR ? POLLOUT : 0);
+      poller->fds[poller->nfds].revents = 0;
+      c->poller->idx = poller->nfds;  // remember the slot so phase 3 can look up revents
+      poller->nfds++;
+    }
+    c = c->connector_next;
+  }
+}
+
+void pn_driver_poller_wait_1(pn_driver_t *d)  // phase 1 (poll backend): rebuild the pollfd array
+{
+  pn_driver_poller_rebuild(d);
+}
+
+void pn_driver_poller_wait_2(pn_driver_t *d, int timeout)  // phase 2 (poll backend): the poll() call itself
+{
+  pn_driver_poller_t *poller = d->poller;
+  DIE_IFE(poll(poller->fds, poller->nfds, d->closed_count > 0 ? 0 : timeout));  // timeout forced to 0 while closed_count > 0
+}
+
+// Phase 3 (poll backend): translate the revents left by poll() into pending flags on listeners and connectors.
+void pn_driver_poller_wait_3(pn_driver_t *d)
+{
+  pn_driver_poller_t *poller = d->poller;
+
+  // slot 0 is the control descriptor: keep reading while each read fills the whole buffer
+  if (poller->fds[0].revents & POLLIN) {
+    char scratch[512];
+    while (read(d->ctrl[0], scratch, 512) == 512);
+  }
+
+  // slot 0 is taken by the control descriptor, so an idx of 0 is treated as "no slot"
+  for (pn_listener_t *lsnr = d->listener_head; lsnr; lsnr = lsnr->listener_next) {
+    int idx = lsnr->poller->idx;
+    lsnr->pending = (idx && poller->fds[idx].revents & POLLIN);
+  }
+
+  for (pn_connector_t *ctor = d->connector_head; ctor; ctor = ctor->connector_next) {
+    if (!ctor->closed) {
+      int idx = ctor->poller->idx;
+      ctor->pending_read = (idx && poller->fds[idx].revents & POLLIN);
+      ctor->pending_write = (idx && poller->fds[idx].revents & POLLOUT);
+    } else {
+      // closed connectors were given no slot: clear all three pending flags
+      ctor->pending_read = false;
+      ctor->pending_write = false;
+      ctor->pending_tick = false;
+    }
+  }
+
+  // point the driver's listener_next/connector_next back at the list heads
+  d->listener_next = d->listener_head;
+  d->connector_next = d->connector_head;
+}
diff --git a/proton-c/src/pollers/select.c b/proton-c/src/pollers/select.c
index ef9d0d1..dd06a89 100644
--- a/proton-c/src/pollers/select.c
+++ b/proton-c/src/pollers/select.c
@@ -78,7 +78,7 @@
 {
 }
 
-
+#if 0 // save it for now
 void pn_driver_poller_wait(pn_driver_t *d, int timeout)
 {
   pn_driver_poller_t *poller = d->poller;
@@ -147,3 +147,83 @@
       }
   }
 }
+#endif
+
+void pn_driver_poller_wait_1(pn_driver_t *d)  // phase 1 (select backend): build the read/write fd sets
+{
+  pn_driver_poller_t *poller = d->poller;
+
+  // setup the select
+  FD_ZERO(&poller->readfds);
+  FD_ZERO(&poller->writefds);
+
+  FD_SET(d->ctrl[0], &poller->readfds);  // the control descriptor is always watched for reading
+  poller->max_fds = d->ctrl[0];  // running maximum; phase 2 passes max_fds + 1 to select()
+
+  pn_listener_t *l = d->listener_head;
+  for (int i = 0; i < d->listener_count; i++) {  // walks listener_count entries; assumes the list is at least that long
+      FD_SET(l->fd, &poller->readfds);
+      if (l->fd > poller->max_fds) poller->max_fds = l->fd;
+      l = l->listener_next;
+  }
+
+  pn_connector_t *c = d->connector_head;
+  for (int i = 0; i < d->connector_count; i++) {
+      if (!c->closed && (c->status & (PN_SEL_RD|PN_SEL_WR))) {  // skip closed connectors and those with neither bit set
+          if (c->status & PN_SEL_RD)
+              FD_SET(c->fd, &poller->readfds);
+          if (c->status & PN_SEL_WR)
+              FD_SET(c->fd, &poller->writefds);
+          if (c->fd > poller->max_fds) poller->max_fds = c->fd;
+      }
+      c = c->connector_next;
+  }
+}
+
+// Phase 2 (select backend): select() on the sets built in phase 1. timeout is in ms; negative blocks indefinitely.
+void pn_driver_poller_wait_2(pn_driver_t *d, int timeout)
+{
+  pn_driver_poller_t *poller = d->poller;
+  // Match the poll() backend: do not block while closed connectors are waiting to be processed.
+  if (d->closed_count > 0) timeout = 0;
+  struct timeval to = {0};
+  if (timeout > 0) {  // split milliseconds into seconds + microseconds; 0 leaves `to` zeroed (pure poll)
+    to.tv_sec = timeout / 1000;
+    to.tv_usec = (timeout % 1000) * 1000;
+  }
+  int nfds = select(poller->max_fds + 1, &poller->readfds, &poller->writefds, NULL, timeout < 0 ? NULL : &to);
+  DIE_IFE(nfds);
+}
+
+// Phase 3 (select backend): translate the fd sets left by select() into pending flags on listeners and connectors.
+void pn_driver_poller_wait_3(pn_driver_t *d)
+{
+  pn_driver_poller_t *poller = d->poller;
+
+  // control descriptor readable: keep reading while each read fills the whole buffer
+  if (FD_ISSET(d->ctrl[0], &poller->readfds)) {
+    char scratch[512];
+    while (read(d->ctrl[0], scratch, 512) == 512);
+  }
+
+  // a listener is pending when its descriptor is in the read set
+  for (pn_listener_t *lsnr = d->listener_head; lsnr; lsnr = lsnr->listener_next) {
+    lsnr->pending = FD_ISSET(lsnr->fd, &poller->readfds);
+  }
+
+  for (pn_connector_t *ctor = d->connector_head; ctor; ctor = ctor->connector_next) {
+    if (!ctor->closed) {
+      ctor->pending_read = FD_ISSET(ctor->fd, &poller->readfds);
+      ctor->pending_write = FD_ISSET(ctor->fd, &poller->writefds);
+    } else {
+      // closed connectors were never added to the sets: clear all three pending flags
+      ctor->pending_read = false;
+      ctor->pending_write = false;
+      ctor->pending_tick = false;
+    }
+  }
+
+  // point the driver's listener_next/connector_next back at the list heads
+  d->listener_next = d->listener_head;
+  d->connector_next = d->connector_head;
+}