PROTON-2338: Implement proactor raw connection half close operations

The raw connection state machine has been rejigged to encompass half closed
states. The previous separate read/write state booleans have been
combined into a single state. This should allow for clearer connection
state tracking.
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 8722ff0..88410e3 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -272,14 +272,33 @@
   if (notify) notify_poller(prc->task.proactor);
 }
 
-void pn_raw_connection_close(pn_raw_connection_t *rc) {
+static inline void set_closed(pn_raw_connection_t *rc)
+{
   praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
   lock(&prc->task.mutex);
   prc->task.closing = true;
   unlock(&prc->task.mutex);
+}
+
+void pn_raw_connection_close(pn_raw_connection_t *rc) {
+  set_closed(rc);
   pni_raw_close(rc);
 }
 
+void pn_raw_connection_read_close(pn_raw_connection_t *rc) {
+  if (pn_raw_connection_is_write_closed(rc)) {
+    set_closed(rc);
+  }
+  pni_raw_read_close(rc);
+}
+
+void pn_raw_connection_write_close(pn_raw_connection_t *rc) {
+  if (pn_raw_connection_is_read_closed(rc)) {
+    set_closed(rc);
+  }
+  pni_raw_write_close(rc);
+}
+
 static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
   praw_connection_t *rc = containerof(batch, praw_connection_t, batch);
   pn_raw_connection_t *raw = &rc->raw_connection;
@@ -318,6 +337,14 @@
   return recv(fd, b, s, MSG_DONTWAIT);
 }
 
+static int shutr(int fd) {
+  return shutdown(fd, SHUT_RD);
+}
+
+static int shutw(int fd) {
+  return shutdown(fd, SHUT_WR);
+}
+
 static void  set_error(pn_raw_connection_t *conn, const char *msg, int err) {
   psocket_error(containerof(conn, praw_connection_t, raw_connection), err, msg);
 }
@@ -372,6 +399,8 @@
   unlock(&rc->task.mutex);
 
   pn_raw_connection_t *raw = &rc->raw_connection;
+  int fd = rc->psocket.epoll_io.fd;
+  pni_raw_process_shutdown(raw, fd, shutr, shutw);
   int wanted =
     (pni_raw_can_read(raw)  ? EPOLLIN : 0) |
     (pni_raw_can_write(raw) ? EPOLLOUT : 0);
@@ -379,7 +408,7 @@
     rc->psocket.epoll_io.wanted = wanted;
     rearm_polling(&rc->psocket.epoll_io, p->epollfd);  // TODO: check for error
   } else {
-    bool finished_disconnect = raw->rclosed && raw->wclosed && !ready && !raw->disconnectpending;
+    bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending;
     if (finished_disconnect) {
       // If we're closed and we've sent the disconnect then close
       pni_raw_finalize(raw);
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index 2e183a0..beda92d 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -39,6 +39,29 @@
   buff_written   = 6
 } buff_type;
 
+/*
+ * r = read, w = write
+ * init = initial
+ * o = open - can read/write
+ * d = draining - write draining
+ * c = closing - shutdown pending
+ * s = stopped (closed)
+ */
+typedef enum {
+  conn_wrong = -1,
+  conn_init  = 0,
+  conn_ro_wo = 1,
+  conn_ro_wd = 2,
+  conn_ro_wc = 3,
+  conn_ro_ws = 4,
+  conn_rc_wo = 5,
+  conn_rc_wd = 6,
+  conn_rs_wo = 7,
+  conn_rs_wd = 8,
+  conn_rs_ws = 9,
+  conn_fini  = 10,
+} raw_conn_state;
+
 typedef uint16_t buff_ptr; // This is always the index+1 so that 0 can be special
 
 typedef struct pbuffer_t {
@@ -72,13 +95,13 @@
   buff_ptr wbuffer_last_towrite;
   buff_ptr wbuffer_first_written;
   buff_ptr wbuffer_last_written;
-  bool rneedbufferevent;
-  bool wneedbufferevent;
+
+  uint8_t state; // really raw_conn_state
+  bool rrequestedbuffers;
+  bool wrequestedbuffers;
+
   bool rpending;
   bool wpending;
-  bool rclosed;
-  bool wdraining;
-  bool wclosed;
   bool rclosedpending;
   bool wclosedpending;
   bool rdrainpending;
@@ -95,8 +118,11 @@
 void pni_raw_connect_failed(pn_raw_connection_t *conn);
 void pni_raw_wake(pn_raw_connection_t *conn);
 void pni_raw_close(pn_raw_connection_t *conn);
+void pni_raw_read_close(pn_raw_connection_t *conn);
+void pni_raw_write_close(pn_raw_connection_t *conn);
 void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
 void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
+void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int));
 bool pni_raw_can_read(pn_raw_connection_t *conn);
 bool pni_raw_can_write(pn_raw_connection_t *conn);
 pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn);
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index c88f602..a819b5b 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -62,6 +62,142 @@
 
   conn->rbuffer_first_empty = 1;
   conn->wbuffer_first_empty = 1;
+
+  conn->state = conn_init;
+}
+
+typedef enum {
+  conn_connected     = 0,
+  conn_read_closed   = 1,
+  conn_write_closed  = 2,
+  conn_write_drained = 3,
+  int_read_shutdown  = 4,
+  int_write_shutdown = 5,
+  int_disconnect     = 6,
+  api_read_close     = 7,
+  api_write_close    = 8,
+} raw_event;
+
+/*
+ * There's a little trick in this state table - both conn_init - the initial state and the self transition are represented by
+ * 0. This is because no state ever transitions to the initial state.
+ */
+const uint8_t state_transitions[][9] = {
+  /* State\event  cconnected, crclosed,   cwclosed,   cwdrained,  irshutdown, iwshutdown, idsconnect, arclose,    awclose   */
+  [conn_init]  = {conn_ro_wo, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_rc_wo, conn_ro_wd},
+  [conn_ro_wo] = {conn_wrong, conn_rc_wo, conn_ro_wc, 0,          conn_wrong, conn_wrong, conn_wrong, conn_rc_wo, conn_ro_wd},
+  [conn_ro_wd] = {conn_wrong, conn_rc_wd, conn_ro_wc, conn_ro_wc, conn_wrong, conn_wrong, conn_wrong, conn_rc_wd, 0},
+  [conn_ro_wc] = {conn_wrong, conn_rs_ws, 0,          conn_wrong, conn_wrong, conn_ro_ws, conn_wrong, conn_rs_ws, 0},
+  [conn_ro_ws] = {conn_wrong, conn_rs_ws, 0,          0,          conn_wrong, conn_wrong, conn_wrong, conn_rs_ws, 0},
+  [conn_rc_wo] = {conn_wrong, 0,          conn_rs_ws, 0,          conn_rs_wo, conn_wrong, conn_wrong, 0,          conn_rc_wd},
+  [conn_rc_wd] = {conn_wrong, 0,          conn_rs_ws, conn_rs_ws, conn_rs_wd, conn_wrong, conn_wrong, 0,          0},
+  [conn_rs_wo] = {conn_wrong, 0,          conn_rs_ws, 0,          conn_wrong, conn_wrong, conn_wrong, 0,          conn_rs_wd},
+  [conn_rs_wd] = {conn_wrong, 0,          conn_rs_ws, conn_rs_ws, conn_wrong, conn_wrong, conn_wrong, 0,          0},
+  [conn_rs_ws] = {conn_wrong, 0,          0,          conn_wrong, conn_wrong, conn_wrong, conn_fini,  0,          0},
+  [conn_fini]  = {conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, 0,          0,          0},
+};
+
+static inline uint8_t pni_raw_new_state(pn_raw_connection_t *conn, raw_event event) {
+  uint8_t old_state = conn->state;
+  uint8_t new_state = state_transitions[old_state][event];
+  assert(new_state != (uint8_t)conn_wrong);
+  return new_state == 0 ? old_state : new_state;
+}
+
+static inline bool pni_raw_ropen(pn_raw_connection_t *conn) {
+  switch (conn->state) {
+    case conn_ro_wo:
+    case conn_ro_wd:
+    case conn_ro_wc:
+    case conn_ro_ws:
+      return true;
+    default:
+      return false;
+  }
+}
+
+static inline bool pni_raw_rclosed(pn_raw_connection_t *conn) {
+  switch (conn->state) {
+    case conn_rc_wo:
+    case conn_rc_wd:
+    case conn_rs_wo:
+    case conn_rs_wd:
+    case conn_rs_ws:
+    case conn_fini :
+      return true;
+    default:
+      return false;
+  }
+}
+
+static inline bool pni_raw_rclosing(pn_raw_connection_t *conn) {
+  switch (conn->state) {
+    case conn_rc_wo:
+    case conn_rc_wd:
+      return true;
+    default:
+      return false;
+  }
+}
+
+static inline bool pni_raw_wopen(pn_raw_connection_t *conn) {
+  switch (conn->state) {
+    case conn_ro_wo:
+    case conn_rc_wo:
+    case conn_rs_wo:
+      return true;
+    default:
+      return false;
+  }
+}
+
+// Return whether closed for writing by application (name a little confusing!)
+// NB Initial state is NOT closed for writing - app can write before connecting
+static inline bool pni_raw_wclosed(pn_raw_connection_t *conn) {
+  switch (conn->state) {
+    case conn_ro_wd:
+    case conn_ro_wc:
+    case conn_ro_ws:
+    case conn_rc_wd:
+    case conn_rs_wd:
+    case conn_rs_ws:
+    case conn_fini :
+      return true;
+    default:
+      return false;
+  }
+}
+
+// Return whether closed and all writes drained
+static inline bool pni_raw_wdrained(pn_raw_connection_t *conn) {
+  switch (conn->state) {
+    case conn_ro_wc:
+    case conn_ro_ws:
+    case conn_rs_ws:
+    case conn_fini :
+      return true;
+    default:
+      return false;
+  }
+}
+
+static inline bool pni_raw_wclosing(pn_raw_connection_t *conn) {
+  switch (conn->state) {
+    case conn_ro_wc:
+      return true;
+    default:
+      return false;
+  }
+}
+
+static inline bool pni_raw_rwclosed(pn_raw_connection_t *conn) {
+  switch (conn->state) {
+    case conn_rs_ws:
+    case conn_fini :
+      return true;
+    default:
+      return false;
+  }
 }
 
 bool pni_raw_validate(pn_raw_connection_t *conn) {
@@ -121,12 +257,14 @@
 
 size_t pn_raw_connection_read_buffers_capacity(pn_raw_connection_t *conn) {
   assert(conn);
-  return read_buffer_count - conn->rbuffer_count;
+  bool rclosed = pni_raw_rclosed(conn);
+  return rclosed ? 0 : (read_buffer_count - conn->rbuffer_count);
 }
 
 size_t pn_raw_connection_write_buffers_capacity(pn_raw_connection_t *conn) {
   assert(conn);
-  return write_buffer_count-conn->wbuffer_count;
+  bool wclosed = pni_raw_wclosed(conn);
+  return wclosed ? 0 : (write_buffer_count-conn->wbuffer_count);
 }
 
 size_t pn_raw_connection_give_read_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t const *buffers, size_t num) {
@@ -160,7 +298,7 @@
   conn->rbuffer_first_empty = current;
 
   conn->rbuffer_count += can_take;
-  conn->rneedbufferevent = false;
+  conn->rrequestedbuffers = false;
   return can_take;
 }
 
@@ -232,7 +370,7 @@
   conn->wbuffer_first_empty = current;
 
   conn->wbuffer_count += can_take;
-  conn->wneedbufferevent = false;
+  conn->wrequestedbuffers = false;
   return can_take;
 }
 
@@ -314,16 +452,17 @@
 static inline void pni_raw_disconnect(pn_raw_connection_t *conn) {
   pni_raw_release_buffers(conn);
   conn->disconnectpending = true;
+  conn->state = pni_raw_new_state(conn, int_disconnect);
 }
 
 void pni_raw_connected(pn_raw_connection_t *conn) {
   pn_condition_clear(conn->condition);
   pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
+  conn->state = pni_raw_new_state(conn, conn_connected);
 }
 
 void pni_raw_connect_failed(pn_raw_connection_t *conn) {
-  conn->rclosed = true;
-  conn->wclosed = true;
+  conn->state = conn_fini;
   pni_raw_disconnect(conn);
 }
 
@@ -333,6 +472,9 @@
 
 void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
   assert(conn);
+
+  if (!pni_raw_ropen(conn)) return;
+
   bool closed = false;
   for(;conn->rbuffer_first_unused;) {
     buff_ptr p = conn->rbuffer_first_unused;
@@ -384,14 +526,18 @@
   if (conn->rbuffer_first_read && !conn->rpending) {
     conn->rpending = true;
   }
+
+  uint8_t old_state = conn->state;
+
   // Socket closed for read
   if (closed) {
-    if (!conn->rclosed) {
-      conn->rclosed = true;
-      conn->rclosedpending = true;
-      if (conn->wclosed) {
-        pni_raw_disconnect(conn);
-      }
+    conn->state = pni_raw_new_state(conn, conn_read_closed);
+    conn->rclosedpending = true;
+  }
+
+  if (conn->state != old_state) {
+    if (pni_raw_rwclosed(conn)) {
+      pni_raw_disconnect(conn);
     }
   }
   return;
@@ -399,7 +545,11 @@
 
 void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
   assert(conn);
+
+  if (pni_raw_wdrained(conn)) return;
+
   bool closed = false;
+  bool drained = false;
   for(;conn->wbuffer_first_towrite;) {
     buff_ptr p = conn->wbuffer_first_towrite;
     assert(conn->wbuffers[p-1].type == buff_unwritten);
@@ -453,31 +603,52 @@
 finished_writing:
   if (!conn->wbuffer_first_towrite) {
     conn->wbuffer_last_towrite = 0;
-    closed = closed || conn->wdraining;
+    drained = true;
   }
   // Wrote something; end of stream; out of buffers; or blocked for write
   if (conn->wbuffer_first_written && !conn->wpending) {
     conn->wpending = true;
   }
+
+  uint8_t old_state = conn->state;
+
+  // Drained all writes
+  if (drained) {
+    conn->state = pni_raw_new_state(conn, conn_write_drained);
+  }
+
   // Socket closed for write
   if (closed) {
-    if (!conn->wclosed) {
-      conn->wclosed = true;
-      conn->wclosedpending = true;
-      if (conn->rclosed) {
-        pni_raw_disconnect(conn);;
-      }
+    conn->state = pni_raw_new_state(conn, conn_write_closed);
+    conn->wclosedpending = true;
+  }
+
+  if (conn->state != old_state) {
+    if (pni_raw_rwclosed(conn)) {
+      pni_raw_disconnect(conn);
     }
   }
   return;
 }
 
+void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int)) {
+  assert(conn);
+  if (pni_raw_rclosing(conn)) {
+    shutdown_rd(sock);
+    conn->state = pni_raw_new_state(conn, int_read_shutdown);
+  }
+  if (pni_raw_wclosing(conn)) {
+    shutdown_wr(sock);
+    conn->state = pni_raw_new_state(conn, int_write_shutdown);
+  }
+}
+
 bool pni_raw_can_read(pn_raw_connection_t *conn) {
-  return !conn->rclosed && conn->rbuffer_first_unused;
+  return pni_raw_ropen(conn) && conn->rbuffer_first_unused;
 }
 
 bool pni_raw_can_write(pn_raw_connection_t *conn) {
-  return !conn->wclosed && conn->wbuffer_first_towrite;
+  return !pni_raw_wdrained(conn) && conn->wbuffer_first_towrite;
 }
 
 pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
@@ -510,50 +681,88 @@
     } else if (conn->disconnectpending) {
       pni_raw_put_event(conn, PN_RAW_CONNECTION_DISCONNECTED);
       conn->disconnectpending = false;
-    } else if (!conn->wclosed && !conn->wbuffer_first_towrite && !conn->wneedbufferevent) {
+    } else if (!pni_raw_wdrained(conn) && !conn->wbuffer_first_towrite && !conn->wrequestedbuffers) {
       // Ran out of write buffers
       pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
-      conn->wneedbufferevent = true;
-    } else if (!conn->rclosed && !conn->rbuffer_first_unused && !conn->rneedbufferevent) {
+      conn->wrequestedbuffers = true;
+    } else if (!pni_raw_rclosed(conn) && !conn->rbuffer_first_unused && !conn->rrequestedbuffers) {
       // Ran out of read buffers
       pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
-      conn->rneedbufferevent = true;
+      conn->rrequestedbuffers = true;
     } else {
       return NULL;
     }
   } while (true);
 }
 
-void pni_raw_close(pn_raw_connection_t *conn) {
-  // TODO: Do we need different flags here?
-  // TODO: What is the precise semantics for close?
-  bool rclosed = conn->rclosed;
-  if (!rclosed) {
-    conn->rclosed = true;
+void pni_raw_read_close(pn_raw_connection_t *conn) {
+  // If already fully closed nothing to do
+  if (pni_raw_rwclosed(conn)) {
+    return;
+  }
+  if (!pni_raw_rclosed(conn)) {
     conn->rclosedpending = true;
   }
-  bool wclosed = conn->wclosed;
-  if (!wclosed) {
-    if (conn->wbuffer_first_towrite) {
-      conn->wdraining = true;
-    } else {
-      conn->wclosed = true;
+  conn->state = pni_raw_new_state(conn, api_read_close);
+  if (pni_raw_rwclosed(conn)) {
+    pni_raw_disconnect(conn);
+  }
+}
+
+void pni_raw_write_close(pn_raw_connection_t *conn) {
+  // If already fully closed nothing to do
+  if (pni_raw_rwclosed(conn)) {
+    return;
+  }
+
+  if (!pni_raw_wdrained(conn)) {
+    if (!pni_raw_wclosed(conn)) {
       conn->wclosedpending = true;
     }
+    conn->state = pni_raw_new_state(conn, api_write_close);
+
+    if (!conn->wbuffer_first_towrite) {
+      conn->state = pni_raw_new_state(conn, conn_write_drained);
+    }
   }
-  if ((!rclosed || !wclosed) && !conn->wdraining) {
+  if (pni_raw_rwclosed(conn)) {
+    pni_raw_disconnect(conn);
+  }
+}
+
+void pni_raw_close(pn_raw_connection_t *conn) {
+  // If already fully closed nothing to do
+  if (pni_raw_rwclosed(conn)) {
+    return;
+  }
+  if (!pni_raw_rclosed(conn)) {
+    conn->rclosedpending = true;
+  }
+  conn->state = pni_raw_new_state(conn, api_read_close);
+
+  if (!pni_raw_wdrained(conn)) {
+    if (!pni_raw_wclosed(conn)) {
+      conn->wclosedpending = true;
+    }
+    conn->state = pni_raw_new_state(conn, api_write_close);
+
+    if (!conn->wbuffer_first_towrite) {
+      conn->state = pni_raw_new_state(conn, conn_write_drained);
+    }
+  }
+  if (pni_raw_rwclosed(conn)) {
     pni_raw_disconnect(conn);
   }
 }
 
 bool pn_raw_connection_is_read_closed(pn_raw_connection_t *conn) {
   assert(conn);
-  return conn->rclosed;
+  return pni_raw_rclosed(conn);
 }
 
 bool pn_raw_connection_is_write_closed(pn_raw_connection_t *conn) {
   assert(conn);
-  return conn->wclosed || conn->wdraining;
+  return pni_raw_wclosed(conn);
 }
 
 pn_condition_t *pn_raw_connection_condition(pn_raw_connection_t *conn) {