NO-JIRA: Split out epoll proactor poller logic to separate routine for readability
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 01d9db8..4c00d47 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -101,7 +101,7 @@
 //   Maybe futex is even better?
 // See other "TODO" in code.
 //
-// Consider case of large number of wakes: proactor_do_epoll() could start by
+// Consider case of large number of wakes: next_event_batch() could start by
 // looking for pending wakes before a kernel call to epoll_wait(), or there
 // could be several eventfds with random assignment of wakeables.
 
@@ -691,6 +691,7 @@
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup);
 static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
+static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block);
 static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
 
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
@@ -2580,7 +2581,7 @@
   return NULL;
 }
 
-static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
+static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) {
   lock(&p->tslot_mutex);
   tslot_t * ts = find_tslot(p);
   unlock(&p->tslot_mutex);
@@ -2591,9 +2592,9 @@
   assert(ts->state == UNUSED || ts->state == NEW);
   ts->state = PROCESSING;
 
+  // Process outstanding epoll events until we get a batch or need to block.
   while (true) {
-    // Process outstanding epoll events until we get a batch or need to block.
-
+    // First see if there are any contexts waiting to run and perhaps generate new Proton events,
     pcontext_t *ctx = next_runnable(p, ts);
     if (ctx) {
       ts->state = BATCHING;
@@ -2611,146 +2612,22 @@
       continue;  // Long time may have passed.  Back to beginning.
     }
 
-    // poll or wait for a runnable context
+    // Poll or wait for a runnable context
     if (p->poller == NULL) {
+      bool return_immediately;
       p->poller = ts;
-      // As poller with lots to do, be mindful of hogging the sched lock.  Release when making kernel calls.
-      assert(p->n_runnables == 0);
-      if (p->thread_count > p->thread_capacity)
-        grow_poller_bufs(p);
-      p->next_runnable = 0;
-      p->n_warm_runnables = 0;
-      p->last_earmark = NULL;
-
-      bool unfinished_earmarks = p->earmark_count > 0;
-      bool new_wakes = false;
-      bool epoll_immediate = unfinished_earmarks || !can_block;
-      assert(!p->sched_wake_first);
-      if (!epoll_immediate) {
-        lock(&p->eventfd_mutex);
-        if (p->wake_list_first) {
-          epoll_immediate = true;
-          new_wakes = true;
-        } else {
-          p->wakes_in_progress = false;
-        }
-        unlock(&p->eventfd_mutex);
+      // Get new epoll events (if any) and mark the relevant contexts as runnable
+      return_immediately = poller_do_epoll(p, ts, can_block);
+      p->poller = NULL;
+      if (return_immediately) {
+        // Check if another thread is available to continue epoll-ing.
+        tslot_t *res_ts = resume_one_thread(p);
+        ts->state = UNUSED;
+        unlock(&p->sched_mutex);
+        if (res_ts) resume(p, res_ts);
+        return NULL;
       }
-      int timeout = (epoll_immediate) ? 0 : -1;
-      p->poller_suspended = (timeout == -1);
-      unlock(&p->sched_mutex);
-
-      int n = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
-
-      lock(&p->sched_mutex);
-      p->poller_suspended = false;
-
-      bool unpolled_work = false;
-      if (p->earmark_count > 0) {
-        p->earmark_drain = true;
-        unpolled_work = true;
-      }
-      if (new_wakes) {
-        lock(&p->eventfd_mutex);
-        schedule_wake_list(p);
-        unlock(&p->eventfd_mutex);
-        unpolled_work = true;
-      }
-
-      if (n < 0) {
-        if (errno != EINTR)
-          perror("epoll_wait"); // TODO: proper log
-        if (!can_block && !unpolled_work) {
-          p->poller = NULL;
-          tslot_t *res_ts = resume_one_thread(p);
-          ts->state = UNUSED;
-          unlock(&p->sched_mutex);
-          if (res_ts) resume(p, res_ts);
-          return NULL;
-        }
-        else {
-          p->poller = NULL;
-          continue;
-        }
-      } else if (n == 0) {
-        if (!can_block && !unpolled_work) {
-          p->poller = NULL;
-          tslot_t *res_ts = resume_one_thread(p);
-          ts->state = UNUSED;
-          unlock(&p->sched_mutex);
-          if (res_ts) resume(p, res_ts);
-          return NULL;
-        }
-        else {
-          if (!epoll_immediate)
-            perror("epoll_wait unexpected timeout"); // TODO: proper log
-          if (!unpolled_work) {
-            p->poller = NULL;
-            continue;
-          }
-        }
-      }
-
-      for (int i = 0; i < n; i++) {
-        ctx = post_event(p, &p->kevents[i]);
-        if (ctx)
-          make_runnable(ctx);
-      }
-      if (n > 0)
-        memset(p->kevents, 0, sizeof(struct epoll_event) * n);
-
-      // The list of pending wakes can be very long.  Traverse part of it looking for warm pairings.
-      pcontext_t *wctx = p->sched_wake_current;
-      int max_runnables = p->runnables_capacity;
-      while (wctx && p->n_runnables < max_runnables) {
-        if (wctx->runner == REWAKE_PLACEHOLDER)
-          wctx->runner = NULL;  // Allow context to run again.
-        ctx = post_wake(p, wctx);
-        if (ctx)
-          make_runnable(ctx);
-        pop_wake(wctx);
-        wctx = wctx->wake_next;
-      }
-      p->sched_wake_current = wctx;
-      // More wakes than places on the runnables list
-      while (wctx) {
-        if (wctx->runner == REWAKE_PLACEHOLDER)
-          wctx->runner = NULL;  // Allow context to run again.
-        wctx->sched_wake = true;
-        wctx->sched_pending = true;
-        if (wctx->runnable || wctx->runner)
-          pop_wake(wctx);
-        wctx = wctx->wake_next;
-      }
-
-      if (pni_immediate && !ts->context) {
-        // Poller gets to run if possible
-        pcontext_t *pctx;
-        if (p->n_runnables) {
-          assert(p->next_runnable == 0);
-          pctx = p->runnables[0];
-          if (++p->next_runnable == p->n_runnables)
-            p->n_runnables = 0;
-        } else if (p->n_warm_runnables) {
-          pctx = p->warm_runnables[--p->n_warm_runnables];
-          tslot_t *ts2 = pctx->runner;
-          ts2->prev_context = ts2->context = NULL;
-          pctx->runner = NULL;
-        } else if (p->last_earmark) {
-          pctx = p->last_earmark->context;
-          remove_earmark(p->last_earmark);
-          if (p->earmark_count == 0)
-            p->earmark_drain = false;
-        } else {
-          pctx = NULL;
-        }
-        if (pctx) {
-          assign_thread(ts, pctx);
-        }
-      }
-
       poller_done(p, ts);  // put suspended threads to work.
-      // p->poller has been released, so a new poller may already be running.
     } else if (!can_block) {
       ts->state = UNUSED;
       unlock(&p->sched_mutex);
@@ -2762,6 +2639,139 @@
   } // while
 }
 
+// Call with sched lock.  Return true if !can_block and no new events to process.
+static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) {
+  // As poller with lots to do, be mindful of hogging the sched lock.  Release when making kernel calls.
+  int n_events;
+  pcontext_t *ctx;
+
+  while (true) {
+    assert(p->n_runnables == 0);
+    if (p->thread_count > p->thread_capacity)
+      grow_poller_bufs(p);
+    p->next_runnable = 0;
+    p->n_warm_runnables = 0;
+    p->last_earmark = NULL;
+
+    bool unfinished_earmarks = p->earmark_count > 0;
+    bool new_wakes = false;
+    bool epoll_immediate = unfinished_earmarks || !can_block;
+    assert(!p->sched_wake_first);
+    if (!epoll_immediate) {
+      lock(&p->eventfd_mutex);
+      if (p->wake_list_first) {
+        epoll_immediate = true;
+        new_wakes = true;
+      } else {
+        p->wakes_in_progress = false;
+      }
+      unlock(&p->eventfd_mutex);
+    }
+    int timeout = (epoll_immediate) ? 0 : -1;
+    p->poller_suspended = (timeout == -1);
+    unlock(&p->sched_mutex);
+
+    n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
+
+    lock(&p->sched_mutex);
+    p->poller_suspended = false;
+
+    bool unpolled_work = false;
+    if (p->earmark_count > 0) {
+      p->earmark_drain = true;
+      unpolled_work = true;
+    }
+    if (new_wakes) {
+      lock(&p->eventfd_mutex);
+      schedule_wake_list(p);
+      unlock(&p->eventfd_mutex);
+      unpolled_work = true;
+    }
+
+    if (n_events < 0) {
+      if (errno != EINTR)
+        perror("epoll_wait"); // TODO: proper log
+      if (!can_block && !unpolled_work)
+        return true;
+      else
+        continue;
+    } else if (n_events == 0) {
+      if (!can_block && !unpolled_work)
+        return true;
+      else {
+        if (!epoll_immediate)
+          perror("epoll_wait unexpected timeout"); // TODO: proper log
+        if (!unpolled_work)
+          continue;
+      }
+    }
+
+    break;
+  }
+
+  // We have unpolled work or at least one new epoll event
+
+
+  for (int i = 0; i < n_events; i++) {
+    ctx = post_event(p, &p->kevents[i]);
+    if (ctx)
+      make_runnable(ctx);
+  }
+  if (n_events > 0)
+    memset(p->kevents, 0, sizeof(struct epoll_event) * n_events);
+
+  // The list of pending wakes can be very long.  Traverse part of it looking for warm pairings.
+  pcontext_t *wctx = p->sched_wake_current;
+  int max_runnables = p->runnables_capacity;
+  while (wctx && p->n_runnables < max_runnables) {
+    if (wctx->runner == REWAKE_PLACEHOLDER)
+      wctx->runner = NULL;  // Allow context to run again.
+    ctx = post_wake(p, wctx);
+    if (ctx)
+      make_runnable(ctx);
+    pop_wake(wctx);
+    wctx = wctx->wake_next;
+  }
+  p->sched_wake_current = wctx;
+  // More wakes than places on the runnables list
+  while (wctx) {
+    if (wctx->runner == REWAKE_PLACEHOLDER)
+      wctx->runner = NULL;  // Allow context to run again.
+    wctx->sched_wake = true;
+    wctx->sched_pending = true;
+    if (wctx->runnable || wctx->runner)
+      pop_wake(wctx);
+    wctx = wctx->wake_next;
+  }
+
+  if (pni_immediate && !ts->context) {
+    // Poller gets to run if possible
+    pcontext_t *pctx;
+    if (p->n_runnables) {
+      assert(p->next_runnable == 0);
+      pctx = p->runnables[0];
+      if (++p->next_runnable == p->n_runnables)
+        p->n_runnables = 0;
+    } else if (p->n_warm_runnables) {
+      pctx = p->warm_runnables[--p->n_warm_runnables];
+      tslot_t *ts2 = pctx->runner;
+      ts2->prev_context = ts2->context = NULL;
+      pctx->runner = NULL;
+    } else if (p->last_earmark) {
+      pctx = p->last_earmark->context;
+      remove_earmark(p->last_earmark);
+      if (p->earmark_count == 0)
+        p->earmark_drain = false;
+    } else {
+      pctx = NULL;
+    }
+    if (pctx) {
+      assign_thread(ts, pctx);
+    }
+  }
+  return false;
+}
+
 // Call with sched lock, but only from poller context.
 static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
   // Create a list of available threads to put to work.
@@ -2819,11 +2829,11 @@
 }
 
 pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
-  return proactor_do_epoll(p, true);
+  return next_event_batch(p, true);
 }
 
 pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
-  return proactor_do_epoll(p, false);
+  return next_event_batch(p, false);
 }
 
 // Call with no locks