DISPATCH-1968: Avoid proton calls on a closed raw connections

 * Do not write new buffers if connection is CLOSED_WRITE
 * Do not call connection_wake if CLOSED_READ or CLOSED_WRITE

This closes #1047
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 6cc8855..69bed1a 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -56,6 +56,8 @@
     bool                  egress_dispatcher;
     bool                  connector_closed;//only used if egress_dispatcher=true
     bool                  in_list;         // This connection is in the adaptor's connections list
+    bool                  raw_closed_read;
+    bool                  raw_closed_write;
     qdr_delivery_t       *initial_delivery;
     qd_timer_t           *activate_timer;
     qd_bridge_config_t    config;
@@ -122,6 +124,9 @@
 
 static void grant_read_buffers(qdr_tcp_connection_t *conn)
 {
+    if (conn->raw_closed_read)
+        return;
+
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
     // Give proactor more read buffers for the socket
     if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
@@ -231,6 +236,31 @@
     return count;
 }
 
+
+static void flush_outgoing_buffs(qdr_tcp_connection_t *conn)
+{
+    // Flush buffers staged for writing to raw conn
+    // and free possible references to stream data objects.
+    if (conn->outgoing_buff_count > 0) {
+        for (size_t i = conn->outgoing_buff_idx;
+            i < conn->outgoing_buff_idx + conn->outgoing_buff_count;
+            ++i) {
+            if (conn->outgoing_buffs[i].context) {
+                qd_message_stream_data_release(
+                    (qd_message_stream_data_t*)conn->outgoing_buffs[i].context);
+            }
+        }
+    }
+    conn->outgoing_buff_count = 0;
+
+    // Flush in-progress stream data object
+    if (conn->outgoing_stream_data) {
+        free_qd_message_stream_data_t(conn->outgoing_stream_data);
+        conn->outgoing_stream_data = 0;
+    }
+}
+
+
 static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc)
 {
     qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc);
@@ -240,9 +270,7 @@
     if (tc->activate_timer) {
         qd_timer_free(tc->activate_timer);
     }
-    if (tc->outgoing_stream_data) {
-        free_qd_message_stream_data_t(tc->outgoing_stream_data);
-    }
+    flush_outgoing_buffs(tc);
     sys_mutex_free(tc->activation_lock);
     //proactor will free the socket
     free_qdr_tcp_connection_t(tc);
@@ -348,6 +376,7 @@
     return used;
 }
 
+
 static bool write_outgoing_buffs(qdr_tcp_connection_t *conn)
 {
     // Send the outgoing buffs to pn_raw_conn.
@@ -383,6 +412,12 @@
 static void handle_outgoing(qdr_tcp_connection_t *conn)
 {
     if (conn->outstream) {
+        if (conn->raw_closed_write) {
+            // flush outgoing buffers and free attached stream_data objects
+            flush_outgoing_buffs(conn);
+            // give no more buffers to raw connection
+            return;
+        }
         qd_message_t *msg = qdr_delivery_message(conn->outstream);
         bool read_more_body = true;
 
@@ -534,11 +569,13 @@
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
+        conn->raw_closed_read = true;
         pn_raw_connection_close(conn->pn_raw_conn);
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
+        conn->raw_closed_write = true;
         pn_raw_connection_close(conn->pn_raw_conn);
         break;
     }
@@ -1186,7 +1223,7 @@
     if (context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
         sys_mutex_lock(conn->activation_lock);
-        if (conn->pn_raw_conn) {
+        if (conn->pn_raw_conn && !(conn->raw_closed_read || conn->raw_closed_write)) {
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: waking raw connection", conn->conn_id);
             pn_raw_connection_wake(conn->pn_raw_conn);
             sys_mutex_unlock(conn->activation_lock);