PROTON-2326: epoll proactor refactor - provide proactor as direct argument to notify_poller(), not indirect via task
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 21226a9..0d63f90 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -352,7 +352,7 @@
}
bool schedule(task_t *tsk);
-void notify_poller(task_t *tsk);
+void notify_poller(pn_proactor_t *p);
void schedule_done(task_t *tsk);
void psocket_init(psocket_t* ps, epoll_type_t type);
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index ae0c37b..c4f028d 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -314,8 +314,7 @@
}
// part2: unblock epoll_wait(). Make OS call without lock held.
-void notify_poller(task_t *tsk) {
- pn_proactor_t *p = tsk->proactor;
+void notify_poller(pn_proactor_t *p) {
if (p->eventfd == -1)
return;
rearm(p, &p->epoll_schedule);
@@ -706,7 +705,7 @@
}
else notify = schedule(&l->task);
unlock(&l->task.mutex);
- if (notify) notify_poller(&l->task);
+ if (notify) notify_poller(p);
a = acceptor_list_next(&ovflw);
}
}
@@ -871,7 +870,7 @@
}
unlock(&pc->task.mutex);
if (notify)
- notify_poller(&pc->task);
+ notify_poller(pc->task.proactor);
}
static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
@@ -1018,7 +1017,7 @@
notify = unassign_thread(ts, UNUSED);
unlock(&p->sched_mutex);
if (notify)
- notify_poller(&p->task);
+ notify_poller(p);
return;
}
}
@@ -1033,7 +1032,7 @@
if (unassign_thread(ts, UNUSED))
notify = true;
unlock(&p->sched_mutex);
- if (notify) notify_poller(&p->task);
+ if (notify) notify_poller(p);
return;
}
@@ -1399,7 +1398,6 @@
pn_connection_open(pc->driver.connection); /* Auto-open */
bool notify = false;
- bool notify_proactor = false;
if (pc->disconnected) {
notify = schedule(&pc->task); /* Error during initialization */
@@ -1414,14 +1412,13 @@
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
notify = schedule(&pc->task);
lock(&p->task.mutex);
- notify_proactor = schedule_if_inactive(p);
+ notify |= schedule_if_inactive(p);
unlock(&p->task.mutex);
}
}
/* We need to issue INACTIVE on immediate failure */
unlock(&pc->task.mutex);
- if (notify) notify_poller(&pc->task);
- if (notify_proactor) notify_poller(&p->task);
+ if (notify) notify_poller(pc->task.proactor);
}
static void pconnection_tick(pconnection_t *pc) {
@@ -1449,7 +1446,7 @@
}
unlock(&pc->task.mutex);
}
- if (notify) notify_poller(&pc->task);
+ if (notify) notify_poller(pc->task.proactor);
}
void pn_proactor_release_connection(pn_connection_t *c) {
@@ -1463,7 +1460,7 @@
notify = schedule(&pc->task);
unlock(&pc->task.mutex);
}
- if (notify) notify_poller(&pc->task);
+ if (notify) notify_poller(pc->task.proactor);
}
// ========================================================================
@@ -1576,7 +1573,7 @@
}
proactor_add(&l->task);
unlock(&l->task.mutex);
- if (notify) notify_poller(&l->task);
+ if (notify) notify_poller(p);
return;
}
@@ -1656,7 +1653,7 @@
notify = schedule(&l->task);
}
unlock(&l->task.mutex);
- if (notify) notify_poller(&l->task);
+ if (notify) notify_poller(l->task.proactor);
}
static void listener_forced_shutdown(pn_listener_t *l) {
@@ -1814,7 +1811,7 @@
notify = unassign_thread(ts, UNUSED);
unlock(&p->sched_mutex);
if (notify)
- notify_poller(&p->task);
+ notify_poller(p);
return;
} else if (n_events || listener_has_event(l))
notify = schedule(&l->task);
@@ -1824,7 +1821,7 @@
if (unassign_thread(ts, UNUSED))
notify = true;
unlock(&p->sched_mutex);
- if (notify) notify_poller(&l->task);
+ if (notify) notify_poller(p);
}
pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
@@ -1885,7 +1882,7 @@
notify = schedule(&l->task);
unlock(&pc->task.mutex);
unlock(&l->task.mutex);
- if (notify) notify_poller(&l->task);
+ if (notify) notify_poller(l->task.proactor);
}
@@ -2154,7 +2151,7 @@
}
bool notify = schedule_if_inactive(p);
unlock(&p->task.mutex);
- if (notify) notify_poller(&p->task);
+ if (notify) notify_poller(p);
return can_free;
}
@@ -2432,7 +2429,7 @@
bool notify = unassign_thread(ts, PROCESSING);
if (notify) {
unlock(&p->sched_mutex);
- notify_poller(&p->task);
+ notify_poller(p);
lock(&p->sched_mutex);
}
continue; // Long time may have passed. Back to beginning.
@@ -2725,7 +2722,7 @@
unlock(&p->sched_mutex);
if (notify)
- notify_poller(&p->task);
+ notify_poller(p);
check_earmark_override(p, ts);
return;
}
@@ -2751,7 +2748,7 @@
pni_timer_set(p->timer, t + pn_proactor_now_64());
}
unlock(&p->task.mutex);
- if (notify) notify_poller(&p->task);
+ if (notify) notify_poller(p);
}
void pn_proactor_cancel_timeout(pn_proactor_t *p) {
@@ -2761,7 +2758,7 @@
pni_timer_set(p->timer, 0);
bool notify = schedule_if_inactive(p);
unlock(&p->task.mutex);
- if (notify) notify_poller(&p->task);
+ if (notify) notify_poller(p);
}
void pni_proactor_timeout(pn_proactor_t *p) {
@@ -2772,7 +2769,7 @@
notify = schedule(&p->task);
}
unlock(&p->task.mutex);
- if (notify) notify_poller(&p->task);
+ if (notify) notify_poller(p);
}
pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
@@ -2799,7 +2796,7 @@
notify = schedule_if_inactive(p);
unlock(&p->task.mutex);
if (!disconnecting_tasks) {
- if (notify) notify_poller(&p->task);
+ if (notify) notify_poller(p);
return;
}
@@ -2859,7 +2856,7 @@
if (tsk_notify)
tsk_notify = schedule(tsk);
if (tsk_notify)
- notify_poller(tsk);
+ notify_poller(p);
}
unlock(&p->task.mutex);
unlock(tsk_mutex);
@@ -2871,7 +2868,7 @@
}
}
if (notify)
- notify_poller(&p->task);
+ notify_poller(p);
}
const pn_netaddr_t *pn_transport_local_addr(pn_transport_t *t) {
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 0195c57..3fd2b36 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -185,7 +185,6 @@
proactor_add(&prc->task);
bool notify = false;
- bool notify_proactor = false;
const char *host;
const char *port;
@@ -203,14 +202,13 @@
prc->disconnected = true;
notify = schedule(&prc->task);
lock(&p->task.mutex);
- notify_proactor = schedule_if_inactive(p);
+ notify |= schedule_if_inactive(p);
unlock(&p->task.mutex);
}
/* We need to issue INACTIVE on immediate failure */
unlock(&prc->task.mutex);
- if (notify) notify_poller(&prc->task);
- if (notify_proactor) notify_poller(&p->task);
+ if (notify) notify_poller(p);
}
void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {
@@ -250,7 +248,7 @@
}
unlock(&prc->task.mutex);
unlock(&l->task.mutex);
- if (notify) notify_poller(&l->task);
+ if (notify) notify_poller(l->task.proactor);
}
const pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *rc) {
@@ -274,7 +272,7 @@
notify = schedule(&prc->task);
}
unlock(&prc->task.mutex);
- if (notify) notify_poller(&prc->task);
+ if (notify) notify_poller(prc->task.proactor);
}
void pn_raw_connection_close(pn_raw_connection_t *rc) {
@@ -360,18 +358,17 @@
}
void pni_raw_connection_done(praw_connection_t *rc) {
- bool self_notify = false;
+ bool notify = false;
bool ready = false;
lock(&rc->task.mutex);
pn_proactor_t *p = rc->task.proactor;
tslot_t *ts = rc->task.runner;
rc->task.working = false;
- self_notify = rc->waking && schedule(&rc->task);
+ notify = rc->waking && schedule(&rc->task);
// The task may be in the ready state even if we've got no raw connection
// wakes outstanding because we dealt with it already in pni_raw_batch_next()
ready = rc->task.ready;
unlock(&rc->task.mutex);
- if (self_notify) notify_poller(&rc->task);
pn_raw_connection_t *raw = &rc->raw_connection;
int wanted =
@@ -390,7 +387,7 @@
}
lock(&p->sched_mutex);
- bool notify = unassign_thread(ts, UNUSED);
+ notify |= unassign_thread(ts, UNUSED);
unlock(&p->sched_mutex);
- if (notify) notify_poller(&p->task);
+ if (notify) notify_poller(p);
}
diff --git a/c/src/proactor/epoll_timer.c b/c/src/proactor/epoll_timer.c
index a882128..26de7d2 100644
--- a/c/src/proactor/epoll_timer.c
+++ b/c/src/proactor/epoll_timer.c
@@ -282,7 +282,7 @@
unlock(&tm->task.mutex);
if (notify)
- notify_poller(&tm->task);
+ notify_poller(tm->task.proactor);
}
pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready) {
@@ -353,7 +353,7 @@
unlock(&tm->task.mutex);
if (notify)
- notify_poller(&tm->task);
+ notify_poller(tm->task.proactor);
// The timer_manager never has events to batch.
return NULL;
// TODO: perhaps become task of one of the timed out timers (if otherwise idle) and process() that task.