PROTON-2130: epoll proactor: fix unwritten output bytes, pick up PROTON-2030 and PROTON-2131
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index d0664d9..1283d91 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -58,6 +58,7 @@
#undef _GNU_SOURCE
#include "proactor-internal.h"
+#include "core/engine-internal.h"
#include "core/logger_private.h"
#include "core/util.h"
@@ -293,13 +294,6 @@
pmutex_finalize(&pt->mutex);
}
-pn_timestamp_t pn_i_now2(void)
-{
- struct timespec now;
- clock_gettime(CLOCK_REALTIME, &now);
- return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000);
-}
-
// ========================================================================
// Proactor common code
@@ -542,7 +536,7 @@
int hog_count; // thread hogging limiter
pn_event_batch_t batch;
pn_connection_driver_t driver;
- bool wbuf_valid;
+ bool output_drained;
const char *wbuf_current;
size_t wbuf_remaining;
size_t wbuf_completed;
@@ -555,7 +549,7 @@
} pconnection_t;
/*
- * A listener can have mutiple sockets (as specified in the addrinfo). They
+ * A listener can have multiple sockets (as specified in the addrinfo). They
* are armed separately. The individual psockets can be part of at most one
* list: the global proactor overflow retry list or the per-listener list of
* pending accepts (valid inbound socket obtained, but pn_listener_accept not
@@ -1128,7 +1122,7 @@
pc->read_blocked = true;
pc->write_blocked = true;
pc->disconnected = false;
- pc->wbuf_valid = false;
+ pc->output_drained = false;
pc->wbuf_completed = 0;
pc->wbuf_remaining = 0;
pc->wbuf_current = NULL;
@@ -1195,26 +1189,19 @@
// else proactor_disconnect logic owns psocket and its final free
}
-static void invalidate_wbuf(pconnection_t *pc) {
- if (pc->wbuf_valid) {
- if (pc->wbuf_completed)
- pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed);
- pc->wbuf_completed = 0;
- pc->wbuf_remaining = 0;
- pc->wbuf_valid = false;
- }
+static void set_wbuf(pconnection_t *pc, const char *start, size_t sz) {
+ pc->wbuf_completed = 0;
+ pc->wbuf_current = start;
+ pc->wbuf_remaining = sz;
}
// Never call with any locks held.
static void ensure_wbuf(pconnection_t *pc) {
- if (!pc->wbuf_valid) {
- // next connection_driver call is the expensive output generator
- pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
- pc->wbuf_completed = 0;
- pc->wbuf_remaining = wbuf.size;
- pc->wbuf_current = wbuf.start;
- pc->wbuf_valid = true;
- }
+ // next connection_driver call is the expensive output generator
+ pn_bytes_t bytes = pn_connection_driver_write_buffer(&pc->driver);
+ set_wbuf(pc, bytes.start, bytes.size);
+ if (bytes.size == 0)
+ pc->output_drained = true;
}
// Call with lock held or from forced_shutdown
@@ -1260,9 +1247,8 @@
lock(&p->sched_mutex);
idle_threads = (p->suspend_list_head != NULL);
unlock(&p->sched_mutex);
- if (idle_threads) {
+ if (idle_threads && !pc->write_blocked && !pc->read_blocked) {
write_flush(pc); // May generate transport event
- pc->read_blocked = pc->write_blocked = false;
pconnection_process(pc, 0, false, false, true);
e = pn_connection_driver_next_event(&pc->driver);
}
@@ -1275,7 +1261,7 @@
}
}
}
- if (e) invalidate_wbuf(pc);
+ if (e) pc->output_drained = false;
return e;
}
@@ -1299,7 +1285,6 @@
close/shutdown. Let read()/write() return 0 or -1 to trigger cleanup logic.
*/
static bool pconnection_rearm_check(pconnection_t *pc) {
- assert(pc->wbuf_valid);
if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
return false;
}
@@ -1352,7 +1337,6 @@
/* Call with context lock and having done a write_flush() to "know" the value of wbuf_remaining */
static inline bool pconnection_work_pending(pconnection_t *pc) {
- assert(pc->wbuf_valid);
if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
return true;
if (!pc->read_blocked && !pconnection_rclosed(pc))
@@ -1409,19 +1393,26 @@
}
// Return true unless error
- static bool pconnection_write(pconnection_t *pc) {
+static bool pconnection_write(pconnection_t *pc) {
size_t wbuf_size = pc->wbuf_remaining;
ssize_t n = send(pc->psocket.sockfd, pc->wbuf_current, wbuf_size, MSG_NOSIGNAL);
if (n > 0) {
pc->wbuf_completed += n;
pc->wbuf_remaining -= n;
pc->io_doublecheck = false;
- if (pc->wbuf_remaining)
+ if (pc->wbuf_remaining) {
pc->write_blocked = true;
+ pc->wbuf_current += n;
+ }
else {
- // No need to aggregate multiple writes
+ // write_done also calls pn_transport_pending(), so the transport knows all current output
pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed);
pc->wbuf_completed = 0;
+ pn_transport_t *t = pc->driver.transport;
+ set_wbuf(pc, t->output_buf, t->output_pending);
+ if (t->output_pending == 0)
+ pc->output_drained = true;
+ // TODO: revise transport API to allow similar efficient access to transport output
}
} else if (errno == EWOULDBLOCK) {
pc->write_blocked = true;
@@ -1433,12 +1424,29 @@
// Never call with any locks held.
static void write_flush(pconnection_t *pc) {
- ensure_wbuf(pc);
- if (!pc->write_blocked && !pconnection_wclosed(pc)) {
+ size_t prev_wbuf_remaining = 0;
+
+ while(!pc->write_blocked && !pc->output_drained && !pconnection_wclosed(pc)) {
+ if (pc->wbuf_remaining == 0) {
+ ensure_wbuf(pc);
+ if (pc->wbuf_remaining == 0)
+ pc->output_drained = true;
+ } else {
+ // Check if we are doing multiple small writes in a row, possibly worth growing the transport output buffer.
+ if (prev_wbuf_remaining
+ && prev_wbuf_remaining == pc->wbuf_remaining // two max outputs in a row
+ && pc->wbuf_remaining < 131072) {
+ ensure_wbuf(pc); // second call -> unchanged wbuf or transport buffer size doubles and more bytes added
+ prev_wbuf_remaining = 0;
+ } else {
+ prev_wbuf_remaining = pc->wbuf_remaining;
+ }
+ }
if (pc->wbuf_remaining > 0) {
if (!pconnection_write(pc)) {
psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to");
}
+ // pconnection_write side effect: wbuf may be replenished, and if not, output_drained may be set.
}
else {
if (pn_connection_driver_write_closed(&pc->driver)) {
@@ -1458,6 +1466,7 @@
bool timer_fired = false;
bool waking = false;
bool tick_required = false;
+ bool immediate_write = false;
// Don't touch data exclusive to working thread (yet).
if (timeout) {
@@ -1533,8 +1542,11 @@
pconnection_maybe_connect_lh(pc);
else
pconnection_connected_lh(pc); /* Non error event means we are connected */
- if (update_events & EPOLLOUT)
+ if (update_events & EPOLLOUT) {
pc->write_blocked = false;
+ if (pc->wbuf_remaining > 0)
+ immediate_write = true;
+ }
if (update_events & EPOLLIN)
pc->read_blocked = false;
}
@@ -1555,8 +1567,10 @@
waking = false;
}
- // read... tick... write
- // perhaps should be: write_if_recent_EPOLLOUT... read... tick... write
+ if (immediate_write) {
+ immediate_write = false;
+ write_flush(pc);
+ }
if (!pconnection_rclosed(pc)) {
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
@@ -1564,7 +1578,7 @@
ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size);
if (n > 0) {
pn_connection_driver_read_done(&pc->driver, n);
- invalidate_wbuf(pc);
+ pc->output_drained = false;
pconnection_tick(pc); /* check for tick changes. */
tick_required = false;
pc->io_doublecheck = false;
@@ -1585,7 +1599,7 @@
if (tick_required) {
pconnection_tick(pc); /* check for tick changes. */
tick_required = false;
- invalidate_wbuf(pc);
+ pc->output_drained = false;
}
if (topup) {
@@ -1594,7 +1608,7 @@
}
if (pconnection_has_event(pc)) {
- invalidate_wbuf(pc);
+ pc->output_drained = false;
return &pc->batch;
}
@@ -3010,32 +3024,7 @@
pn_proactor_t *bp = batch_proactor(batch);
if (bp == p) {
bool notify = false;
- bool rearm_interrupt = false;
lock(&p->context.mutex);
- lock(&p->sched_mutex);
-
- bool timeout = p->sched_timeout;
- if (timeout) p->sched_timeout = false;
- bool intr = p->sched_interrupt;
- if (intr) {
- p->sched_interrupt = false;
- rearm_interrupt = true;
- p->need_interrupt = true;
- }
- if (p->context.sched_wake) {
- p->context.sched_wake = false;
- wake_done(&p->context);
- }
-
- // ptimer_callback is slow. Revisit timer cancel code in light of change to single poller thread.
- bool timer_fired = timeout && ptimer_callback(&p->timer) != 0;
- if (timeout) {
- p->timer_armed = false;
- if (timer_fired && p->timeout_set) {
- p->need_timeout = true;
- }
- }
-
bool rearm_timer = !p->timer_armed && !p->shutting_down;
p->timer_armed = true;
p->context.working = false;
@@ -3048,19 +3037,18 @@
if (proactor_has_event(p))
if (wake(&p->context))
notify = true;
+
+ lock(&p->sched_mutex);
tslot_t *ts = p->context.runner;
if (unassign_thread(ts, UNUSED))
notify = true;
unlock(&p->sched_mutex);
unlock(&p->context.mutex);
+
if (notify)
wake_notify(&p->context);
if (rearm_timer)
rearm(p, &p->timer.epoll_io);
- if (rearm_interrupt) {
- (void)read_uint64(p->interruptfd);
- rearm(p, &p->epoll_interrupt);
- }
check_earmark_override(p, ts);
return;
}