PROTON-2326: epoll proactor refactor - provide proactor as direct argument to notify_poller(), not indirect via task
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 21226a9..0d63f90 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -352,7 +352,7 @@
 }
 
 bool schedule(task_t *tsk);
-void notify_poller(task_t *tsk);
+void notify_poller(pn_proactor_t *p);
 void schedule_done(task_t *tsk);
 
 void psocket_init(psocket_t* ps, epoll_type_t type);
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index ae0c37b..c4f028d 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -314,8 +314,7 @@
 }
 
 // part2: unblock epoll_wait().  Make OS call without lock held.
-void notify_poller(task_t *tsk) {
-  pn_proactor_t *p = tsk->proactor;
+void notify_poller(pn_proactor_t *p) {
   if (p->eventfd == -1)
     return;
   rearm(p, &p->epoll_schedule);
@@ -706,7 +705,7 @@
     }
     else notify = schedule(&l->task);
     unlock(&l->task.mutex);
-    if (notify) notify_poller(&l->task);
+    if (notify) notify_poller(p);
     a = acceptor_list_next(&ovflw);
   }
 }
@@ -871,7 +870,7 @@
   }
   unlock(&pc->task.mutex);
   if (notify)
-    notify_poller(&pc->task);
+    notify_poller(pc->task.proactor);
 }
 
 static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
@@ -1018,7 +1017,7 @@
       notify = unassign_thread(ts, UNUSED);
       unlock(&p->sched_mutex);
       if (notify)
-        notify_poller(&p->task);
+        notify_poller(p);
       return;
     }
   }
@@ -1033,7 +1032,7 @@
   if (unassign_thread(ts, UNUSED))
     notify = true;
   unlock(&p->sched_mutex);
-  if (notify) notify_poller(&p->task);
+  if (notify) notify_poller(p);
   return;
 }
 
@@ -1399,7 +1398,6 @@
   pn_connection_open(pc->driver.connection); /* Auto-open */
 
   bool notify = false;
-  bool notify_proactor = false;
 
   if (pc->disconnected) {
     notify = schedule(&pc->task);    /* Error during initialization */
@@ -1414,14 +1412,13 @@
       psocket_gai_error(&pc->psocket, gai_error, "connect to ");
       notify = schedule(&pc->task);
       lock(&p->task.mutex);
-      notify_proactor = schedule_if_inactive(p);
+      notify |= schedule_if_inactive(p);
       unlock(&p->task.mutex);
     }
   }
   /* We need to issue INACTIVE on immediate failure */
   unlock(&pc->task.mutex);
-  if (notify) notify_poller(&pc->task);
-  if (notify_proactor) notify_poller(&p->task);
+  if (notify) notify_poller(pc->task.proactor);
 }
 
 static void pconnection_tick(pconnection_t *pc) {
@@ -1449,7 +1446,7 @@
     }
     unlock(&pc->task.mutex);
   }
-  if (notify) notify_poller(&pc->task);
+  if (notify) notify_poller(pc->task.proactor);
 }
 
 void pn_proactor_release_connection(pn_connection_t *c) {
@@ -1463,7 +1460,7 @@
     notify = schedule(&pc->task);
     unlock(&pc->task.mutex);
   }
-  if (notify) notify_poller(&pc->task);
+  if (notify) notify_poller(pc->task.proactor);
 }
 
 // ========================================================================
@@ -1576,7 +1573,7 @@
   }
   proactor_add(&l->task);
   unlock(&l->task.mutex);
-  if (notify) notify_poller(&l->task);
+  if (notify) notify_poller(p);
   return;
 }
 
@@ -1656,7 +1653,7 @@
     notify = schedule(&l->task);
   }
   unlock(&l->task.mutex);
-  if (notify) notify_poller(&l->task);
+  if (notify) notify_poller(l->task.proactor);
 }
 
 static void listener_forced_shutdown(pn_listener_t *l) {
@@ -1814,7 +1811,7 @@
     notify = unassign_thread(ts, UNUSED);
     unlock(&p->sched_mutex);
     if (notify)
-      notify_poller(&p->task);
+      notify_poller(p);
     return;
   } else if (n_events || listener_has_event(l))
     notify = schedule(&l->task);
@@ -1824,7 +1821,7 @@
   if (unassign_thread(ts, UNUSED))
     notify = true;
   unlock(&p->sched_mutex);
-  if (notify) notify_poller(&l->task);
+  if (notify) notify_poller(p);
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
@@ -1885,7 +1882,7 @@
     notify = schedule(&l->task);
   unlock(&pc->task.mutex);
   unlock(&l->task.mutex);
-  if (notify) notify_poller(&l->task);
+  if (notify) notify_poller(l->task.proactor);
 }
 
 
@@ -2154,7 +2151,7 @@
   }
   bool notify = schedule_if_inactive(p);
   unlock(&p->task.mutex);
-  if (notify) notify_poller(&p->task);
+  if (notify) notify_poller(p);
   return can_free;
 }
 
@@ -2432,7 +2429,7 @@
       bool notify = unassign_thread(ts, PROCESSING);
       if (notify) {
         unlock(&p->sched_mutex);
-        notify_poller(&p->task);
+        notify_poller(p);
         lock(&p->sched_mutex);
       }
       continue;  // Long time may have passed.  Back to beginning.
@@ -2725,7 +2722,7 @@
     unlock(&p->sched_mutex);
 
     if (notify)
-      notify_poller(&p->task);
+      notify_poller(p);
     check_earmark_override(p, ts);
     return;
   }
@@ -2751,7 +2748,7 @@
     pni_timer_set(p->timer, t + pn_proactor_now_64());
   }
   unlock(&p->task.mutex);
-  if (notify) notify_poller(&p->task);
+  if (notify) notify_poller(p);
 }
 
 void pn_proactor_cancel_timeout(pn_proactor_t *p) {
@@ -2761,7 +2758,7 @@
   pni_timer_set(p->timer, 0);
   bool notify = schedule_if_inactive(p);
   unlock(&p->task.mutex);
-  if (notify) notify_poller(&p->task);
+  if (notify) notify_poller(p);
 }
 
 void pni_proactor_timeout(pn_proactor_t *p) {
@@ -2772,7 +2769,7 @@
     notify = schedule(&p->task);
   }
   unlock(&p->task.mutex);
-  if (notify) notify_poller(&p->task);
+  if (notify) notify_poller(p);
 }
 
 pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
@@ -2799,7 +2796,7 @@
   notify = schedule_if_inactive(p);
   unlock(&p->task.mutex);
   if (!disconnecting_tasks) {
-    if (notify) notify_poller(&p->task);
+    if (notify) notify_poller(p);
     return;
   }
 
@@ -2859,7 +2856,7 @@
       if (tsk_notify)
         tsk_notify = schedule(tsk);
       if (tsk_notify)
-        notify_poller(tsk);
+        notify_poller(p);
     }
     unlock(&p->task.mutex);
     unlock(tsk_mutex);
@@ -2871,7 +2868,7 @@
     }
   }
   if (notify)
-    notify_poller(&p->task);
+    notify_poller(p);
 }
 
 const pn_netaddr_t *pn_transport_local_addr(pn_transport_t *t) {
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 0195c57..3fd2b36 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -185,7 +185,6 @@
   proactor_add(&prc->task);
 
   bool notify = false;
-  bool notify_proactor = false;
 
   const char *host;
   const char *port;
@@ -203,14 +202,13 @@
     prc->disconnected = true;
     notify = schedule(&prc->task);
     lock(&p->task.mutex);
-    notify_proactor = schedule_if_inactive(p);
+    notify |= schedule_if_inactive(p);
     unlock(&p->task.mutex);
   }
 
   /* We need to issue INACTIVE on immediate failure */
   unlock(&prc->task.mutex);
-  if (notify) notify_poller(&prc->task);
-  if (notify_proactor) notify_poller(&p->task);
+  if (notify) notify_poller(p);
 }
 
 void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {
@@ -250,7 +248,7 @@
   }
   unlock(&prc->task.mutex);
   unlock(&l->task.mutex);
-  if (notify) notify_poller(&l->task);
+  if (notify) notify_poller(l->task.proactor);
 }
 
 const pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *rc) {
@@ -274,7 +272,7 @@
     notify = schedule(&prc->task);
   }
   unlock(&prc->task.mutex);
-  if (notify) notify_poller(&prc->task);
+  if (notify) notify_poller(prc->task.proactor);
 }
 
 void pn_raw_connection_close(pn_raw_connection_t *rc) {
@@ -360,18 +358,17 @@
 }
 
 void pni_raw_connection_done(praw_connection_t *rc) {
-  bool self_notify = false;
+  bool notify = false;
   bool ready = false;
   lock(&rc->task.mutex);
   pn_proactor_t *p = rc->task.proactor;
   tslot_t *ts = rc->task.runner;
   rc->task.working = false;
-  self_notify = rc->waking && schedule(&rc->task);
+  notify = rc->waking && schedule(&rc->task);
   // The task may be in the ready state even if we've got no raw connection
   // wakes outstanding because we dealt with it already in pni_raw_batch_next()
   ready = rc->task.ready;
   unlock(&rc->task.mutex);
-  if (self_notify) notify_poller(&rc->task);
 
   pn_raw_connection_t *raw = &rc->raw_connection;
   int wanted =
@@ -390,7 +387,7 @@
   }
 
   lock(&p->sched_mutex);
-  bool notify = unassign_thread(ts, UNUSED);
+  notify |= unassign_thread(ts, UNUSED);
   unlock(&p->sched_mutex);
-  if (notify) notify_poller(&p->task);
+  if (notify) notify_poller(p);
 }
diff --git a/c/src/proactor/epoll_timer.c b/c/src/proactor/epoll_timer.c
index a882128..26de7d2 100644
--- a/c/src/proactor/epoll_timer.c
+++ b/c/src/proactor/epoll_timer.c
@@ -282,7 +282,7 @@
   unlock(&tm->task.mutex);
 
   if (notify)
-    notify_poller(&tm->task);
+    notify_poller(tm->task.proactor);
 }
 
 pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready) {
@@ -353,7 +353,7 @@
   unlock(&tm->task.mutex);
 
   if (notify)
-    notify_poller(&tm->task);
+    notify_poller(tm->task.proactor);
   // The timer_manager never has events to batch.
   return NULL;
   // TODO: perhaps become task of one of the timed out timers (if otherwise idle) and process() that task.