PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner. This closes #290
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 0d63f90..e8c9f0a 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -86,8 +86,9 @@
pn_proactor_t *proactor; /* Immutable */
task_type_t type;
bool working;
- bool on_ready_list;
bool ready; // ready to run and on ready list. Poller notified by eventfd.
+ bool waking;
+ bool on_ready_list; // todo: protected by eventfd_mutex or sched mutex? needed?
struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex
bool closing;
// Next 4 are protected by the proactor mutex
@@ -223,7 +224,6 @@
pni_timer_t *timer;
const char *host, *port;
uint32_t new_events;
- int wake_count; // TODO: protected by task.mutex so should be moved in there (also really bool)
bool server; /* accept, not connect */
bool tick_pending;
bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
@@ -382,6 +382,28 @@
void pni_pconnection_timeout(pconnection_t *pc);
void pni_proactor_timeout(pn_proactor_t *p);
+// Generic wake primitives for a task.
+
+// Call with task lock held. Must call notify_poller() if returns true.
+static inline bool pni_task_wake(task_t *tsk) {
+ if (!tsk->waking) {
+ tsk->waking = true;
+ return schedule(tsk);
+ }
+ return false;
+}
+
+// Call with task lock held.
+static inline bool pni_task_wake_pending(task_t *tsk) {
+ return tsk->waking;
+}
+
+// Call with task lock held and only from the running task.
+static inline void pni_task_wake_done(task_t *tsk) {
+ tsk->waking = false;
+}
+
+
#ifdef __cplusplus
}
#endif
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index c4f028d..7467683 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -49,6 +49,7 @@
TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
transitions from "private to the scheduler" to "visible to the task".
+ TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch.
*/
@@ -742,7 +743,6 @@
psocket_init(&pc->psocket, PCONNECTION_IO);
pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port);
pc->new_events = 0;
- pc->wake_count = 0;
pc->tick_pending = false;
pc->queued_disconnect = false;
pc->disconnect_condition = NULL;
@@ -980,7 +980,7 @@
/* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
static inline bool pconnection_work_pending(pconnection_t *pc) {
- if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
+ if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
return true;
if (!pc->read_blocked && !pconnection_rclosed(pc))
return true;
@@ -1153,9 +1153,9 @@
return &pc->batch;
}
bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc);
- if (pc->wake_count) {
+ if (pni_task_wake_pending(&pc->task)) {
waking = !closed;
- pc->wake_count = 0;
+ pni_task_wake_done(&pc->task);
}
if (pc->tick_pending) {
pc->tick_pending = false;
@@ -1441,8 +1441,7 @@
if (pc) {
lock(&pc->task.mutex);
if (!pc->task.closing) {
- pc->wake_count++;
- notify = schedule(&pc->task);
+ notify = pni_task_wake(&pc->task);
}
unlock(&pc->task.mutex);
}
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 3fd2b36..8722ff0 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,7 +50,6 @@
struct addrinfo *ai; /* Current connect address */
bool connected;
bool disconnected;
- bool waking; // TODO: This is actually protected by task.mutex so should be moved into task (pconnection too)
};
static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -144,7 +143,6 @@
prc->connected = false;
prc->disconnected = false;
- prc->waking = false;
prc->batch.next_event = pni_raw_batch_next;
pmutex_init(&prc->rearm_mutex);
@@ -268,8 +266,7 @@
praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
lock(&prc->task.mutex);
if (!prc->task.closing) {
- prc->waking = true;
- notify = schedule(&prc->task);
+ notify = pni_task_wake(&prc->task);
}
unlock(&prc->task.mutex);
if (notify) notify_poller(prc->task.proactor);
@@ -290,8 +287,10 @@
// Check wake status every event processed
bool waking = false;
lock(&rc->task.mutex);
- waking = rc->waking;
- rc->waking = false;
+ if (pni_task_wake_pending(&rc->task)) {
+ waking = true;
+ pni_task_wake_done(&rc->task);
+ }
unlock(&rc->task.mutex);
if (waking) pni_raw_wake(raw);
@@ -346,8 +345,10 @@
t->working = true;
if (sched_ready) {
schedule_done(t);
- wake = rc->waking;
- rc->waking = false;
+ if (pni_task_wake_pending(&rc->task)) {
+ wake = true;
+ pni_task_wake_done(&rc->task);
+ }
}
unlock(&t->mutex);
@@ -364,7 +365,7 @@
pn_proactor_t *p = rc->task.proactor;
tslot_t *ts = rc->task.runner;
rc->task.working = false;
- notify = rc->waking && schedule(&rc->task);
+ notify = pni_task_wake_pending(&rc->task) && schedule(&rc->task);
// The task may be in the ready state even if we've got no raw connection
// wakes outstanding because we dealt with it already in pni_raw_batch_next()
ready = rc->task.ready;