PROTON-2: merge latest trunk into branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/driver_abstraction@1387437 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/proton-c/bindings/python/CMakeLists.txt b/proton-c/bindings/python/CMakeLists.txt
index ae6c645..fcdcd02 100644
--- a/proton-c/bindings/python/CMakeLists.txt
+++ b/proton-c/bindings/python/CMakeLists.txt
@@ -15,9 +15,16 @@
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile cproton.py
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
+install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -m py_compile proton.py
+ WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})")
+install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile proton.py
+ WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})")
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/cproton.py
${CMAKE_CURRENT_BINARY_DIR}/cproton.pyc
${CMAKE_CURRENT_BINARY_DIR}/cproton.pyo
+ ${CMAKE_CURRENT_SOURCE_DIR}/proton.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/proton.pyc
+ ${CMAKE_CURRENT_SOURCE_DIR}/proton.pyo
DESTINATION ${PYTHON_SITEARCH_PACKAGES}
COMPONENT Python)
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/_cproton.so
diff --git a/proton-c/bindings/python/proton.py b/proton-c/bindings/python/proton.py
new file mode 100644
index 0000000..e40040c
--- /dev/null
+++ b/proton-c/bindings/python/proton.py
@@ -0,0 +1,272 @@
+from xproton import *
+
+class ProtonException(Exception):
+ pass
+
+class Timeout(ProtonException):
+ pass
+
+class MessengerException(ProtonException):
+ pass
+
+class MessageException(ProtonException):
+ pass
+
+EXCEPTIONS = {
+ PN_TIMEOUT: Timeout
+ }
+
+class Messenger(object):
+
+ def __init__(self, name=None):
+ self._mng = pn_messenger(name);
+
+ def __del__(self):
+ if hasattr(self, "_mng"):
+ pn_messenger_free(self._mng)
+ del self._mng
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, MessengerException)
+ raise exc("[%s]: %s" % (err, pn_messenger_error(self._mng)))
+ else:
+ return err
+
+ @property
+ def name(self):
+ return pn_messenger_name(self._mng)
+
+ @property
+ def timeout(self):
+ return pn_messenger_get_timeout(self._mng)
+
+ @timeout.setter
+ def timeout(self, value):
+ self._check(pn_messenger_set_timeout(self._mng, value))
+
+ def start(self):
+ self._check(pn_messenger_start(self._mng))
+
+ def stop(self):
+ self._check(pn_messenger_stop(self._mng))
+
+ def subscribe(self, source):
+ self._check(pn_messenger_subscribe(self._mng, source))
+
+ def put(self, msg):
+ self._check(pn_messenger_put(self._mng, msg._msg))
+
+ def send(self):
+ self._check(pn_messenger_send(self._mng))
+
+ def recv(self, n):
+ self._check(pn_messenger_recv(self._mng, n))
+
+ def get(self, msg):
+ self._check(pn_messenger_get(self._mng, msg._msg))
+
+ @property
+ def outgoing(self):
+ return pn_messenger_outgoing(self._mng)
+
+ @property
+ def incoming(self):
+ return pn_messenger_incoming(self._mng)
+
+class Message(object):
+
+ DATA = PN_DATA
+ TEXT = PN_TEXT
+ AMQP = PN_AMQP
+ JSON = PN_JSON
+
+ def __init__(self):
+ self._msg = pn_message()
+
+ def __del__(self):
+ if hasattr(self, "_msg"):
+ pn_message_free(self._msg)
+ del self._msg
+
+ def _check(self, err):
+ if err < 0:
+ exc = EXCEPTIONS.get(err, MessageException)
+ raise exc("[%s]: %s" % (err, pn_message_error(self._msg)))
+ else:
+ return err
+
+ def clear(self):
+ pn_message_clear(self._msg)
+
+ @property
+ def durable(self):
+ return pn_message_is_durable(self._msg)
+
+ @durable.setter
+ def durable(self, value):
+ self._check(pn_message_set_durable(self._msg, bool(value)))
+
+ @property
+ def priority(self):
+ return pn_message_get_priority(self._msg)
+
+ @priority.setter
+ def priority(self, value):
+ self._check(pn_message_set_priority(self._msg, value))
+
+ @property
+ def ttl(self):
+ return pn_message_get_ttl(self._msg)
+
+ @ttl.setter
+ def ttl(self, value):
+ self._check(pn_message_set_ttl(self._msg, value))
+
+ @property
+ def first_acquirer(self):
+ return pn_message_is_first_acquirer(self._msg)
+
+ @first_acquirer.setter
+ def first_acquirer(self, value):
+ self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
+
+ @property
+ def delivery_count(self):
+ return pn_message_get_delivery_count(self._msg)
+
+ @delivery_count.setter
+ def delivery_count(self, value):
+ self._check(pn_message_set_delivery_count(self._msg, value))
+
+ # XXX
+ @property
+ def id(self):
+ return pn_message_get_id(self._msg)
+
+ @id.setter
+ def id(self, value):
+ self._check(pn_message_set_id(self._msg, value))
+
+ @property
+ def user_id(self):
+ return pn_message_get_user_id(self._msg)
+
+ @user_id.setter
+ def user_id(self, value):
+ self._check(pn_message_set_user_id(self._msg, value))
+
+ @property
+ def address(self):
+ return pn_message_get_address(self._msg)
+
+ @address.setter
+ def address(self, value):
+ self._check(pn_message_set_address(self._msg, value))
+
+ @property
+ def subject(self):
+ return pn_message_get_subject(self._msg)
+
+ @subject.setter
+ def subject(self, value):
+ self._check(pn_message_set_subject(self._msg, value))
+
+ @property
+ def reply_to(self):
+ return pn_message_get_reply_to(self._msg)
+
+ @reply_to.setter
+ def reply_to(self, value):
+ self._check(pn_message_set_reply_to(self._msg, value))
+
+ # XXX
+ @property
+ def correlation_id(self):
+ return pn_message_get_correlation_id(self._msg)
+
+ @correlation_id.setter
+ def correlation_id(self, value):
+ self._check(pn_message_set_correlation_id(self._msg, value))
+
+ @property
+ def content_type(self):
+ return pn_message_get_content_type(self._msg)
+
+ @content_type.setter
+ def content_type(self, value):
+ self._check(pn_message_set_content_type(self._msg, value))
+
+ @property
+ def content_encoding(self):
+ return pn_message_get_content_encoding(self._msg)
+
+ @content_encoding.setter
+ def content_encoding(self, value):
+ self._check(pn_message_set_content_encoding(self._msg, value))
+
+ @property
+ def expiry_time(self):
+ return pn_message_get_expiry_time(self._msg)
+
+ @expiry_time.setter
+ def expiry_time(self, value):
+ self._check(pn_message_set_expiry_time(self._msg, value))
+
+ @property
+ def creation_time(self):
+ return pn_message_get_creation_time(self._msg)
+
+ @creation_time.setter
+ def creation_time(self, value):
+ self._check(pn_message_set_creation_time(self._msg, value))
+
+ @property
+ def group_id(self):
+ return pn_message_get_group_id(self._msg)
+
+ @group_id.setter
+ def group_id(self, value):
+ self._check(pn_message_set_group_id(self._msg, value))
+
+ @property
+ def group_sequence(self):
+ return pn_message_get_group_sequence(self._msg)
+
+ @group_sequence.setter
+ def group_sequence(self, value):
+ self._check(pn_message_set_group_sequence(self._msg, value))
+
+ @property
+ def reply_to_group_id(self):
+ return pn_message_get_reply_to_group_id(self._msg)
+
+ @reply_to_group_id.setter
+ def reply_to_group_id(self, value):
+ self._check(pn_message_set_reply_to_group_id(self._msg, value))
+
+ # XXX
+ @property
+ def format(self):
+ return pn_message_get_format(self._msg)
+
+ @format.setter
+ def format(self, value):
+ self._check(pn_message_set_format(self._msg, value))
+
+ def load(self, data):
+ self._check(pn_message_load(self._msg, data))
+
+ def save(self):
+ sz = 16
+ while True:
+ err, data = pn_message_save(self._msg, sz)
+ if err == PN_OVERFLOW:
+ sz *= 2
+ continue
+ else:
+ self._check(err)
+ return data
+
+__all__ = ["Messenger", "Message", "ProtonException", "MessengerException",
+ "MessageException", "Timeout"]
diff --git a/proton-c/include/proton/engine.h b/proton-c/include/proton/engine.h
index cd80cdf..40f1c73 100644
--- a/proton-c/include/proton/engine.h
+++ b/proton-c/include/proton/engine.h
@@ -264,6 +264,8 @@
void pn_session_open(pn_session_t *session);
void pn_session_close(pn_session_t *session);
void pn_session_free(pn_session_t *session);
+void *pn_session_context(pn_session_t *session);
+void pn_session_set_context(pn_session_t *session, void *context);
// link
const char *pn_link_name(pn_link_t *link);
@@ -291,6 +293,8 @@
void pn_link_open(pn_link_t *sender);
void pn_link_close(pn_link_t *sender);
void pn_link_free(pn_link_t *sender);
+void *pn_link_context(pn_link_t *link);
+void pn_link_set_context(pn_link_t *link, void *context);
// sender
//void pn_offer(pn_sender_t *sender, int credits);
@@ -319,6 +323,8 @@
//int pn_format(pn_delivery_t *delivery);
void pn_settle(pn_delivery_t *delivery);
void pn_delivery_dump(pn_delivery_t *delivery);
+void *pn_delivery_context(pn_delivery_t *delivery);
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context);
#ifdef __cplusplus
}
diff --git a/proton-c/src/driver-internal.h b/proton-c/src/driver-internal.h
index d4d794c..a96f06f 100644
--- a/proton-c/src/driver-internal.h
+++ b/proton-c/src/driver-internal.h
@@ -103,7 +103,10 @@
int pn_connector_poller_init( struct pn_connector_t *);
void pn_connector_poller_destroy( struct pn_connector_t *);
-void pn_driver_poller_wait(struct pn_driver_t *, int timeout_ms);
+//void pn_driver_poller_wait(struct pn_driver_t *, int timeout_ms);
+void pn_driver_poller_wait_1(pn_driver_t *);
+void pn_driver_poller_wait_2(pn_driver_t *, int);
+void pn_driver_poller_wait_3(pn_driver_t *);
int pn_io_handler(pn_connector_t *);
int pn_null_io_handler(pn_connector_t *);
void pn_connector_process_output(pn_connector_t *);
diff --git a/proton-c/src/driver.c b/proton-c/src/driver.c
index 7ec0258..91e24af 100644
--- a/proton-c/src/driver.c
+++ b/proton-c/src/driver.c
@@ -587,11 +587,36 @@
}
+//
+// XXX - pn_driver_wait has been divided into three internal functions as a
+// temporary workaround for a multi-threading problem. A multi-threaded
+// application must hold a lock on parts 1 and 3, but not on part 2.
+// This temporary change, which is not reflected in the driver's API, allows
+// a multi-threaded application to use the three parts separately.
+//
+// This workaround will eventually be replaced by a more elegant solution
+// to the problem.
+//
+void pn_driver_wait_1(pn_driver_t *d)
+{
+ pn_driver_poller_wait_1(d);
+}
+
+void pn_driver_wait_2(pn_driver_t *d, int timeout)
+{
+ pn_driver_poller_wait_2(d, timeout);
+}
+
+void pn_driver_wait_3(pn_driver_t *d)
+{
+ pn_driver_poller_wait_3(d);
+}
+
void pn_driver_wait(pn_driver_t *d, int timeout)
{
- pn_driver_poller_wait(d, timeout);
- d->listener_next = d->listener_head;
- d->connector_next = d->connector_head;
+ pn_driver_wait_1(d);
+ pn_driver_wait_2(d, timeout);
+ pn_driver_wait_3(d);
}
pn_listener_t *pn_driver_listener(pn_driver_t *d) {
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index 35fbf34..89ceaea 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -138,6 +138,7 @@
size_t link_capacity;
size_t link_count;
size_t id;
+ void *context;
};
struct pn_link_t {
@@ -159,6 +160,7 @@
bool drain;
bool drained; // sender only
size_t id;
+ void *context;
};
struct pn_delivery_t {
@@ -182,6 +184,7 @@
bool tpwork;
pn_buffer_t *bytes;
bool done;
+ void *transport_context;
void *context;
};
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index 22146ff..85e7941 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -296,6 +296,18 @@
free(session);
}
+void *pn_session_context(pn_session_t *session)
+{
+ return session ? session->context : 0;
+}
+
+void pn_session_set_context(pn_session_t *session, void *context)
+{
+ if (session)
+ session->context = context;
+}
+
+
void pn_add_link(pn_session_t *ssn, pn_link_t *link)
{
PN_ENSURE(ssn->links, ssn->link_capacity, ssn->link_count + 1);
@@ -361,6 +373,17 @@
free(link);
}
+void *pn_link_context(pn_link_t *link)
+{
+ return link ? link->context : 0;
+}
+
+void pn_link_set_context(pn_link_t *link, void *context)
+{
+ if (link)
+ link->context = context;
+}
+
void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
{
endpoint->type = type;
@@ -641,6 +664,7 @@
ssn->links = NULL;
ssn->link_capacity = 0;
ssn->link_count = 0;
+ ssn->context = 0;
return ssn;
}
@@ -793,6 +817,7 @@
link->queued = 0;
link->drain = false;
link->drained = false;
+ link->context = 0;
}
const char *pn_source(pn_link_t *link)
@@ -933,6 +958,7 @@
delivery->tpwork = false;
pn_buffer_clear(delivery->bytes);
delivery->done = false;
+ delivery->transport_context = NULL;
delivery->context = NULL;
if (!link->current)
@@ -979,6 +1005,17 @@
pn_readable(d), d->work);
}
+void *pn_delivery_context(pn_delivery_t *delivery)
+{
+ return delivery ? delivery->context : NULL;
+}
+
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context)
+{
+ if (delivery)
+ delivery->context = context;
+}
+
pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
{
if (delivery) {
@@ -1052,8 +1089,8 @@
void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
{
- pn_delivery_state_t *state = delivery->context;
- delivery->context = NULL;
+ pn_delivery_state_t *state = delivery->transport_context;
+ delivery->transport_context = NULL;
if (state) state->delivery = NULL;
pn_real_settle(delivery);
if (state) pn_delivery_buffer_gc(db);
@@ -1247,7 +1284,7 @@
delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
pn_delivery_state_t *state = pn_delivery_buffer_push(incoming, delivery);
- delivery->context = state;
+ delivery->transport_context = state;
if (id != state->id) {
int err = pn_do_error(transport, "amqp:session:invalid-field",
"sequencing error, expected delivery-id %u, got %u",
@@ -1540,7 +1577,7 @@
{
if (delivery->settled) return false;
if (pn_is_sender(delivery->link)) {
- pn_delivery_state_t *state = delivery->context;
+ pn_delivery_state_t *state = delivery->transport_context;
if (state) {
return (delivery->done && !state->sent) || pn_buffer_size(delivery->bytes) > 0;
} else {
@@ -1656,7 +1693,7 @@
pn_link_t *link = delivery->link;
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
// XXX: check for null state
- pn_delivery_state_t *state = delivery->context;
+ pn_delivery_state_t *state = delivery->transport_context;
uint64_t code;
switch(delivery->local_state) {
case PN_ACCEPTED:
@@ -1684,10 +1721,10 @@
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
- pn_delivery_state_t *state = delivery->context;
+ pn_delivery_state_t *state = delivery->transport_context;
if (!state && pn_delivery_buffer_available(&ssn_state->outgoing)) {
state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
- delivery->context = state;
+ delivery->transport_context = state;
}
if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
@@ -1712,7 +1749,7 @@
}
}
- pn_delivery_state_t *state = delivery->context;
+ pn_delivery_state_t *state = delivery->transport_context;
// XXX: need to prevent duplicate disposition sending
if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled
&& state && state->sent) {
@@ -1732,7 +1769,7 @@
pn_link_t *link = delivery->link;
// XXX: need to prevent duplicate disposition sending
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
- if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && delivery->context) {
+ if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && delivery->transport_context) {
int err = pn_post_disp(transport, delivery);
if (err) return err;
}
diff --git a/proton-c/src/message/message.c b/proton-c/src/message/message.c
index df9e4f1..ee52fd2 100644
--- a/proton-c/src/message/message.c
+++ b/proton-c/src/message/message.c
@@ -445,7 +445,7 @@
{
pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
group_id, reply_to_group_id;
- err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSiS]", &user_id, &address,
+ err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSIS]", &user_id, &address,
&subject, &reply_to, &ctype, &cencoding,
&msg->expiry_time, &msg->creation_time, &group_id,
&msg->group_sequence, &reply_to_group_id);
diff --git a/proton-c/src/pollers/poll.c b/proton-c/src/pollers/poll.c
index bb9815a..e9b2b3e 100644
--- a/proton-c/src/pollers/poll.c
+++ b/proton-c/src/pollers/poll.c
@@ -96,7 +96,7 @@
c->poller = NULL;
}
-
+#if 0 // save for now
void pn_driver_poller_wait(pn_driver_t *d, int timeout)
{
pn_driver_poller_t *poller = d->poller;
@@ -166,3 +166,102 @@
c = c->connector_next;
}
}
+#endif
+
+//
+// XXX - pn_driver_wait has been divided into three internal functions as a
+// temporary workaround for a multi-threading problem. A multi-threaded
+// application must hold a lock on parts 1 and 3, but not on part 2.
+// This temporary change, which is not reflected in the driver's API, allows
+// a multi-threaded application to use the three parts separately.
+//
+// This workaround will eventually be replaced by a more elegant solution
+// to the problem.
+//
+
+static void pn_driver_poller_rebuild(pn_driver_t *d)
+{
+ pn_driver_poller_t *poller = d->poller;
+ size_t size = d->listener_count + d->connector_count;
+ while (poller->capacity < size + 1) {
+ poller->capacity = poller->capacity ? 2*poller->capacity : 16;
+ poller->fds = realloc(poller->fds, poller->capacity*sizeof(struct pollfd));
+ }
+
+ poller->nfds = 0;
+
+ poller->fds[poller->nfds].fd = d->ctrl[0];
+ poller->fds[poller->nfds].events = POLLIN;
+ poller->fds[poller->nfds].revents = 0;
+ poller->nfds++;
+
+ pn_listener_t *l = d->listener_head;
+ for (int i = 0; i < d->listener_count; i++) {
+ poller->fds[poller->nfds].fd = l->fd;
+ poller->fds[poller->nfds].events = POLLIN;
+ poller->fds[poller->nfds].revents = 0;
+ l->poller->idx = poller->nfds;
+ poller->nfds++;
+ l = l->listener_next;
+ }
+
+ pn_connector_t *c = d->connector_head;
+ for (int i = 0; i < d->connector_count; i++)
+ {
+ if (!c->closed) {
+ poller->fds[poller->nfds].fd = c->fd;
+ poller->fds[poller->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) |
+ (c->status & PN_SEL_WR ? POLLOUT : 0);
+ poller->fds[poller->nfds].revents = 0;
+ c->poller->idx = poller->nfds;
+ poller->nfds++;
+ }
+ c = c->connector_next;
+ }
+}
+
+void pn_driver_poller_wait_1(pn_driver_t *d)
+{
+ pn_driver_poller_rebuild(d);
+}
+
+void pn_driver_poller_wait_2(pn_driver_t *d, int timeout)
+{
+ pn_driver_poller_t *poller = d->poller;
+ DIE_IFE(poll(poller->fds, poller->nfds, d->closed_count > 0 ? 0 : timeout));
+}
+
+void pn_driver_poller_wait_3(pn_driver_t *d)
+{
+ pn_driver_poller_t *poller = d->poller;
+
+ if (poller->fds[0].revents & POLLIN) {
+ //clear the pipe
+ char buffer[512];
+ while (read(d->ctrl[0], buffer, 512) == 512);
+ }
+
+ pn_listener_t *l = d->listener_head;
+ while (l) {
+ int idx = l->poller->idx;
+ l->pending = (idx && poller->fds[idx].revents & POLLIN);
+ l = l->listener_next;
+ }
+
+ pn_connector_t *c = d->connector_head;
+ while (c) {
+ if (c->closed) {
+ c->pending_read = false;
+ c->pending_write = false;
+ c->pending_tick = false;
+ } else {
+ int idx = c->poller->idx;
+ c->pending_read = (idx && poller->fds[idx].revents & POLLIN);
+ c->pending_write = (idx && poller->fds[idx].revents & POLLOUT);
+ }
+ c = c->connector_next;
+ }
+
+ d->listener_next = d->listener_head;
+ d->connector_next = d->connector_head;
+}
diff --git a/proton-c/src/pollers/select.c b/proton-c/src/pollers/select.c
index ef9d0d1..dd06a89 100644
--- a/proton-c/src/pollers/select.c
+++ b/proton-c/src/pollers/select.c
@@ -78,7 +78,7 @@
{
}
-
+#if 0 // save it for now
void pn_driver_poller_wait(pn_driver_t *d, int timeout)
{
pn_driver_poller_t *poller = d->poller;
@@ -147,3 +147,83 @@
}
}
}
+#endif
+
+void pn_driver_poller_wait_1(pn_driver_t *d)
+{
+ pn_driver_poller_t *poller = d->poller;
+
+ // setup the select
+ FD_ZERO(&poller->readfds);
+ FD_ZERO(&poller->writefds);
+
+ FD_SET(d->ctrl[0], &poller->readfds);
+ poller->max_fds = d->ctrl[0];
+
+ pn_listener_t *l = d->listener_head;
+ for (int i = 0; i < d->listener_count; i++) {
+ FD_SET(l->fd, &poller->readfds);
+ if (l->fd > poller->max_fds) poller->max_fds = l->fd;
+ l = l->listener_next;
+ }
+
+ pn_connector_t *c = d->connector_head;
+ for (int i = 0; i < d->connector_count; i++) {
+ if (!c->closed && (c->status & (PN_SEL_RD|PN_SEL_WR))) {
+ if (c->status & PN_SEL_RD)
+ FD_SET(c->fd, &poller->readfds);
+ if (c->status & PN_SEL_WR)
+ FD_SET(c->fd, &poller->writefds);
+ if (c->fd > poller->max_fds) poller->max_fds = c->fd;
+ }
+ c = c->connector_next;
+ }
+}
+
+void pn_driver_poller_wait_2(pn_driver_t *d, int timeout)
+{
+ pn_driver_poller_t *poller = d->poller;
+
+ struct timeval to = {0};
+ if (timeout > 0) {
+ // convert millisecs to sec and usec:
+ to.tv_sec = timeout/1000;
+ to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
+ }
+
+ int nfds = select(poller->max_fds + 1, &poller->readfds, &poller->writefds, NULL, timeout < 0 ? NULL : &to);
+ DIE_IFE(nfds);
+}
+
+void pn_driver_poller_wait_3(pn_driver_t *d)
+{
+ pn_driver_poller_t *poller = d->poller;
+
+ if (FD_ISSET(d->ctrl[0], &poller->readfds)) {
+ //clear the pipe
+ char buffer[512];
+ while (read(d->ctrl[0], buffer, 512) == 512);
+ }
+
+ pn_listener_t *l = d->listener_head;
+ while (l) {
+ l->pending = FD_ISSET(l->fd, &poller->readfds);
+ l = l->listener_next;
+ }
+
+ pn_connector_t *c = d->connector_head;
+ while (c) {
+ if (c->closed) {
+ c->pending_read = false;
+ c->pending_write = false;
+ c->pending_tick = false;
+ } else {
+ c->pending_read = FD_ISSET(c->fd, &poller->readfds);
+ c->pending_write = FD_ISSET(c->fd, &poller->writefds);
+ }
+ c = c->connector_next;
+ }
+
+ d->listener_next = d->listener_head;
+ d->connector_next = d->connector_head;
+}