PROTON-2130: epoll proactor race/deadlock fixes
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 21c611f..9390595 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -87,6 +87,7 @@
#include <sys/eventfd.h>
#include <limits.h>
#include <time.h>
+#include <alloca.h>
#include "./netaddr-internal.h" /* Include after socket/inet headers */
@@ -940,6 +941,8 @@
static void listener_begin_close(pn_listener_t* l);
static void proactor_add(pcontext_t *ctx);
static bool proactor_remove(pcontext_t *ctx);
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
+
static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
return ps->listener ? NULL : (pconnection_t*)ps;
@@ -2928,14 +2931,36 @@
}
}
- // Create a list of available threads to put to work.
+ poller_done(p, ts); // put suspended threads to work.
+ // p->poller has been released, so a new poller may already be running.
+ } else if (!can_block) {
+ ts->state = UNUSED;
+ unlock(&p->sched_mutex);
+ return NULL;
+ } else {
+ // TODO: loop while !poller_suspended, since new work coming
+ suspend(p, ts);
+ }
+ } // while
+}
- int resume_list_count = 0;
+// Call with sched lock, but only from poller context.
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
+ // Create a list of available threads to put to work.
+ // ts is the poller thread
+ int resume_list_count = 0;
+ tslot_t **resume_list2 = NULL;
+
+ if (p->suspend_list_count) {
+ int max_resumes = p->n_warm_runnables + p->n_runnables;
+ max_resumes = pn_min(max_resumes, p->suspend_list_count);
+ if (max_resumes) {
+ resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *));
for (int i = 0; i < p->n_warm_runnables ; i++) {
- ctx = p->warm_runnables[i];
+ pcontext_t *ctx = p->warm_runnables[i];
tslot_t *tsp = ctx->runner;
if (tsp->state == SUSPENDED) {
- p->resume_list[resume_list_count++] = tsp;
+ resume_list2[resume_list_count++] = tsp;
LL_REMOVE(p, suspend_list, tsp);
p->suspend_list_count--;
tsp->state = PROCESSING;
@@ -2956,33 +2981,24 @@
for (int i = 0; i < new_runners; i++) {
tslot_t *tsp = p->suspend_list_head;
assert(tsp);
- p->resume_list[resume_list_count++] = tsp;
+ resume_list2[resume_list_count++] = tsp;
LL_REMOVE(p, suspend_list, tsp);
p->suspend_list_count--;
tsp->state = PROCESSING;
}
-
- p->poller = NULL;
- // New poller may run concurrently. Touch only this thread's stack for rest of block.
-
- if (resume_list_count) {
- unlock(&p->sched_mutex);
- for (int i = 0; i < resume_list_count; i++) {
- resume(p, p->resume_list[i]);
- }
- lock(&p->sched_mutex);
- }
- } else if (!can_block) {
- ts->state = UNUSED;
- unlock(&p->sched_mutex);
- return NULL;
- } else {
- // TODO: loop while !poller_suspended, since new work coming
- suspend(p, ts);
}
- } // while
-}
+ }
+ p->poller = NULL;
+ if (resume_list_count) {
+ // Allows a new poller to run concurrently. Touch only stack vars.
+ unlock(&p->sched_mutex);
+ for (int i = 0; i < resume_list_count; i++) {
+ resume(p, resume_list2[i]);
+ }
+ lock(&p->sched_mutex);
+ }
+}
pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
return proactor_do_epoll(p, true);
@@ -3042,12 +3058,12 @@
if (wake(&p->context))
notify = true;
+ unlock(&p->context.mutex);
lock(&p->sched_mutex);
tslot_t *ts = p->context.runner;
if (unassign_thread(ts, UNUSED))
notify = true;
unlock(&p->sched_mutex);
- unlock(&p->context.mutex);
if (notify)
wake_notify(&p->context);