PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner.  This closes #290
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 0d63f90..e8c9f0a 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -86,8 +86,9 @@
   pn_proactor_t *proactor;  /* Immutable */
   task_type_t type;
   bool working;
-  bool on_ready_list;
   bool ready;                // ready to run and on ready list.  Poller notified by eventfd.
+  bool waking;
+  bool on_ready_list;        // todo: protected by eventfd_mutex or sched mutex?  needed?
   struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
@@ -223,7 +224,6 @@
   pni_timer_t *timer;
   const char *host, *port;
   uint32_t new_events;
-  int wake_count;   // TODO: protected by task.mutex so should be moved in there (also really bool)
   bool server;                /* accept, not connect */
   bool tick_pending;
   bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
@@ -382,6 +382,28 @@
 void pni_pconnection_timeout(pconnection_t *pc);
 void pni_proactor_timeout(pn_proactor_t *p);
 
+// Generic wake primitives for a task.
+
+// Call with task lock held.  Must call notify_poller() if returns true.
+static inline bool pni_task_wake(task_t *tsk) {
+  if (!tsk->waking) {
+    tsk->waking = true;
+    return schedule(tsk);
+  }
+  return false;
+}
+
+// Call with task lock held.
+static inline bool pni_task_wake_pending(task_t *tsk) {
+  return tsk->waking;
+}
+
+// Call with task lock held and only from the running task.
+static inline void pni_task_wake_done(task_t *tsk) {
+  tsk->waking = false;
+}
+
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index c4f028d..7467683 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -49,6 +49,7 @@
 
  TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
  transitions from "private to the scheduler" to "visible to the task".
+ TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch.
  */
 
 
@@ -742,7 +743,6 @@
   psocket_init(&pc->psocket, PCONNECTION_IO);
   pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port);
   pc->new_events = 0;
-  pc->wake_count = 0;
   pc->tick_pending = false;
   pc->queued_disconnect = false;
   pc->disconnect_condition = NULL;
@@ -980,7 +980,7 @@
 
 /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
+  if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
     return true;
   if (!pc->read_blocked && !pconnection_rclosed(pc))
     return true;
@@ -1153,9 +1153,9 @@
     return &pc->batch;
   }
   bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc);
-  if (pc->wake_count) {
+  if (pni_task_wake_pending(&pc->task)) {
     waking = !closed;
-    pc->wake_count = 0;
+    pni_task_wake_done(&pc->task);
   }
   if (pc->tick_pending) {
     pc->tick_pending = false;
@@ -1441,8 +1441,7 @@
   if (pc) {
     lock(&pc->task.mutex);
     if (!pc->task.closing) {
-      pc->wake_count++;
-      notify = schedule(&pc->task);
+      notify = pni_task_wake(&pc->task);
     }
     unlock(&pc->task.mutex);
   }
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 3fd2b36..8722ff0 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,7 +50,6 @@
   struct addrinfo *ai;               /* Current connect address */
   bool connected;
   bool disconnected;
-  bool waking; // TODO: This is actually protected by task.mutex so should be moved into task (pconnection too)
 };
 
 static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -144,7 +143,6 @@
 
   prc->connected = false;
   prc->disconnected = false;
-  prc->waking = false;
   prc->batch.next_event = pni_raw_batch_next;
 
   pmutex_init(&prc->rearm_mutex);
@@ -268,8 +266,7 @@
   praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
   lock(&prc->task.mutex);
   if (!prc->task.closing) {
-    prc->waking = true;
-    notify = schedule(&prc->task);
+    notify = pni_task_wake(&prc->task);
   }
   unlock(&prc->task.mutex);
   if (notify) notify_poller(prc->task.proactor);
@@ -290,8 +287,10 @@
   // Check wake status every event processed
   bool waking = false;
   lock(&rc->task.mutex);
-  waking = rc->waking;
-  rc->waking = false;
+  if (pni_task_wake_pending(&rc->task)) {
+    waking = true;
+    pni_task_wake_done(&rc->task);
+  }
   unlock(&rc->task.mutex);
   if (waking) pni_raw_wake(raw);
 
@@ -346,8 +345,10 @@
   t->working = true;
   if (sched_ready) {
     schedule_done(t);
-    wake = rc->waking;
-    rc->waking = false;
+    if (pni_task_wake_pending(&rc->task)) {
+      wake = true;
+      pni_task_wake_done(&rc->task);
+    }
   }
   unlock(&t->mutex);
 
@@ -364,7 +365,7 @@
   pn_proactor_t *p = rc->task.proactor;
   tslot_t *ts = rc->task.runner;
   rc->task.working = false;
-  notify = rc->waking && schedule(&rc->task);
+  notify = pni_task_wake_pending(&rc->task) && schedule(&rc->task);
   // The task may be in the ready state even if we've got no raw connection
   // wakes outstanding because we dealt with it already in pni_raw_batch_next()
   ready = rc->task.ready;