PROTON-2338: Implement proactor raw connection half close operations
The raw connection state machine has been rejigged to encompass half closed
states. The previous separate read/write state booleans have been
combined into a single state. This should allow for clearer connection
state tracking.
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 8722ff0..88410e3 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -272,14 +272,33 @@
if (notify) notify_poller(prc->task.proactor);
}
-void pn_raw_connection_close(pn_raw_connection_t *rc) {
+static inline void set_closed(pn_raw_connection_t *rc)
+{
praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
lock(&prc->task.mutex);
prc->task.closing = true;
unlock(&prc->task.mutex);
+}
+
+void pn_raw_connection_close(pn_raw_connection_t *rc) {
+ set_closed(rc);
pni_raw_close(rc);
}
+void pn_raw_connection_read_close(pn_raw_connection_t *rc) {
+ if (pn_raw_connection_is_write_closed(rc)) {
+ set_closed(rc);
+ }
+ pni_raw_read_close(rc);
+}
+
+void pn_raw_connection_write_close(pn_raw_connection_t *rc) {
+ if (pn_raw_connection_is_read_closed(rc)) {
+ set_closed(rc);
+ }
+ pni_raw_write_close(rc);
+}
+
static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
praw_connection_t *rc = containerof(batch, praw_connection_t, batch);
pn_raw_connection_t *raw = &rc->raw_connection;
@@ -318,6 +337,14 @@
return recv(fd, b, s, MSG_DONTWAIT);
}
+static int shutr(int fd) {
+ return shutdown(fd, SHUT_RD);
+}
+
+static int shutw(int fd) {
+ return shutdown(fd, SHUT_WR);
+}
+
static void set_error(pn_raw_connection_t *conn, const char *msg, int err) {
psocket_error(containerof(conn, praw_connection_t, raw_connection), err, msg);
}
@@ -372,6 +399,8 @@
unlock(&rc->task.mutex);
pn_raw_connection_t *raw = &rc->raw_connection;
+ int fd = rc->psocket.epoll_io.fd;
+ pni_raw_process_shutdown(raw, fd, shutr, shutw);
int wanted =
(pni_raw_can_read(raw) ? EPOLLIN : 0) |
(pni_raw_can_write(raw) ? EPOLLOUT : 0);
@@ -379,7 +408,7 @@
rc->psocket.epoll_io.wanted = wanted;
rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error
} else {
- bool finished_disconnect = raw->rclosed && raw->wclosed && !ready && !raw->disconnectpending;
+ bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending;
if (finished_disconnect) {
// If we're closed and we've sent the disconnect then close
pni_raw_finalize(raw);
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index 2e183a0..beda92d 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -39,6 +39,29 @@
buff_written = 6
} buff_type;
+/*
+ * r = read, w = write
+ * init = initial
+ * o = open - can read/write
+ * d = draining - write draining
+ * c = closing - shutdown pending
+ * s = stopped (closed)
+ */
+typedef enum {
+ conn_wrong = -1,
+ conn_init = 0,
+ conn_ro_wo = 1,
+ conn_ro_wd = 2,
+ conn_ro_wc = 3,
+ conn_ro_ws = 4,
+ conn_rc_wo = 5,
+ conn_rc_wd = 6,
+ conn_rs_wo = 7,
+ conn_rs_wd = 8,
+ conn_rs_ws = 9,
+ conn_fini = 10,
+} raw_conn_state;
+
typedef uint16_t buff_ptr; // This is always the index+1 so that 0 can be special
typedef struct pbuffer_t {
@@ -72,13 +95,13 @@
buff_ptr wbuffer_last_towrite;
buff_ptr wbuffer_first_written;
buff_ptr wbuffer_last_written;
- bool rneedbufferevent;
- bool wneedbufferevent;
+
+ uint8_t state; // really raw_conn_state
+ bool rrequestedbuffers;
+ bool wrequestedbuffers;
+
bool rpending;
bool wpending;
- bool rclosed;
- bool wdraining;
- bool wclosed;
bool rclosedpending;
bool wclosedpending;
bool rdrainpending;
@@ -95,8 +118,11 @@
void pni_raw_connect_failed(pn_raw_connection_t *conn);
void pni_raw_wake(pn_raw_connection_t *conn);
void pni_raw_close(pn_raw_connection_t *conn);
+void pni_raw_read_close(pn_raw_connection_t *conn);
+void pni_raw_write_close(pn_raw_connection_t *conn);
void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
+void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int));
bool pni_raw_can_read(pn_raw_connection_t *conn);
bool pni_raw_can_write(pn_raw_connection_t *conn);
pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn);
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index c88f602..a819b5b 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -62,6 +62,142 @@
conn->rbuffer_first_empty = 1;
conn->wbuffer_first_empty = 1;
+
+ conn->state = conn_init;
+}
+
+typedef enum {
+ conn_connected = 0,
+ conn_read_closed = 1,
+ conn_write_closed = 2,
+ conn_write_drained = 3,
+ int_read_shutdown = 4,
+ int_write_shutdown = 5,
+ int_disconnect = 6,
+ api_read_close = 7,
+ api_write_close = 8,
+} raw_event;
+
+/*
+ * There's a little trick in this state table - both conn_init - the initial state and the self transition are represented by
+ * 0. This is because no state ever transitions to the initial state.
+ */
+const uint8_t state_transitions[][9] = {
+ /* State\event cconnected, crclosed, cwclosed, cwdrained, irshutdown, iwshutdown, idsconnect, arclose, awclose */
+ [conn_init] = {conn_ro_wo, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_rc_wo, conn_ro_wd},
+ [conn_ro_wo] = {conn_wrong, conn_rc_wo, conn_ro_wc, 0, conn_wrong, conn_wrong, conn_wrong, conn_rc_wo, conn_ro_wd},
+ [conn_ro_wd] = {conn_wrong, conn_rc_wd, conn_ro_wc, conn_ro_wc, conn_wrong, conn_wrong, conn_wrong, conn_rc_wd, 0},
+ [conn_ro_wc] = {conn_wrong, conn_rs_ws, 0, conn_wrong, conn_wrong, conn_ro_ws, conn_wrong, conn_rs_ws, 0},
+ [conn_ro_ws] = {conn_wrong, conn_rs_ws, 0, 0, conn_wrong, conn_wrong, conn_wrong, conn_rs_ws, 0},
+ [conn_rc_wo] = {conn_wrong, 0, conn_rs_ws, 0, conn_rs_wo, conn_wrong, conn_wrong, 0, conn_rc_wd},
+ [conn_rc_wd] = {conn_wrong, 0, conn_rs_ws, conn_rs_ws, conn_rs_wd, conn_wrong, conn_wrong, 0, 0},
+ [conn_rs_wo] = {conn_wrong, 0, conn_rs_ws, 0, conn_wrong, conn_wrong, conn_wrong, 0, conn_rs_wd},
+ [conn_rs_wd] = {conn_wrong, 0, conn_rs_ws, conn_rs_ws, conn_wrong, conn_wrong, conn_wrong, 0, 0},
+ [conn_rs_ws] = {conn_wrong, 0, 0, conn_wrong, conn_wrong, conn_wrong, conn_fini, 0, 0},
+ [conn_fini] = {conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, 0, 0, 0},
+};
+
+static inline uint8_t pni_raw_new_state(pn_raw_connection_t *conn, raw_event event) {
+ uint8_t old_state = conn->state;
+ uint8_t new_state = state_transitions[old_state][event];
+ assert(new_state != (uint8_t)conn_wrong);
+ return new_state == 0 ? old_state : new_state;
+}
+
+static inline bool pni_raw_ropen(pn_raw_connection_t *conn) {
+ switch (conn->state) {
+ case conn_ro_wo:
+ case conn_ro_wd:
+ case conn_ro_wc:
+ case conn_ro_ws:
+ return true;
+ default:
+ return false;
+ }
+}
+
+static inline bool pni_raw_rclosed(pn_raw_connection_t *conn) {
+ switch (conn->state) {
+ case conn_rc_wo:
+ case conn_rc_wd:
+ case conn_rs_wo:
+ case conn_rs_wd:
+ case conn_rs_ws:
+ case conn_fini :
+ return true;
+ default:
+ return false;
+ }
+}
+
+static inline bool pni_raw_rclosing(pn_raw_connection_t *conn) {
+ switch (conn->state) {
+ case conn_rc_wo:
+ case conn_rc_wd:
+ return true;
+ default:
+ return false;
+ }
+}
+
+static inline bool pni_raw_wopen(pn_raw_connection_t *conn) {
+ switch (conn->state) {
+ case conn_ro_wo:
+ case conn_rc_wo:
+ case conn_rs_wo:
+ return true;
+ default:
+ return false;
+ }
+}
+
+// Return whether closed for writing by application (name a little confusing!)
+// NB Initial state is NOT closed for writing - app can write before connecting
+static inline bool pni_raw_wclosed(pn_raw_connection_t *conn) {
+ switch (conn->state) {
+ case conn_ro_wd:
+ case conn_ro_wc:
+ case conn_ro_ws:
+ case conn_rc_wd:
+ case conn_rs_wd:
+ case conn_rs_ws:
+ case conn_fini :
+ return true;
+ default:
+ return false;
+ }
+}
+
+// Return whether closed and all writes drained
+static inline bool pni_raw_wdrained(pn_raw_connection_t *conn) {
+ switch (conn->state) {
+ case conn_ro_wc:
+ case conn_ro_ws:
+ case conn_rs_ws:
+ case conn_fini :
+ return true;
+ default:
+ return false;
+ }
+}
+
+static inline bool pni_raw_wclosing(pn_raw_connection_t *conn) {
+ switch (conn->state) {
+ case conn_ro_wc:
+ return true;
+ default:
+ return false;
+ }
+}
+
+static inline bool pni_raw_rwclosed(pn_raw_connection_t *conn) {
+ switch (conn->state) {
+ case conn_rs_ws:
+ case conn_fini :
+ return true;
+ default:
+ return false;
+ }
}
bool pni_raw_validate(pn_raw_connection_t *conn) {
@@ -121,12 +257,14 @@
size_t pn_raw_connection_read_buffers_capacity(pn_raw_connection_t *conn) {
assert(conn);
- return read_buffer_count - conn->rbuffer_count;
+ bool rclosed = pni_raw_rclosed(conn);
+ return rclosed ? 0 : (read_buffer_count - conn->rbuffer_count);
}
size_t pn_raw_connection_write_buffers_capacity(pn_raw_connection_t *conn) {
assert(conn);
- return write_buffer_count-conn->wbuffer_count;
+ bool wclosed = pni_raw_wclosed(conn);
+ return wclosed ? 0 : (write_buffer_count-conn->wbuffer_count);
}
size_t pn_raw_connection_give_read_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t const *buffers, size_t num) {
@@ -160,7 +298,7 @@
conn->rbuffer_first_empty = current;
conn->rbuffer_count += can_take;
- conn->rneedbufferevent = false;
+ conn->rrequestedbuffers = false;
return can_take;
}
@@ -232,7 +370,7 @@
conn->wbuffer_first_empty = current;
conn->wbuffer_count += can_take;
- conn->wneedbufferevent = false;
+ conn->wrequestedbuffers = false;
return can_take;
}
@@ -314,16 +452,17 @@
static inline void pni_raw_disconnect(pn_raw_connection_t *conn) {
pni_raw_release_buffers(conn);
conn->disconnectpending = true;
+ conn->state = pni_raw_new_state(conn, int_disconnect);
}
void pni_raw_connected(pn_raw_connection_t *conn) {
pn_condition_clear(conn->condition);
pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
+ conn->state = pni_raw_new_state(conn, conn_connected);
}
void pni_raw_connect_failed(pn_raw_connection_t *conn) {
- conn->rclosed = true;
- conn->wclosed = true;
+ conn->state = conn_fini;
pni_raw_disconnect(conn);
}
@@ -333,6 +472,9 @@
void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
assert(conn);
+
+ if (!pni_raw_ropen(conn)) return;
+
bool closed = false;
for(;conn->rbuffer_first_unused;) {
buff_ptr p = conn->rbuffer_first_unused;
@@ -384,14 +526,18 @@
if (conn->rbuffer_first_read && !conn->rpending) {
conn->rpending = true;
}
+
+ uint8_t old_state = conn->state;
+
// Socket closed for read
if (closed) {
- if (!conn->rclosed) {
- conn->rclosed = true;
- conn->rclosedpending = true;
- if (conn->wclosed) {
- pni_raw_disconnect(conn);
- }
+ conn->state = pni_raw_new_state(conn, conn_read_closed);
+ conn->rclosedpending = true;
+ }
+
+ if (conn->state != old_state) {
+ if (pni_raw_rwclosed(conn)) {
+ pni_raw_disconnect(conn);
}
}
return;
@@ -399,7 +545,11 @@
void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
assert(conn);
+
+ if (pni_raw_wdrained(conn)) return;
+
bool closed = false;
+ bool drained = false;
for(;conn->wbuffer_first_towrite;) {
buff_ptr p = conn->wbuffer_first_towrite;
assert(conn->wbuffers[p-1].type == buff_unwritten);
@@ -453,31 +603,52 @@
finished_writing:
if (!conn->wbuffer_first_towrite) {
conn->wbuffer_last_towrite = 0;
- closed = closed || conn->wdraining;
+ drained = true;
}
// Wrote something; end of stream; out of buffers; or blocked for write
if (conn->wbuffer_first_written && !conn->wpending) {
conn->wpending = true;
}
+
+ uint8_t old_state = conn->state;
+
+ // Drained all writes
+ if (drained) {
+ conn->state = pni_raw_new_state(conn, conn_write_drained);
+ }
+
// Socket closed for write
if (closed) {
- if (!conn->wclosed) {
- conn->wclosed = true;
- conn->wclosedpending = true;
- if (conn->rclosed) {
- pni_raw_disconnect(conn);;
- }
+ conn->state = pni_raw_new_state(conn, conn_write_closed);
+ conn->wclosedpending = true;
+ }
+
+ if (conn->state != old_state) {
+ if (pni_raw_rwclosed(conn)) {
+ pni_raw_disconnect(conn);
}
}
return;
}
+void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int)) {
+ assert(conn);
+ if (pni_raw_rclosing(conn)) {
+ shutdown_rd(sock);
+ conn->state = pni_raw_new_state(conn, int_read_shutdown);
+ }
+ if (pni_raw_wclosing(conn)) {
+ shutdown_wr(sock);
+ conn->state = pni_raw_new_state(conn, int_write_shutdown);
+ }
+}
+
bool pni_raw_can_read(pn_raw_connection_t *conn) {
- return !conn->rclosed && conn->rbuffer_first_unused;
+ return pni_raw_ropen(conn) && conn->rbuffer_first_unused;
}
bool pni_raw_can_write(pn_raw_connection_t *conn) {
- return !conn->wclosed && conn->wbuffer_first_towrite;
+ return !pni_raw_wdrained(conn) && conn->wbuffer_first_towrite;
}
pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
@@ -510,50 +681,88 @@
} else if (conn->disconnectpending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_DISCONNECTED);
conn->disconnectpending = false;
- } else if (!conn->wclosed && !conn->wbuffer_first_towrite && !conn->wneedbufferevent) {
+ } else if (!pni_raw_wdrained(conn) && !conn->wbuffer_first_towrite && !conn->wrequestedbuffers) {
// Ran out of write buffers
pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
- conn->wneedbufferevent = true;
- } else if (!conn->rclosed && !conn->rbuffer_first_unused && !conn->rneedbufferevent) {
+ conn->wrequestedbuffers = true;
+ } else if (!pni_raw_rclosed(conn) && !conn->rbuffer_first_unused && !conn->rrequestedbuffers) {
// Ran out of read buffers
pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
- conn->rneedbufferevent = true;
+ conn->rrequestedbuffers = true;
} else {
return NULL;
}
} while (true);
}
-void pni_raw_close(pn_raw_connection_t *conn) {
- // TODO: Do we need different flags here?
- // TODO: What is the precise semantics for close?
- bool rclosed = conn->rclosed;
- if (!rclosed) {
- conn->rclosed = true;
+void pni_raw_read_close(pn_raw_connection_t *conn) {
+ // If already fully closed nothing to do
+ if (pni_raw_rwclosed(conn)) {
+ return;
+ }
+ if (!pni_raw_rclosed(conn)) {
conn->rclosedpending = true;
}
- bool wclosed = conn->wclosed;
- if (!wclosed) {
- if (conn->wbuffer_first_towrite) {
- conn->wdraining = true;
- } else {
- conn->wclosed = true;
+ conn->state = pni_raw_new_state(conn, api_read_close);
+ if (pni_raw_rwclosed(conn)) {
+ pni_raw_disconnect(conn);
+ }
+}
+
+void pni_raw_write_close(pn_raw_connection_t *conn) {
+ // If already fully closed nothing to do
+ if (pni_raw_rwclosed(conn)) {
+ return;
+ }
+
+ if (!pni_raw_wdrained(conn)) {
+ if (!pni_raw_wclosed(conn)) {
conn->wclosedpending = true;
}
+ conn->state = pni_raw_new_state(conn, api_write_close);
+
+ if (!conn->wbuffer_first_towrite) {
+ conn->state = pni_raw_new_state(conn, conn_write_drained);
+ }
}
- if ((!rclosed || !wclosed) && !conn->wdraining) {
+ if (pni_raw_rwclosed(conn)) {
+ pni_raw_disconnect(conn);
+ }
+}
+
+void pni_raw_close(pn_raw_connection_t *conn) {
+ // If already fully closed nothing to do
+ if (pni_raw_rwclosed(conn)) {
+ return;
+ }
+ if (!pni_raw_rclosed(conn)) {
+ conn->rclosedpending = true;
+ }
+ conn->state = pni_raw_new_state(conn, api_read_close);
+
+ if (!pni_raw_wdrained(conn)) {
+ if (!pni_raw_wclosed(conn)) {
+ conn->wclosedpending = true;
+ }
+ conn->state = pni_raw_new_state(conn, api_write_close);
+
+ if (!conn->wbuffer_first_towrite) {
+ conn->state = pni_raw_new_state(conn, conn_write_drained);
+ }
+ }
+ if (pni_raw_rwclosed(conn)) {
pni_raw_disconnect(conn);
}
}
bool pn_raw_connection_is_read_closed(pn_raw_connection_t *conn) {
assert(conn);
- return conn->rclosed;
+ return pni_raw_rclosed(conn);
}
bool pn_raw_connection_is_write_closed(pn_raw_connection_t *conn) {
assert(conn);
- return conn->wclosed || conn->wdraining;
+ return pni_raw_wclosed(conn);
}
pn_condition_t *pn_raw_connection_condition(pn_raw_connection_t *conn) {