PROTON-2130: epoll proactor: fix unwritten output bytes, pick up PROTON-2030 and PROTON-2131
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index d0664d9..1283d91 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -58,6 +58,7 @@
 #undef _GNU_SOURCE
 
 #include "proactor-internal.h"
+#include "core/engine-internal.h"
 #include "core/logger_private.h"
 #include "core/util.h"
 
@@ -293,13 +294,6 @@
   pmutex_finalize(&pt->mutex);
 }
 
-pn_timestamp_t pn_i_now2(void)
-{
-  struct timespec now;
-  clock_gettime(CLOCK_REALTIME, &now);
-  return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000);
-}
-
 
 // ========================================================================
 // Proactor common code
@@ -542,7 +536,7 @@
   int hog_count; // thread hogging limiter
   pn_event_batch_t batch;
   pn_connection_driver_t driver;
-  bool wbuf_valid;
+  bool output_drained;
   const char *wbuf_current;
   size_t wbuf_remaining;
   size_t wbuf_completed;
@@ -555,7 +549,7 @@
 } pconnection_t;
 
 /*
- * A listener can have mutiple sockets (as specified in the addrinfo).  They
+ * A listener can have multiple sockets (as specified in the addrinfo).  They
  * are armed separately.  The individual psockets can be part of at most one
  * list: the global proactor overflow retry list or the per-listener list of
  * pending accepts (valid inbound socket obtained, but pn_listener_accept not
@@ -1128,7 +1122,7 @@
   pc->read_blocked = true;
   pc->write_blocked = true;
   pc->disconnected = false;
-  pc->wbuf_valid = false;
+  pc->output_drained = false;
   pc->wbuf_completed = 0;
   pc->wbuf_remaining = 0;
   pc->wbuf_current = NULL;
@@ -1195,26 +1189,19 @@
   // else proactor_disconnect logic owns psocket and its final free
 }
 
-static void invalidate_wbuf(pconnection_t *pc) {
-  if (pc->wbuf_valid) {
-    if (pc->wbuf_completed)
-      pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed);
-    pc->wbuf_completed = 0;
-    pc->wbuf_remaining = 0;
-    pc->wbuf_valid = false;
-  }
+static void set_wbuf(pconnection_t *pc, const char *start, size_t sz) {
+  pc->wbuf_completed = 0;
+  pc->wbuf_current = start;
+  pc->wbuf_remaining = sz;
 }
 
 // Never call with any locks held.
 static void ensure_wbuf(pconnection_t *pc) {
-  if (!pc->wbuf_valid) {
-    // next connection_driver call is the expensive output generator
-    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-    pc->wbuf_completed = 0;
-    pc->wbuf_remaining = wbuf.size;
-    pc->wbuf_current = wbuf.start;
-    pc->wbuf_valid = true;
-  }
+  // next connection_driver call is the expensive output generator
+  pn_bytes_t bytes = pn_connection_driver_write_buffer(&pc->driver);
+  set_wbuf(pc, bytes.start, bytes.size);
+  if (bytes.size == 0)
+    pc->output_drained = true;
 }
 
 // Call with lock held or from forced_shutdown
@@ -1260,9 +1247,8 @@
     lock(&p->sched_mutex);
     idle_threads = (p->suspend_list_head != NULL);
     unlock(&p->sched_mutex);
-    if (idle_threads) {
+    if (idle_threads && !pc->write_blocked && !pc->read_blocked) {
       write_flush(pc);  // May generate transport event
-      pc->read_blocked = pc->write_blocked = false;
       pconnection_process(pc, 0, false, false, true);
       e = pn_connection_driver_next_event(&pc->driver);
     }
@@ -1275,7 +1261,7 @@
       }
     }
   }
-  if (e) invalidate_wbuf(pc);
+  if (e) pc->output_drained = false;
 
   return e;
 }
@@ -1299,7 +1285,6 @@
    close/shutdown.  Let read()/write() return 0 or -1 to trigger cleanup logic.
 */
 static bool pconnection_rearm_check(pconnection_t *pc) {
-  assert(pc->wbuf_valid);
   if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
     return false;
   }
@@ -1352,7 +1337,6 @@
 
 /* Call with context lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  assert(pc->wbuf_valid);
   if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
     return true;
   if (!pc->read_blocked && !pconnection_rclosed(pc))
@@ -1409,19 +1393,26 @@
 }
 
 // Return true unless error
- static bool pconnection_write(pconnection_t *pc) {
+static bool pconnection_write(pconnection_t *pc) {
   size_t wbuf_size = pc->wbuf_remaining;
   ssize_t n = send(pc->psocket.sockfd, pc->wbuf_current, wbuf_size, MSG_NOSIGNAL);
   if (n > 0) {
     pc->wbuf_completed += n;
     pc->wbuf_remaining -= n;
     pc->io_doublecheck = false;
-    if (pc->wbuf_remaining)
+    if (pc->wbuf_remaining) {
       pc->write_blocked = true;
+      pc->wbuf_current += n;
+    }
     else {
-      // No need to aggregate multiple writes
+      // write_done also calls pn_transport_pending(), so the transport knows all current output
       pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed);
       pc->wbuf_completed = 0;
+      pn_transport_t *t = pc->driver.transport;
+      set_wbuf(pc, t->output_buf, t->output_pending);
+      if (t->output_pending == 0)
+        pc->output_drained = true;
+      // TODO: revise transport API to allow similar efficient access to transport output
     }
   } else if (errno == EWOULDBLOCK) {
     pc->write_blocked = true;
@@ -1433,12 +1424,29 @@
 
 // Never call with any locks held.
 static void write_flush(pconnection_t *pc) {
-  ensure_wbuf(pc);
-  if (!pc->write_blocked && !pconnection_wclosed(pc)) {
+  size_t prev_wbuf_remaining = 0;
+
+  while(!pc->write_blocked && !pc->output_drained && !pconnection_wclosed(pc)) {
+    if (pc->wbuf_remaining == 0) {
+      ensure_wbuf(pc);
+      if (pc->wbuf_remaining == 0)
+        pc->output_drained = true;
+    } else {
+      // Check if we are doing multiple small writes in a row, possibly worth growing the transport output buffer.
+      if (prev_wbuf_remaining
+          && prev_wbuf_remaining == pc->wbuf_remaining         // two max outputs in a row
+          && pc->wbuf_remaining < 131072) {
+        ensure_wbuf(pc);  // second call -> unchanged wbuf or transport buffer size doubles and more bytes added
+        prev_wbuf_remaining = 0;
+      } else {
+        prev_wbuf_remaining = pc->wbuf_remaining;
+      }
+    }
     if (pc->wbuf_remaining > 0) {
       if (!pconnection_write(pc)) {
         psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to");
       }
+      // pconnection_write side effect: wbuf may be replenished, and if not, output_drained may be set.
     }
     else {
       if (pn_connection_driver_write_closed(&pc->driver)) {
@@ -1458,6 +1466,7 @@
   bool timer_fired = false;
   bool waking = false;
   bool tick_required = false;
+  bool immediate_write = false;
 
   // Don't touch data exclusive to working thread (yet).
   if (timeout) {
@@ -1533,8 +1542,11 @@
         pconnection_maybe_connect_lh(pc);
       else
         pconnection_connected_lh(pc); /* Non error event means we are connected */
-      if (update_events & EPOLLOUT)
+      if (update_events & EPOLLOUT) {
         pc->write_blocked = false;
+        if (pc->wbuf_remaining > 0)
+          immediate_write = true;
+      }
       if (update_events & EPOLLIN)
         pc->read_blocked = false;
     }
@@ -1555,8 +1567,10 @@
     waking = false;
   }
 
-  // read... tick... write
-  // perhaps should be: write_if_recent_EPOLLOUT... read... tick... write
+  if (immediate_write) {
+    immediate_write = false;
+    write_flush(pc);
+  }
 
   if (!pconnection_rclosed(pc)) {
     pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
@@ -1564,7 +1578,7 @@
       ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size);
       if (n > 0) {
         pn_connection_driver_read_done(&pc->driver, n);
-        invalidate_wbuf(pc);
+        pc->output_drained = false;
         pconnection_tick(pc);         /* check for tick changes. */
         tick_required = false;
         pc->io_doublecheck = false;
@@ -1585,7 +1599,7 @@
   if (tick_required) {
     pconnection_tick(pc);         /* check for tick changes. */
     tick_required = false;
-    invalidate_wbuf(pc);
+    pc->output_drained = false;
   }
 
   if (topup) {
@@ -1594,7 +1608,7 @@
   }
 
   if (pconnection_has_event(pc)) {
-    invalidate_wbuf(pc);
+    pc->output_drained = false;
     return &pc->batch;
   }
 
@@ -3010,32 +3024,7 @@
   pn_proactor_t *bp = batch_proactor(batch);
   if (bp == p) {
     bool notify = false;
-    bool rearm_interrupt = false;
     lock(&p->context.mutex);
-    lock(&p->sched_mutex);
-
-    bool timeout = p->sched_timeout;
-    if (timeout) p->sched_timeout = false;
-    bool intr = p->sched_interrupt;
-    if (intr) {
-      p->sched_interrupt = false;
-      rearm_interrupt = true;
-      p->need_interrupt = true;
-    }
-    if (p->context.sched_wake) {
-      p->context.sched_wake = false;
-      wake_done(&p->context);
-    }
-
-    // ptimer_callback is slow.  Revisit timer cancel code in light of change to single poller thread.
-    bool timer_fired = timeout && ptimer_callback(&p->timer) != 0;
-    if (timeout) {
-      p->timer_armed = false;
-      if (timer_fired && p->timeout_set) {
-        p->need_timeout = true;
-      }
-    }
-
     bool rearm_timer = !p->timer_armed && !p->shutting_down;
     p->timer_armed = true;
     p->context.working = false;
@@ -3048,19 +3037,18 @@
     if (proactor_has_event(p))
       if (wake(&p->context))
         notify = true;
+
+    lock(&p->sched_mutex);
     tslot_t *ts = p->context.runner;
     if (unassign_thread(ts, UNUSED))
       notify = true;
     unlock(&p->sched_mutex);
     unlock(&p->context.mutex);
+
     if (notify)
       wake_notify(&p->context);
     if (rearm_timer)
       rearm(p, &p->timer.epoll_io);
-    if (rearm_interrupt) {
-      (void)read_uint64(p->interruptfd);
-      rearm(p, &p->epoll_interrupt);
-    }
     check_earmark_override(p, ts);
     return;
   }