DISPATCH-1968: Avoid proton calls on a closed raw connections
* Do not write new buffers if connection is CLOSED_WRITE
* Do not call connection_wake if CLOSED_READ or CLOSED_WRITE
This closes #1047
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 6cc8855..69bed1a 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -56,6 +56,8 @@
bool egress_dispatcher;
bool connector_closed;//only used if egress_dispatcher=true
bool in_list; // This connection is in the adaptor's connections list
+ bool raw_closed_read;
+ bool raw_closed_write;
qdr_delivery_t *initial_delivery;
qd_timer_t *activate_timer;
qd_bridge_config_t config;
@@ -122,6 +124,9 @@
static void grant_read_buffers(qdr_tcp_connection_t *conn)
{
+ if (conn->raw_closed_read)
+ return;
+
pn_raw_buffer_t raw_buffers[READ_BUFFERS];
// Give proactor more read buffers for the socket
if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
@@ -231,6 +236,31 @@
return count;
}
+
+static void flush_outgoing_buffs(qdr_tcp_connection_t *conn)
+{
+ // Flush buffers staged for writing to raw conn
+ // and free possible references to stream data objects.
+ if (conn->outgoing_buff_count > 0) {
+ for (size_t i = conn->outgoing_buff_idx;
+ i < conn->outgoing_buff_idx + conn->outgoing_buff_count;
+ ++i) {
+ if (conn->outgoing_buffs[i].context) {
+ qd_message_stream_data_release(
+ (qd_message_stream_data_t*)conn->outgoing_buffs[i].context);
+ }
+ }
+ }
+ conn->outgoing_buff_count = 0;
+
+ // Flush in-progress stream data object
+ if (conn->outgoing_stream_data) {
+ free_qd_message_stream_data_t(conn->outgoing_stream_data);
+ conn->outgoing_stream_data = 0;
+ }
+}
+
+
static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc)
{
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc);
@@ -240,9 +270,7 @@
if (tc->activate_timer) {
qd_timer_free(tc->activate_timer);
}
- if (tc->outgoing_stream_data) {
- free_qd_message_stream_data_t(tc->outgoing_stream_data);
- }
+ flush_outgoing_buffs(tc);
sys_mutex_free(tc->activation_lock);
//proactor will free the socket
free_qdr_tcp_connection_t(tc);
@@ -348,6 +376,7 @@
return used;
}
+
static bool write_outgoing_buffs(qdr_tcp_connection_t *conn)
{
// Send the outgoing buffs to pn_raw_conn.
@@ -383,6 +412,12 @@
static void handle_outgoing(qdr_tcp_connection_t *conn)
{
if (conn->outstream) {
+ if (conn->raw_closed_write) {
+ // flush outgoing buffers and free attached stream_data objects
+ flush_outgoing_buffs(conn);
+ // give no more buffers to raw connection
+ return;
+ }
qd_message_t *msg = qdr_delivery_message(conn->outstream);
bool read_more_body = true;
@@ -534,11 +569,13 @@
}
case PN_RAW_CONNECTION_CLOSED_READ: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
+ conn->raw_closed_read = true;
pn_raw_connection_close(conn->pn_raw_conn);
break;
}
case PN_RAW_CONNECTION_CLOSED_WRITE: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
+ conn->raw_closed_write = true;
pn_raw_connection_close(conn->pn_raw_conn);
break;
}
@@ -1186,7 +1223,7 @@
if (context) {
qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
sys_mutex_lock(conn->activation_lock);
- if (conn->pn_raw_conn) {
+ if (conn->pn_raw_conn && !(conn->raw_closed_read || conn->raw_closed_write)) {
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: waking raw connection", conn->conn_id);
pn_raw_connection_wake(conn->pn_raw_conn);
sys_mutex_unlock(conn->activation_lock);