DISPATCH-1960: Refactor Q2 flow control for protocol adaptor use.

This closes #1027
diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h
index e8722a5..d872ff4 100644
--- a/include/qpid/dispatch/alloc_pool.h
+++ b/include/qpid/dispatch/alloc_pool.h
@@ -80,7 +80,26 @@
 /** De-allocate from a thread pool. Use via ALLOC_DECLARE */
 void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p);
 uint32_t qd_alloc_sequence(void *p);
-static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { sp->ptr = 0; }
+
+// generic safe pointer api for any alloc pool item
+
+#define QD_SAFE_PTR_INIT(p) { .ptr = (void*)(p), .seq = qd_alloc_sequence(p) }
+
+static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp)
+{
+    sp->ptr = 0;
+}
+
+static inline void qd_alloc_set_safe_ptr(qd_alloc_safe_ptr_t *sp, void *p)
+{
+    sp->ptr = p;
+    sp->seq = qd_alloc_sequence(p);
+}
+
+static inline void *qd_alloc_deref_safe_ptr(const qd_alloc_safe_ptr_t *sp)
+{
+    return sp->seq == qd_alloc_sequence(sp->ptr) ? sp->ptr : (void*) 0;
+}
 
 /**
  * Declare functions new_T and alloc_T
@@ -102,8 +121,8 @@
     __thread qd_alloc_pool_t *__local_pool_##T = 0;                     \
     T *new_##T(void) { return (T*) qd_alloc(&__desc_##T, &__local_pool_##T); }  \
     void free_##T(T *p) { qd_dealloc(&__desc_##T, &__local_pool_##T, (char*) p); } \
-    void set_safe_ptr_##T(T *p, T##_sp *sp) { sp->ptr = (void*) p; sp->seq = qd_alloc_sequence((void*) p); } \
-    T *safe_deref_##T(T##_sp sp) { return sp.seq == qd_alloc_sequence((void*) sp.ptr) ? (T*) sp.ptr : (T*) 0; } \
+    void set_safe_ptr_##T(T *p, T##_sp *sp) { qd_alloc_set_safe_ptr(sp, (void*)p); } \
+    T *safe_deref_##T(T##_sp sp) { return (T*) qd_alloc_deref_safe_ptr((qd_alloc_safe_ptr_t*) &(sp)); } \
     qd_alloc_stats_t *alloc_stats_##T(void) { return __desc_##T.stats; } \
     void *unused##T
 
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index 08caba1..82d9ad8 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -228,7 +228,7 @@
 void qd_link_detach(qd_link_t *link);
 void qd_link_free(qd_link_t *link);
 void *qd_link_get_node_context(const qd_link_t *link);
-void qd_link_restart_rx(qd_link_t *link);
+void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context);
 void qd_link_q3_block(qd_link_t *link);
 void qd_link_q3_unblock(qd_link_t *link);
 uint64_t qd_link_link_id(const qd_link_t *link);
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 657477d..07e144a 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -253,10 +253,9 @@
  * @param msg A pointer to a message to be sent.
  * @param link The outgoing link on which to send the message.
  * @param strip_outbound_annotations [in] annotation control flag
- * @param restart_rx [out] indication to wake up receive process
  * @param q3_stalled [out] indicates that the link is stalled due to proton-buffer-full
  */
-void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *restart_rx, bool *q3_stalled);
+void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *q3_stalled);
 
 /**
  * Check that the message is well-formed up to a certain depth.  Any part of the message that is
@@ -304,9 +303,10 @@
  *
  * @param msg Pointer to a message
  * @param field A composed field to be appended to the end of the message's stream
+ * @param q2_blocked Set to true if this call caused Q2 to block
  * @return The number of buffers stored in the message's content
  */
-int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field);
+int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked);
 
 
 /**
@@ -404,9 +404,10 @@
  *
  * @param msg Pointer to message under construction
  * @param data List of buffers containing body data.
+ * @param qd_blocked Set to true if this call caused Q2 to block
  * @return The number of buffers stored in the message's content
  */
-int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data);
+int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data, bool *q2_blocked);
 
 
 /** Put string representation of a message suitable for logging in buffer.
@@ -556,12 +557,22 @@
  */
 bool qd_message_is_Q2_blocked(const qd_message_t *msg);
 
+
 /**
- * Return qd_link through which the message is being received.
- * @param msg A pointer to the message
- * @return the qd_link
+ * Register a callback that will be invoked when the message has exited the Q2
+ * blocking state. Note that the callback can be invoked on any I/O thread.
+ * The callback must be thread safe.
+ *
+ * @param msg The message to monitor.
+ * @param callback The method to invoke
+ * @param context safe pointer holding the context
  */
-qd_link_t * qd_message_get_receiving_link(const qd_message_t *msg);
+
+typedef void (*qd_message_q2_unblocked_handler_t)(qd_alloc_safe_ptr_t context);
+void qd_message_set_q2_unblocked_handler(qd_message_t *msg,
+                                         qd_message_q2_unblocked_handler_t callback,
+                                         qd_alloc_safe_ptr_t context);
+void qd_message_clear_q2_unblocked_handler(qd_message_t *msg);
 
 /**
  * Return message aborted state
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 1d17724..27b25cd 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -860,7 +860,8 @@
            "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
            hconn->conn_id, hconn->in_link_id, len);
 
-    qd_message_stream_data_append(msg, body);
+    // @TODO(kgiusti): handle Q2 block event:
+    qd_message_stream_data_append(msg, body, 0);
 
     //
     // Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 3f46e44..1ef6b2a 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -976,7 +976,8 @@
 
     qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
 
-    qd_message_stream_data_append(msg, body);
+    // @TODO(kgiusti): handle Q2 block event:
+    qd_message_stream_data_append(msg, body, 0);
 
     //
     // Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index 2330ece..25347e4 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -465,7 +465,8 @@
                     qd_buffer_list_t existing_buffers;
                     DEQ_INIT(existing_buffers);
                     qd_compose_take_buffers(stream_data->body, &existing_buffers);
-                    qd_message_stream_data_append(stream_data->message, &existing_buffers);
+                    // @TODO(kgiusti): handle Q2 block event:
+                    qd_message_stream_data_append(stream_data->message, &existing_buffers, 0);
                     stream_data->body_data_added = true;
                 }
             }
@@ -475,7 +476,8 @@
                 stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
                 stream_data->body_data_added = true;
             }
-            qd_message_stream_data_append(stream_data->message, &buffers);
+            // @TODO(kgiusti): handle Q2 block event:
+            qd_message_stream_data_append(stream_data->message, &buffers, 0);
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback qd_compose_insert_binary_buffers into stream_data->message", conn->conn_id, stream_id);
         }
         else {
@@ -930,7 +932,8 @@
             if (!stream_data->body) {
                 stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
                 qd_compose_insert_binary(stream_data->body, 0, 0);
-                qd_message_extend(stream_data->message, stream_data->body);
+                // @TODO(kgiusti): handle Q2 block event:
+                qd_message_extend(stream_data->message, stream_data->body, 0);
             }
         }
 
@@ -965,7 +968,8 @@
             if (stream_data->use_footer_properties) {
                 qd_compose_end_map(stream_data->footer_properties);
                 stream_data->entire_footer_arrived = true;
-                qd_message_extend(stream_data->message, stream_data->footer_properties);
+                // @TODO(kgiusti): handle Q2 block event:
+                qd_message_extend(stream_data->message, stream_data->footer_properties, 0);
                 qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Closing footer map, extending message with footer", conn->conn_id, stream_id);
             }
             else {
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 2fd3ff2..9982ad0 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -497,7 +497,8 @@
             //
             // Extend the streaming message and free the composed field
             //
-            depth = qd_message_extend(adaptor->streaming_message, field);
+            // TODO(kgiusti): need to handle Q2 blocking event
+            depth = qd_message_extend(adaptor->streaming_message, field, 0);
             qd_compose_free(field);
         }
 
@@ -519,7 +520,8 @@
         qd_compose_insert_symbol(footer, "second");
         qd_compose_insert_string(footer, "value2");
         qd_compose_end_map(footer);
-        depth = qd_message_extend(adaptor->streaming_message, footer);
+        // @TODO(kgiusti): need to handle Q2 blocking event
+        depth = qd_message_extend(adaptor->streaming_message, footer, 0);
         qd_compose_free(footer);
 
         qd_message_set_receive_complete(adaptor->streaming_message);
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index fce2219..6cc8855 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -183,7 +183,8 @@
     grant_read_buffers(conn);
 
     if (conn->instream) {
-        qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers);
+        // @TODO(kgiusti): handle Q2 block event:
+        qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, 0);
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
     } else {
diff --git a/src/alloc_pool.c b/src/alloc_pool.c
index 284630e..2ab6567 100644
--- a/src/alloc_pool.c
+++ b/src/alloc_pool.c
@@ -507,6 +507,10 @@
         return 0;
 
     qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1;
+#ifdef QD_MEMORY_DEBUG
+    // ensure p actually points to an alloc pool item
+    assert(item->header == PATTERN_FRONT);
+#endif
     return item->sequence;
 }
 
diff --git a/src/message.c b/src/message.c
index 3e86f54..f81d15e 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1020,7 +1020,8 @@
 {
     if (!in_msg) return;
     uint32_t rc;
-    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
+    qd_message_pvt_t          *msg        = (qd_message_pvt_t*) in_msg;
+    qd_message_q2_unblocker_t  q2_unblock = {0};
 
     qd_buffer_list_free_buffers(&msg->ma_to_override);
     qd_buffer_list_free_buffers(&msg->ma_trace);
@@ -1055,12 +1056,16 @@
             && was_blocked
             && qd_message_Q2_holdoff_should_unblock(in_msg)) {
             content->q2_input_holdoff = false;
-            qd_link_restart_rx(qd_message_get_receiving_link(in_msg));
+            q2_unblock = content->q2_unblocker;
         }
 
         UNLOCK(content->lock);
     }
 
+    // the Q2 handler must be invoked outside the lock
+    if (q2_unblock.handler)
+        q2_unblock.handler(q2_unblock.context);
+
     rc = sys_atomic_dec(&content->ref_count) - 1;
     if (rc == 0) {
         if (content->ma_field_iter_in)
@@ -1320,7 +1325,14 @@
 {
     if (!!in_msg) {
         qd_message_content_t *content = MSG_CONTENT(in_msg);
+
+        LOCK(content->lock);
+
         content->receive_complete = true;
+        content->q2_unblocker.handler = 0;
+        qd_nullify_safe_ptr(&content->q2_unblocker.context);
+
+        UNLOCK(content->lock);
     }
 }
 
@@ -1384,7 +1396,6 @@
         } else if (rc == PN_EOS || rc < 0) {
             // End of message or error: finalize message_receive handling
             msg->content->aborted = pn_delivery_aborted(delivery);
-            qd_nullify_safe_ptr(&msg->content->input_link_sp);
             pn_record_t *record = pn_delivery_attachments(delivery);
             pn_record_set(record, PN_DELIVERY_CTX, 0);
             if (msg->content->oversize) {
@@ -1392,7 +1403,7 @@
                 // This has no effect on the received message.
                 msg->content->aborted = true;
             }
-            msg->content->receive_complete = true;
+            qd_message_set_receive_complete((qd_message_t*) msg);
             break;
         } else {
             // rc was > 0. bytes were read and discarded.
@@ -1429,7 +1440,8 @@
     if (!msg) {
         msg = (qd_message_pvt_t*) qd_message();
         qd_connection_t *qdc = qd_link_connection(qdl);
-        set_safe_ptr_qd_link_t(qdl, &msg->content->input_link_sp);
+        qd_alloc_safe_ptr_t sp = QD_SAFE_PTR_INIT(qdl);
+        qd_message_set_q2_unblocked_handler((qd_message_t*) msg, qd_link_q2_restart_receive, sp);
         msg->strip_annotations_in  = qd_connection_strip_annotations_in(qdc);
         pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
         pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
@@ -1491,8 +1503,9 @@
                 }
 
                 content->receive_complete = true;
+                content->q2_unblocker.handler = 0;
+                qd_nullify_safe_ptr(&content->q2_unblocker.context);
                 content->aborted = pn_delivery_aborted(delivery);
-                qd_nullify_safe_ptr(&content->input_link_sp);
 
                 // unlink message and delivery
                 pn_record_set(record, PN_DELIVERY_CTX, 0);
@@ -1726,7 +1739,6 @@
 void qd_message_send(qd_message_t *in_msg,
                      qd_link_t    *link,
                      bool          strip_annotations,
-                     bool         *restart_rx,
                      bool         *q3_stalled)
 {
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
@@ -1734,7 +1746,6 @@
     qd_buffer_t          *buf     = 0;
     pn_link_t            *pnl     = qd_link_pn(link);
 
-    *restart_rx                   = false;
     *q3_stalled                   = false;
 
     if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
@@ -1842,8 +1853,9 @@
 
     buf = msg->cursor.buffer;
 
-    pn_session_t     *pns = pn_link_session(pnl);
-    const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;
+    qd_message_q2_unblocker_t  q2_unblock = {0};
+    pn_session_t              *pns        = pn_link_session(pnl);
+    const size_t               q3_upper   = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;
 
     while (!content->aborted
            && buf
@@ -1913,7 +1925,7 @@
                                 // set input holdoff before the deferred handler
                                 // runs.
                                 content->q2_input_holdoff = false;
-                                *restart_rx = true;
+                                q2_unblock = content->q2_unblocker;
                             }
                         }
                     }   // end free buffer
@@ -1940,6 +1952,10 @@
         UNLOCK(content->lock);
     }
 
+    // the Q2 handler must be invoked outside the lock
+    if (q2_unblock.handler)
+        q2_unblock.handler(q2_unblock.context);
+
     if (content->aborted) {
         if (pn_link_current(pnl)) {
             msg->send_complete = true;
@@ -2319,13 +2335,16 @@
 }
 
 
-int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
+int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked)
 {
     qd_message_content_t *content = MSG_CONTENT(msg);
     int                   count;
     qd_buffer_list_t     *buffers = qd_compose_buffers(field);
     qd_buffer_t          *buf     = DEQ_HEAD(*buffers);
 
+    if (q2_blocked)
+        *q2_blocked = false;
+
     LOCK(content->lock);
     while (buf) {
         qd_buffer_set_fanout(buf, content->fanout);
@@ -2334,6 +2353,14 @@
 
     DEQ_APPEND(content->buffers, (*buffers));
     count = DEQ_SIZE(content->buffers);
+
+    // buffers added - much check for Q2:
+    if (qd_message_Q2_holdoff_should_block(msg)) {
+        content->q2_input_holdoff = true;
+        if (q2_blocked)
+            *q2_blocked = true;
+    }
+
     UNLOCK(content->lock);
     return count;
 }
@@ -2549,7 +2576,8 @@
 
     LOCK(content->lock);
 
-    bool was_blocked = !qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt);
+    bool                      was_blocked = !qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt);
+    qd_message_q2_unblocker_t q2_unblock  = {0};
 
     if (pvt->is_fanout) {
         buf = start_buf;
@@ -2581,13 +2609,16 @@
         && was_blocked
         && qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt)) {
         content->q2_input_holdoff = false;
-        qd_link_restart_rx(qd_message_get_receiving_link((qd_message_t*) pvt));
+        q2_unblock = content->q2_unblocker;
     }
 
     UNLOCK(content->lock);
 
     DEQ_REMOVE(pvt->stream_data_list, stream_data);
     free_qd_message_stream_data_t(stream_data);
+
+    if (q2_unblock.handler)
+        q2_unblock.handler(q2_unblock.context);
 }
 
 
@@ -2820,12 +2851,6 @@
 }
 
 
-qd_link_t * qd_message_get_receiving_link(const qd_message_t *msg)
-{
-    return safe_deref_qd_link_t(((qd_message_pvt_t *)msg)->content->input_link_sp);
-}
-
-
 bool qd_message_aborted(const qd_message_t *msg)
 {
     return ((qd_message_pvt_t *)msg)->content->aborted;
@@ -2847,12 +2872,15 @@
 }
 
 
-int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data)
+int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data, bool *q2_blocked)
 {
     unsigned int        length = DEQ_SIZE(*data);
     qd_composed_field_t *field = 0;
     int rc = 0;
 
+    if (q2_blocked)
+        *q2_blocked = false;
+
     if (length == 0)
         return rc;
 
@@ -2887,7 +2915,35 @@
     field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
     qd_compose_insert_binary_buffers(field, data);
 
-    rc = qd_message_extend(message, field);
+    rc = qd_message_extend(message, field, q2_blocked);
     qd_compose_free(field);
     return rc;
 }
+
+
+void qd_message_set_q2_unblocked_handler(qd_message_t *msg,
+                                         qd_message_q2_unblocked_handler_t callback,
+                                         qd_alloc_safe_ptr_t context)
+{
+    qd_message_content_t *content = MSG_CONTENT(msg);
+
+    LOCK(content->lock);
+
+    content->q2_unblocker.handler = callback;
+    content->q2_unblocker.context = context;
+
+    UNLOCK(content->lock);
+}
+
+
+void qd_message_clear_Q2_unblocked_handler(qd_message_t *msg)
+{
+    qd_message_content_t *content = MSG_CONTENT(msg);
+
+    LOCK(content->lock);
+
+    content->q2_unblocker.handler = 0;
+    qd_nullify_safe_ptr(&content->q2_unblocker.context);
+
+    UNLOCK(content->lock);
+}
diff --git a/src/message_private.h b/src/message_private.h
index a8067c7..c4262a9 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -73,6 +73,13 @@
 ALLOC_DECLARE(qd_message_stream_data_t);
 DEQ_DECLARE(qd_message_stream_data_t, qd_message_stream_data_list_t);
 
+
+typedef struct {
+    qd_message_q2_unblocked_handler_t  handler;
+    qd_alloc_safe_ptr_t                context;
+} qd_message_q2_unblocker_t;
+
+
 // TODO - consider using pointers to qd_field_location_t below to save memory
 // TODO - provide a way to allocate a message without a lock for the link-routing case.
 //        It's likely that link-routing will cause no contention for the message content.
@@ -126,7 +133,8 @@
     uint64_t             max_message_size;               // configured max; 0 if no max to enforce
     uint64_t             bytes_received;                 // bytes returned by pn_link_recv() when enforcing max_message_size
     uint32_t             fanout;                         // The number of receivers for this message, including in-process subscribers.
-    qd_link_t_sp         input_link_sp;                  // message received on this link
+
+    qd_message_q2_unblocker_t q2_unblocker;              // callback and context to signal Q2 unblocked to receiver
 
     bool                 ma_parsed;                      // have parsed annotations in incoming message
     bool                 discard;                        // Should this message be discarded?
diff --git a/src/router_node.c b/src/router_node.c
index 7dbdb0d..70d98f9 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -855,6 +855,7 @@
     if (!discard) {
         qd_link_t *qdl = safe_deref_qd_link_t(*safe_qdl);
         if (!!qdl) {
+            assert(qd_link_direction(qdl) == QD_INCOMING);
             qd_router_t *qdr = (qd_router_t*) qd_link_get_node_context(qdl);
             assert(qdr != 0);
             while (true) {
@@ -1914,22 +1915,17 @@
     if (!pdlv)
         return 0;
 
-    bool restart_rx = false;
     bool q3_stalled = false;
 
     qd_message_t *msg_out = qdr_delivery_message(dlv);
 
-    qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &restart_rx, &q3_stalled);
+    qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &q3_stalled);
 
     if (q3_stalled) {
         qd_link_q3_block(qlink);
         qdr_link_stalled_outbound(link);
     }
 
-    if (restart_rx) {
-        qd_link_restart_rx(qd_message_get_receiving_link(msg_out));
-    }
-
     bool send_complete = qdr_delivery_send_complete(dlv);
 
     if (send_complete) {
@@ -2059,7 +2055,10 @@
                 // and if it is blocked by Q2 holdoff, get the link rolling again.
                 //
                 qd_message_Q2_holdoff_disable(msg);
-                qd_link_restart_rx(link);
+
+                qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp);
+                set_safe_ptr_qd_link_t(link, safe_ptr);
+                qd_connection_invoke_deferred(qd_conn, deferred_AMQP_rx_handler, safe_ptr);
             }
         }
     }
@@ -2131,10 +2130,13 @@
 }
 
 
-// called when Q2 holdoff is deactivated so we can receive more message buffers
+// invoked by an I/O thread when enough buffers have been released deactivate
+// the Q2 block.  Note that this method will likely be running on a worker
+// thread that is not the same thread that "owns" the qd_link_t passed in.
 //
-void qd_link_restart_rx(qd_link_t *in_link)
+void qd_link_q2_restart_receive(qd_alloc_safe_ptr_t context)
 {
+    qd_link_t *in_link = (qd_link_t*) qd_alloc_deref_safe_ptr(&context);
     if (!in_link)
         return;
 
@@ -2142,8 +2144,8 @@
 
     qd_connection_t *in_conn = qd_link_connection(in_link);
     if (in_conn) {
-        qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp);
-        set_safe_ptr_qd_link_t(in_link, safe_ptr);
+        qd_link_t_sp *safe_ptr = NEW(qd_alloc_safe_ptr_t);
+        *safe_ptr = context;  // use original to keep old sequence counter
         qd_connection_invoke_deferred(in_conn, deferred_AMQP_rx_handler, safe_ptr);
     }
 }
diff --git a/tests/message_test.c b/tests/message_test.c
index 7da011c..18a22b0 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -738,7 +738,7 @@
         // the buffers in 'field' are inserted into message 'msg'.
         qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
         qd_compose_insert_binary_buffers(field, &mule_content->buffers);
-        qd_message_extend(msg, field);
+        qd_message_extend(msg, field, 0);
 
         // Clean up temporary resources
         free(buf2);
@@ -979,13 +979,20 @@
     // snapshot the message buffer count to use as a baseline
     const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(msg)->buffers);
 
-    int depth = qd_message_stream_data_append(msg, &bin_data);
+    bool blocked;
+    int depth = qd_message_stream_data_append(msg, &bin_data, &blocked);
     if (depth <= body_bufct) {
         // expected to add extra buffer(s) for meta-data
         result = "append length is incorrect";
         goto exit;
     }
 
+    // expected that the append has triggered Q2 blocking:
+    if (!blocked) {
+        result = "expected Q2 block event did not occur!";
+        goto exit;
+    }
+
     // And while we're at it, stuff in a footer
     field = qd_compose(QD_PERFORMATIVE_FOOTER, 0);
     qd_compose_start_map(field);
@@ -994,7 +1001,7 @@
     qd_compose_insert_symbol(field, "Key2");
     qd_compose_insert_string(field, "Value2");
     qd_compose_end_map(field);
-    qd_message_extend(msg, field);
+    qd_message_extend(msg, field, 0);
     qd_compose_free(field);
 
     qd_message_set_receive_complete(msg);
@@ -1123,7 +1130,7 @@
     memset(buffer, '5', 1001);
     qd_compose_insert_binary(field, buffer, 5);
 
-    qd_message_extend(in_msg, field);
+    qd_message_extend(in_msg, field, 0);
     qd_compose_free(field);
 
     qd_message_set_receive_complete(in_msg);
@@ -1232,6 +1239,7 @@
     const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(in_msg)->buffers);
 
     // Append a footer
+    bool q2_blocked;
     field = qd_compose(QD_PERFORMATIVE_FOOTER, 0);
     qd_compose_start_map(field);
     qd_compose_insert_symbol(field, "Key1");
@@ -1239,9 +1247,16 @@
     qd_compose_insert_symbol(field, "Key2");
     qd_compose_insert_string(field, "Value2");
     qd_compose_end_map(field);
-    qd_message_extend(in_msg, field);
+    qd_message_extend(in_msg, field, &q2_blocked);
     qd_compose_free(field);
 
+    // this small message should not have triggered Q2
+    assert(DEQ_SIZE(MSG_CONTENT(in_msg)->buffers) < QD_QLIMIT_Q2_UPPER);
+    if (q2_blocked) {
+        result = "Unexpected Q2 block on message extend";
+        goto exit;
+    }
+
     qd_message_set_receive_complete(in_msg);
 
     // "fan out" the message