NO-JIRA: Split out epoll proactor poller logic to separate routine for readability
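
The former proactor_do_epoll() is renamed next_event_batch() and the
poller-specific work (the epoll_wait() call and the scheduling of the
resulting contexts) moves to a new helper, poller_do_epoll().  No
functional change intended.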
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 01d9db8..4c00d47 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -101,7 +101,7 @@
// Maybe futex is even better?
// See other "TODO" in code.
//
-// Consider case of large number of wakes: proactor_do_epoll() could start by
+// Consider case of large number of wakes: next_event_batch() could start by
// looking for pending wakes before a kernel call to epoll_wait(), or there
// could be several eventfds with random assignment of wakeables.
@@ -691,6 +691,7 @@
static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup);
static void write_flush(pconnection_t *pc);
static void listener_begin_close(pn_listener_t* l);
+static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block);
static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
@@ -2580,7 +2581,7 @@
return NULL;
}
-static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
+static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) {
lock(&p->tslot_mutex);
tslot_t * ts = find_tslot(p);
unlock(&p->tslot_mutex);
@@ -2591,9 +2592,9 @@
assert(ts->state == UNUSED || ts->state == NEW);
ts->state = PROCESSING;
+ // Process outstanding epoll events until we get a batch or need to block.
while (true) {
- // Process outstanding epoll events until we get a batch or need to block.
-
+    // First see if there are any contexts waiting to run and perhaps generate new Proton events.
pcontext_t *ctx = next_runnable(p, ts);
if (ctx) {
ts->state = BATCHING;
@@ -2611,146 +2612,22 @@
continue; // Long time may have passed. Back to beginning.
}
- // poll or wait for a runnable context
+ // Poll or wait for a runnable context
if (p->poller == NULL) {
+ bool return_immediately;
p->poller = ts;
- // As poller with lots to do, be mindful of hogging the sched lock. Release when making kernel calls.
- assert(p->n_runnables == 0);
- if (p->thread_count > p->thread_capacity)
- grow_poller_bufs(p);
- p->next_runnable = 0;
- p->n_warm_runnables = 0;
- p->last_earmark = NULL;
-
- bool unfinished_earmarks = p->earmark_count > 0;
- bool new_wakes = false;
- bool epoll_immediate = unfinished_earmarks || !can_block;
- assert(!p->sched_wake_first);
- if (!epoll_immediate) {
- lock(&p->eventfd_mutex);
- if (p->wake_list_first) {
- epoll_immediate = true;
- new_wakes = true;
- } else {
- p->wakes_in_progress = false;
- }
- unlock(&p->eventfd_mutex);
+ // Get new epoll events (if any) and mark the relevant contexts as runnable
+ return_immediately = poller_do_epoll(p, ts, can_block);
+ p->poller = NULL;
+ if (return_immediately) {
+ // Check if another thread is available to continue epoll-ing.
+ tslot_t *res_ts = resume_one_thread(p);
+ ts->state = UNUSED;
+ unlock(&p->sched_mutex);
+ if (res_ts) resume(p, res_ts);
+ return NULL;
}
- int timeout = (epoll_immediate) ? 0 : -1;
- p->poller_suspended = (timeout == -1);
- unlock(&p->sched_mutex);
-
- int n = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
-
- lock(&p->sched_mutex);
- p->poller_suspended = false;
-
- bool unpolled_work = false;
- if (p->earmark_count > 0) {
- p->earmark_drain = true;
- unpolled_work = true;
- }
- if (new_wakes) {
- lock(&p->eventfd_mutex);
- schedule_wake_list(p);
- unlock(&p->eventfd_mutex);
- unpolled_work = true;
- }
-
- if (n < 0) {
- if (errno != EINTR)
- perror("epoll_wait"); // TODO: proper log
- if (!can_block && !unpolled_work) {
- p->poller = NULL;
- tslot_t *res_ts = resume_one_thread(p);
- ts->state = UNUSED;
- unlock(&p->sched_mutex);
- if (res_ts) resume(p, res_ts);
- return NULL;
- }
- else {
- p->poller = NULL;
- continue;
- }
- } else if (n == 0) {
- if (!can_block && !unpolled_work) {
- p->poller = NULL;
- tslot_t *res_ts = resume_one_thread(p);
- ts->state = UNUSED;
- unlock(&p->sched_mutex);
- if (res_ts) resume(p, res_ts);
- return NULL;
- }
- else {
- if (!epoll_immediate)
- perror("epoll_wait unexpected timeout"); // TODO: proper log
- if (!unpolled_work) {
- p->poller = NULL;
- continue;
- }
- }
- }
-
- for (int i = 0; i < n; i++) {
- ctx = post_event(p, &p->kevents[i]);
- if (ctx)
- make_runnable(ctx);
- }
- if (n > 0)
- memset(p->kevents, 0, sizeof(struct epoll_event) * n);
-
- // The list of pending wakes can be very long. Traverse part of it looking for warm pairings.
- pcontext_t *wctx = p->sched_wake_current;
- int max_runnables = p->runnables_capacity;
- while (wctx && p->n_runnables < max_runnables) {
- if (wctx->runner == REWAKE_PLACEHOLDER)
- wctx->runner = NULL; // Allow context to run again.
- ctx = post_wake(p, wctx);
- if (ctx)
- make_runnable(ctx);
- pop_wake(wctx);
- wctx = wctx->wake_next;
- }
- p->sched_wake_current = wctx;
- // More wakes than places on the runnables list
- while (wctx) {
- if (wctx->runner == REWAKE_PLACEHOLDER)
- wctx->runner = NULL; // Allow context to run again.
- wctx->sched_wake = true;
- wctx->sched_pending = true;
- if (wctx->runnable || wctx->runner)
- pop_wake(wctx);
- wctx = wctx->wake_next;
- }
-
- if (pni_immediate && !ts->context) {
- // Poller gets to run if possible
- pcontext_t *pctx;
- if (p->n_runnables) {
- assert(p->next_runnable == 0);
- pctx = p->runnables[0];
- if (++p->next_runnable == p->n_runnables)
- p->n_runnables = 0;
- } else if (p->n_warm_runnables) {
- pctx = p->warm_runnables[--p->n_warm_runnables];
- tslot_t *ts2 = pctx->runner;
- ts2->prev_context = ts2->context = NULL;
- pctx->runner = NULL;
- } else if (p->last_earmark) {
- pctx = p->last_earmark->context;
- remove_earmark(p->last_earmark);
- if (p->earmark_count == 0)
- p->earmark_drain = false;
- } else {
- pctx = NULL;
- }
- if (pctx) {
- assign_thread(ts, pctx);
- }
- }
-
poller_done(p, ts); // put suspended threads to work.
- // p->poller has been released, so a new poller may already be running.
} else if (!can_block) {
ts->state = UNUSED;
unlock(&p->sched_mutex);
@@ -2762,6 +2639,139 @@
} // while
}
+// Call with sched lock. Return true if !can_block and no new events to process.
+static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) {
+  // As the poller can have lots to do, be mindful of hogging the sched lock.  Release it when making kernel calls.
+ int n_events;
+ pcontext_t *ctx;
+
+ while (true) {
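+    // Reset the poller's per-pass scheduling state.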
+ assert(p->n_runnables == 0);
+ if (p->thread_count > p->thread_capacity)
+ grow_poller_bufs(p);
+ p->next_runnable = 0;
+ p->n_warm_runnables = 0;
+ p->last_earmark = NULL;
+
+ bool unfinished_earmarks = p->earmark_count > 0;
+ bool new_wakes = false;
+ bool epoll_immediate = unfinished_earmarks || !can_block;
+ assert(!p->sched_wake_first);
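+    // Pending wakes also force a non-blocking epoll_wait(); otherwise mark wake processing idle.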
+ if (!epoll_immediate) {
+ lock(&p->eventfd_mutex);
+ if (p->wake_list_first) {
+ epoll_immediate = true;
+ new_wakes = true;
+ } else {
+ p->wakes_in_progress = false;
+ }
+ unlock(&p->eventfd_mutex);
+ }
+ int timeout = (epoll_immediate) ? 0 : -1;
+ p->poller_suspended = (timeout == -1);
+ unlock(&p->sched_mutex);
+
+ n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
+
+ lock(&p->sched_mutex);
+ p->poller_suspended = false;
+
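+    // Pick up any earmarks or wakes that arrived while the sched lock was released.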
+ bool unpolled_work = false;
+ if (p->earmark_count > 0) {
+ p->earmark_drain = true;
+ unpolled_work = true;
+ }
+ if (new_wakes) {
+ lock(&p->eventfd_mutex);
+ schedule_wake_list(p);
+ unlock(&p->eventfd_mutex);
+ unpolled_work = true;
+ }
+
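+    // On error or timeout: bail out if !can_block and there is nothing to do, otherwise retry or fall through.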
+ if (n_events < 0) {
+ if (errno != EINTR)
+ perror("epoll_wait"); // TODO: proper log
+ if (!can_block && !unpolled_work)
+ return true;
+ else
+ continue;
+ } else if (n_events == 0) {
+ if (!can_block && !unpolled_work)
+ return true;
+ else {
+ if (!epoll_immediate)
+ perror("epoll_wait unexpected timeout"); // TODO: proper log
+ if (!unpolled_work)
+ continue;
+ }
+ }
+
+ break;
+ }
+
+ // We have unpolled work or at least one new epoll event
+
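+  // Hand each new epoll event to its context and make that context runnable.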
+ for (int i = 0; i < n_events; i++) {
+ ctx = post_event(p, &p->kevents[i]);
+ if (ctx)
+ make_runnable(ctx);
+ }
+ if (n_events > 0)
+ memset(p->kevents, 0, sizeof(struct epoll_event) * n_events);
+
+ // The list of pending wakes can be very long. Traverse part of it looking for warm pairings.
+ pcontext_t *wctx = p->sched_wake_current;
+ int max_runnables = p->runnables_capacity;
+ while (wctx && p->n_runnables < max_runnables) {
+ if (wctx->runner == REWAKE_PLACEHOLDER)
+ wctx->runner = NULL; // Allow context to run again.
+ ctx = post_wake(p, wctx);
+ if (ctx)
+ make_runnable(ctx);
+ pop_wake(wctx);
+ wctx = wctx->wake_next;
+ }
+ p->sched_wake_current = wctx;
+ // More wakes than places on the runnables list
+ while (wctx) {
+ if (wctx->runner == REWAKE_PLACEHOLDER)
+ wctx->runner = NULL; // Allow context to run again.
+ wctx->sched_wake = true;
+ wctx->sched_pending = true;
+ if (wctx->runnable || wctx->runner)
+ pop_wake(wctx);
+ wctx = wctx->wake_next;
+ }
+
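+  // With pni_immediate set, a poller without an assigned context takes one itself:
+  // a plain runnable first, then a warm pairing, then the last earmark.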
+ if (pni_immediate && !ts->context) {
+ // Poller gets to run if possible
+ pcontext_t *pctx;
+ if (p->n_runnables) {
+ assert(p->next_runnable == 0);
+ pctx = p->runnables[0];
+ if (++p->next_runnable == p->n_runnables)
+ p->n_runnables = 0;
+ } else if (p->n_warm_runnables) {
+ pctx = p->warm_runnables[--p->n_warm_runnables];
+ tslot_t *ts2 = pctx->runner;
+ ts2->prev_context = ts2->context = NULL;
+ pctx->runner = NULL;
+ } else if (p->last_earmark) {
+ pctx = p->last_earmark->context;
+ remove_earmark(p->last_earmark);
+ if (p->earmark_count == 0)
+ p->earmark_drain = false;
+ } else {
+ pctx = NULL;
+ }
+ if (pctx) {
+ assign_thread(ts, pctx);
+ }
+ }
+ return false;
+}
+
// Call with sched lock, but only from poller context.
static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
// Create a list of available threads to put to work.
@@ -2819,11 +2829,11 @@
}
pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
- return proactor_do_epoll(p, true);
+ return next_event_batch(p, true);
}
pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
- return proactor_do_epoll(p, false);
+ return next_event_batch(p, false);
}
// Call with no locks