PROTON-2130: epoll proactor race/deadlock fixes
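
Move the post-poll logic that resumes suspended threads into a new
poller_done() helper. The resume list is now built on the poller's own
stack via alloca() rather than in the shared p->resume_list, which a
successor poller could overwrite while the outgoing poller was still
iterating it after dropping the sched lock. Also release the proactor
context mutex before taking the sched mutex when unassigning the runner
thread, keeping lock acquisition ordering consistent to avoid a potential
deadlock.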
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 21c611f..9390595 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -87,6 +87,7 @@
 #include <sys/eventfd.h>
 #include <limits.h>
 #include <time.h>
+#include <alloca.h>
 
 #include "./netaddr-internal.h" /* Include after socket/inet headers */
 
@@ -940,6 +941,7 @@
 static void listener_begin_close(pn_listener_t* l);
 static void proactor_add(pcontext_t *ctx);
 static bool proactor_remove(pcontext_t *ctx);
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
 
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
   return ps->listener ? NULL : (pconnection_t*)ps;
@@ -2928,14 +2930,38 @@
         }
       }
 
-      // Create a list of available threads to put to work.
+      poller_done(p, ts);  // Put suspended threads to work.
+      // p->poller has been released, so a new poller may already be running.
+    } else if (!can_block) {
+      ts->state = UNUSED;
+      unlock(&p->sched_mutex);
+      return NULL;
+    } else {
+      // TODO: loop while !poller_suspended, since new work may be arriving.
+      suspend(p, ts);
+    }
+  } // while
+}
 
-      int resume_list_count = 0;
+// Call with the sched lock held; only the poller thread may call this.
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
+  // Create a list of available threads to put to work.
+  // ts is the poller's thread slot.
+  int resume_list_count = 0;
+  tslot_t **resume_list2 = NULL;
+
+  if (p->suspend_list_count) {
+    int max_resumes = p->n_warm_runnables + p->n_runnables;
+    max_resumes = pn_min(max_resumes, p->suspend_list_count);
+    if (max_resumes) {
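+      // Build the list on this thread's stack so it stays valid and private
+      // while the sched lock is released and a successor poller runs.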
+      resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *));
       for (int i = 0; i < p->n_warm_runnables ; i++) {
-        ctx = p->warm_runnables[i];
+        pcontext_t *ctx = p->warm_runnables[i];
         tslot_t *tsp = ctx->runner;
         if (tsp->state == SUSPENDED) {
-          p->resume_list[resume_list_count++] = tsp;
+          resume_list2[resume_list_count++] = tsp;
           LL_REMOVE(p, suspend_list, tsp);
           p->suspend_list_count--;
           tsp->state = PROCESSING;
@@ -2956,33 +2982,25 @@
       for (int i = 0; i < new_runners; i++) {
         tslot_t *tsp = p->suspend_list_head;
         assert(tsp);
-        p->resume_list[resume_list_count++] = tsp;
+        resume_list2[resume_list_count++] = tsp;
         LL_REMOVE(p, suspend_list, tsp);
         p->suspend_list_count--;
         tsp->state = PROCESSING;
       }
-
-      p->poller = NULL;
-      // New poller may run concurrently.  Touch only this thread's stack for rest of block.
-
-      if (resume_list_count) {
-        unlock(&p->sched_mutex);
-        for (int i = 0; i < resume_list_count; i++) {
-          resume(p, p->resume_list[i]);
-        }
-        lock(&p->sched_mutex);
-      }
-    } else if (!can_block) {
-      ts->state = UNUSED;
-      unlock(&p->sched_mutex);
-      return NULL;
-    } else {
-      // TODO: loop while !poller_suspended, since new work coming
-      suspend(p, ts);
     }
-  } // while
-}
+  }
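+  // Relinquish poller status; a successor poller may take over once the sched lock is released.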
+  p->poller = NULL;
 
+  if (resume_list_count) {
+    // A new poller may run concurrently after the unlock; touch only stack vars.
+    unlock(&p->sched_mutex);
+    for (int i = 0; i < resume_list_count; i++) {
+      resume(p, resume_list2[i]);
+    }
+    lock(&p->sched_mutex);
+  }
+}
 
 pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   return proactor_do_epoll(p, true);
@@ -3042,12 +3060,13 @@
       if (wake(&p->context))
         notify = true;
 
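+    // Release the context mutex before taking the sched mutex to preserve lock ordering and avoid deadlock.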
+    unlock(&p->context.mutex);
     lock(&p->sched_mutex);
     tslot_t *ts = p->context.runner;
     if (unassign_thread(ts, UNUSED))
       notify = true;
     unlock(&p->sched_mutex);
-    unlock(&p->context.mutex);
 
     if (notify)
       wake_notify(&p->context);