DISPATCH-1960: Refactor Q2 flow control for protocol adaptor use.
This closes #1027
diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h
index e8722a5..d872ff4 100644
--- a/include/qpid/dispatch/alloc_pool.h
+++ b/include/qpid/dispatch/alloc_pool.h
@@ -80,7 +80,26 @@
/** De-allocate from a thread pool. Use via ALLOC_DECLARE */
void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p);
uint32_t qd_alloc_sequence(void *p);
-static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { sp->ptr = 0; }
+
+// generic safe pointer api for any alloc pool item
+
+#define QD_SAFE_PTR_INIT(p) { .ptr = (void*)(p), .seq = qd_alloc_sequence(p) }
+
+static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp)
+{
+ sp->ptr = 0;
+}
+
+static inline void qd_alloc_set_safe_ptr(qd_alloc_safe_ptr_t *sp, void *p)
+{
+ sp->ptr = p;
+ sp->seq = qd_alloc_sequence(p);
+}
+
+static inline void *qd_alloc_deref_safe_ptr(const qd_alloc_safe_ptr_t *sp)
+{
+ return sp->seq == qd_alloc_sequence(sp->ptr) ? sp->ptr : (void*) 0;
+}
/**
* Declare functions new_T and alloc_T
@@ -102,8 +121,8 @@
__thread qd_alloc_pool_t *__local_pool_##T = 0; \
T *new_##T(void) { return (T*) qd_alloc(&__desc_##T, &__local_pool_##T); } \
void free_##T(T *p) { qd_dealloc(&__desc_##T, &__local_pool_##T, (char*) p); } \
- void set_safe_ptr_##T(T *p, T##_sp *sp) { sp->ptr = (void*) p; sp->seq = qd_alloc_sequence((void*) p); } \
- T *safe_deref_##T(T##_sp sp) { return sp.seq == qd_alloc_sequence((void*) sp.ptr) ? (T*) sp.ptr : (T*) 0; } \
+ void set_safe_ptr_##T(T *p, T##_sp *sp) { qd_alloc_set_safe_ptr(sp, (void*)p); } \
+ T *safe_deref_##T(T##_sp sp) { return (T*) qd_alloc_deref_safe_ptr((qd_alloc_safe_ptr_t*) &(sp)); } \
qd_alloc_stats_t *alloc_stats_##T(void) { return __desc_##T.stats; } \
void *unused##T
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index 08caba1..82d9ad8 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -228,7 +228,7 @@
void qd_link_detach(qd_link_t *link);
void qd_link_free(qd_link_t *link);
void *qd_link_get_node_context(const qd_link_t *link);
-void qd_link_restart_rx(qd_link_t *link);
+void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context);
void qd_link_q3_block(qd_link_t *link);
void qd_link_q3_unblock(qd_link_t *link);
uint64_t qd_link_link_id(const qd_link_t *link);
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 657477d..07e144a 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -253,10 +253,9 @@
* @param msg A pointer to a message to be sent.
* @param link The outgoing link on which to send the message.
* @param strip_outbound_annotations [in] annotation control flag
- * @param restart_rx [out] indication to wake up receive process
* @param q3_stalled [out] indicates that the link is stalled due to proton-buffer-full
*/
-void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *restart_rx, bool *q3_stalled);
+void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *q3_stalled);
/**
* Check that the message is well-formed up to a certain depth. Any part of the message that is
@@ -304,9 +303,10 @@
*
* @param msg Pointer to a message
* @param field A composed field to be appended to the end of the message's stream
+ * @param q2_blocked Set to true if this call caused Q2 to block
* @return The number of buffers stored in the message's content
*/
-int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field);
+int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked);
/**
@@ -404,9 +404,10 @@
*
* @param msg Pointer to message under construction
* @param data List of buffers containing body data.
+ * @param qd_blocked Set to true if this call caused Q2 to block
* @return The number of buffers stored in the message's content
*/
-int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data);
+int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data, bool *q2_blocked);
/** Put string representation of a message suitable for logging in buffer.
@@ -556,12 +557,22 @@
*/
bool qd_message_is_Q2_blocked(const qd_message_t *msg);
+
/**
- * Return qd_link through which the message is being received.
- * @param msg A pointer to the message
- * @return the qd_link
+ * Register a callback that will be invoked when the message has exited the Q2
+ * blocking state. Note that the callback can be invoked on any I/O thread.
+ * The callback must be thread safe.
+ *
+ * @param msg The message to monitor.
+ * @param callback The method to invoke
+ * @param context safe pointer holding the context
*/
-qd_link_t * qd_message_get_receiving_link(const qd_message_t *msg);
+
+typedef void (*qd_message_q2_unblocked_handler_t)(qd_alloc_safe_ptr_t context);
+void qd_message_set_q2_unblocked_handler(qd_message_t *msg,
+ qd_message_q2_unblocked_handler_t callback,
+ qd_alloc_safe_ptr_t context);
+void qd_message_clear_q2_unblocked_handler(qd_message_t *msg);
/**
* Return message aborted state
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 1d17724..27b25cd 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -860,7 +860,8 @@
"[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
hconn->conn_id, hconn->in_link_id, len);
- qd_message_stream_data_append(msg, body);
+ // @TODO(kgiusti): handle Q2 block event:
+ qd_message_stream_data_append(msg, body, 0);
//
// Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 3f46e44..1ef6b2a 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -976,7 +976,8 @@
qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
- qd_message_stream_data_append(msg, body);
+ // @TODO(kgiusti): handle Q2 block event:
+ qd_message_stream_data_append(msg, body, 0);
//
// Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index 2330ece..25347e4 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -465,7 +465,8 @@
qd_buffer_list_t existing_buffers;
DEQ_INIT(existing_buffers);
qd_compose_take_buffers(stream_data->body, &existing_buffers);
- qd_message_stream_data_append(stream_data->message, &existing_buffers);
+ // @TODO(kgiusti): handle Q2 block event:
+ qd_message_stream_data_append(stream_data->message, &existing_buffers, 0);
stream_data->body_data_added = true;
}
}
@@ -475,7 +476,8 @@
stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
stream_data->body_data_added = true;
}
- qd_message_stream_data_append(stream_data->message, &buffers);
+ // @TODO(kgiusti): handle Q2 block event:
+ qd_message_stream_data_append(stream_data->message, &buffers, 0);
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback qd_compose_insert_binary_buffers into stream_data->message", conn->conn_id, stream_id);
}
else {
@@ -930,7 +932,8 @@
if (!stream_data->body) {
stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
qd_compose_insert_binary(stream_data->body, 0, 0);
- qd_message_extend(stream_data->message, stream_data->body);
+ // @TODO(kgiusti): handle Q2 block event:
+ qd_message_extend(stream_data->message, stream_data->body, 0);
}
}
@@ -965,7 +968,8 @@
if (stream_data->use_footer_properties) {
qd_compose_end_map(stream_data->footer_properties);
stream_data->entire_footer_arrived = true;
- qd_message_extend(stream_data->message, stream_data->footer_properties);
+ // @TODO(kgiusti): handle Q2 block event:
+ qd_message_extend(stream_data->message, stream_data->footer_properties, 0);
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Closing footer map, extending message with footer", conn->conn_id, stream_id);
}
else {
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 2fd3ff2..9982ad0 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -497,7 +497,8 @@
//
// Extend the streaming message and free the composed field
//
- depth = qd_message_extend(adaptor->streaming_message, field);
+ // TODO(kgiusti): need to handle Q2 blocking event
+ depth = qd_message_extend(adaptor->streaming_message, field, 0);
qd_compose_free(field);
}
@@ -519,7 +520,8 @@
qd_compose_insert_symbol(footer, "second");
qd_compose_insert_string(footer, "value2");
qd_compose_end_map(footer);
- depth = qd_message_extend(adaptor->streaming_message, footer);
+ // @TODO(kgiusti): need to handle Q2 blocking event
+ depth = qd_message_extend(adaptor->streaming_message, footer, 0);
qd_compose_free(footer);
qd_message_set_receive_complete(adaptor->streaming_message);
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index fce2219..6cc8855 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -183,7 +183,8 @@
grant_read_buffers(conn);
if (conn->instream) {
- qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers);
+ // @TODO(kgiusti): handle Q2 block event:
+ qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, 0);
qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
} else {
diff --git a/src/alloc_pool.c b/src/alloc_pool.c
index 284630e..2ab6567 100644
--- a/src/alloc_pool.c
+++ b/src/alloc_pool.c
@@ -507,6 +507,10 @@
return 0;
qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1;
+#ifdef QD_MEMORY_DEBUG
+ // ensure p actually points to an alloc pool item
+ assert(item->header == PATTERN_FRONT);
+#endif
return item->sequence;
}
diff --git a/src/message.c b/src/message.c
index 3e86f54..f81d15e 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1020,7 +1020,8 @@
{
if (!in_msg) return;
uint32_t rc;
- qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ qd_message_q2_unblocker_t q2_unblock = {0};
qd_buffer_list_free_buffers(&msg->ma_to_override);
qd_buffer_list_free_buffers(&msg->ma_trace);
@@ -1055,12 +1056,16 @@
&& was_blocked
&& qd_message_Q2_holdoff_should_unblock(in_msg)) {
content->q2_input_holdoff = false;
- qd_link_restart_rx(qd_message_get_receiving_link(in_msg));
+ q2_unblock = content->q2_unblocker;
}
UNLOCK(content->lock);
}
+ // the Q2 handler must be invoked outside the lock
+ if (q2_unblock.handler)
+ q2_unblock.handler(q2_unblock.context);
+
rc = sys_atomic_dec(&content->ref_count) - 1;
if (rc == 0) {
if (content->ma_field_iter_in)
@@ -1320,7 +1325,14 @@
{
if (!!in_msg) {
qd_message_content_t *content = MSG_CONTENT(in_msg);
+
+ LOCK(content->lock);
+
content->receive_complete = true;
+ content->q2_unblocker.handler = 0;
+ qd_nullify_safe_ptr(&content->q2_unblocker.context);
+
+ UNLOCK(content->lock);
}
}
@@ -1384,7 +1396,6 @@
} else if (rc == PN_EOS || rc < 0) {
// End of message or error: finalize message_receive handling
msg->content->aborted = pn_delivery_aborted(delivery);
- qd_nullify_safe_ptr(&msg->content->input_link_sp);
pn_record_t *record = pn_delivery_attachments(delivery);
pn_record_set(record, PN_DELIVERY_CTX, 0);
if (msg->content->oversize) {
@@ -1392,7 +1403,7 @@
// This has no effect on the received message.
msg->content->aborted = true;
}
- msg->content->receive_complete = true;
+ qd_message_set_receive_complete((qd_message_t*) msg);
break;
} else {
// rc was > 0. bytes were read and discarded.
@@ -1429,7 +1440,8 @@
if (!msg) {
msg = (qd_message_pvt_t*) qd_message();
qd_connection_t *qdc = qd_link_connection(qdl);
- set_safe_ptr_qd_link_t(qdl, &msg->content->input_link_sp);
+ qd_alloc_safe_ptr_t sp = QD_SAFE_PTR_INIT(qdl);
+ qd_message_set_q2_unblocked_handler((qd_message_t*) msg, qd_link_q2_restart_receive, sp);
msg->strip_annotations_in = qd_connection_strip_annotations_in(qdc);
pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
@@ -1491,8 +1503,9 @@
}
content->receive_complete = true;
+ content->q2_unblocker.handler = 0;
+ qd_nullify_safe_ptr(&content->q2_unblocker.context);
content->aborted = pn_delivery_aborted(delivery);
- qd_nullify_safe_ptr(&content->input_link_sp);
// unlink message and delivery
pn_record_set(record, PN_DELIVERY_CTX, 0);
@@ -1726,7 +1739,6 @@
void qd_message_send(qd_message_t *in_msg,
qd_link_t *link,
bool strip_annotations,
- bool *restart_rx,
bool *q3_stalled)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
@@ -1734,7 +1746,6 @@
qd_buffer_t *buf = 0;
pn_link_t *pnl = qd_link_pn(link);
- *restart_rx = false;
*q3_stalled = false;
if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
@@ -1842,8 +1853,9 @@
buf = msg->cursor.buffer;
- pn_session_t *pns = pn_link_session(pnl);
- const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;
+ qd_message_q2_unblocker_t q2_unblock = {0};
+ pn_session_t *pns = pn_link_session(pnl);
+ const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;
while (!content->aborted
&& buf
@@ -1913,7 +1925,7 @@
// set input holdoff before the deferred handler
// runs.
content->q2_input_holdoff = false;
- *restart_rx = true;
+ q2_unblock = content->q2_unblocker;
}
}
} // end free buffer
@@ -1940,6 +1952,10 @@
UNLOCK(content->lock);
}
+ // the Q2 handler must be invoked outside the lock
+ if (q2_unblock.handler)
+ q2_unblock.handler(q2_unblock.context);
+
if (content->aborted) {
if (pn_link_current(pnl)) {
msg->send_complete = true;
@@ -2319,13 +2335,16 @@
}
-int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
+int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked)
{
qd_message_content_t *content = MSG_CONTENT(msg);
int count;
qd_buffer_list_t *buffers = qd_compose_buffers(field);
qd_buffer_t *buf = DEQ_HEAD(*buffers);
+ if (q2_blocked)
+ *q2_blocked = false;
+
LOCK(content->lock);
while (buf) {
qd_buffer_set_fanout(buf, content->fanout);
@@ -2334,6 +2353,14 @@
DEQ_APPEND(content->buffers, (*buffers));
count = DEQ_SIZE(content->buffers);
+
+ // buffers added - much check for Q2:
+ if (qd_message_Q2_holdoff_should_block(msg)) {
+ content->q2_input_holdoff = true;
+ if (q2_blocked)
+ *q2_blocked = true;
+ }
+
UNLOCK(content->lock);
return count;
}
@@ -2549,7 +2576,8 @@
LOCK(content->lock);
- bool was_blocked = !qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt);
+ bool was_blocked = !qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt);
+ qd_message_q2_unblocker_t q2_unblock = {0};
if (pvt->is_fanout) {
buf = start_buf;
@@ -2581,13 +2609,16 @@
&& was_blocked
&& qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt)) {
content->q2_input_holdoff = false;
- qd_link_restart_rx(qd_message_get_receiving_link((qd_message_t*) pvt));
+ q2_unblock = content->q2_unblocker;
}
UNLOCK(content->lock);
DEQ_REMOVE(pvt->stream_data_list, stream_data);
free_qd_message_stream_data_t(stream_data);
+
+ if (q2_unblock.handler)
+ q2_unblock.handler(q2_unblock.context);
}
@@ -2820,12 +2851,6 @@
}
-qd_link_t * qd_message_get_receiving_link(const qd_message_t *msg)
-{
- return safe_deref_qd_link_t(((qd_message_pvt_t *)msg)->content->input_link_sp);
-}
-
-
bool qd_message_aborted(const qd_message_t *msg)
{
return ((qd_message_pvt_t *)msg)->content->aborted;
@@ -2847,12 +2872,15 @@
}
-int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data)
+int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data, bool *q2_blocked)
{
unsigned int length = DEQ_SIZE(*data);
qd_composed_field_t *field = 0;
int rc = 0;
+ if (q2_blocked)
+ *q2_blocked = false;
+
if (length == 0)
return rc;
@@ -2887,7 +2915,35 @@
field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
qd_compose_insert_binary_buffers(field, data);
- rc = qd_message_extend(message, field);
+ rc = qd_message_extend(message, field, q2_blocked);
qd_compose_free(field);
return rc;
}
+
+
+void qd_message_set_q2_unblocked_handler(qd_message_t *msg,
+ qd_message_q2_unblocked_handler_t callback,
+ qd_alloc_safe_ptr_t context)
+{
+ qd_message_content_t *content = MSG_CONTENT(msg);
+
+ LOCK(content->lock);
+
+ content->q2_unblocker.handler = callback;
+ content->q2_unblocker.context = context;
+
+ UNLOCK(content->lock);
+}
+
+
+void qd_message_clear_Q2_unblocked_handler(qd_message_t *msg)
+{
+ qd_message_content_t *content = MSG_CONTENT(msg);
+
+ LOCK(content->lock);
+
+ content->q2_unblocker.handler = 0;
+ qd_nullify_safe_ptr(&content->q2_unblocker.context);
+
+ UNLOCK(content->lock);
+}
diff --git a/src/message_private.h b/src/message_private.h
index a8067c7..c4262a9 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -73,6 +73,13 @@
ALLOC_DECLARE(qd_message_stream_data_t);
DEQ_DECLARE(qd_message_stream_data_t, qd_message_stream_data_list_t);
+
+typedef struct {
+ qd_message_q2_unblocked_handler_t handler;
+ qd_alloc_safe_ptr_t context;
+} qd_message_q2_unblocker_t;
+
+
// TODO - consider using pointers to qd_field_location_t below to save memory
// TODO - provide a way to allocate a message without a lock for the link-routing case.
// It's likely that link-routing will cause no contention for the message content.
@@ -126,7 +133,8 @@
uint64_t max_message_size; // configured max; 0 if no max to enforce
uint64_t bytes_received; // bytes returned by pn_link_recv() when enforcing max_message_size
uint32_t fanout; // The number of receivers for this message, including in-process subscribers.
- qd_link_t_sp input_link_sp; // message received on this link
+
+ qd_message_q2_unblocker_t q2_unblocker; // callback and context to signal Q2 unblocked to receiver
bool ma_parsed; // have parsed annotations in incoming message
bool discard; // Should this message be discarded?
diff --git a/src/router_node.c b/src/router_node.c
index 7dbdb0d..70d98f9 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -855,6 +855,7 @@
if (!discard) {
qd_link_t *qdl = safe_deref_qd_link_t(*safe_qdl);
if (!!qdl) {
+ assert(qd_link_direction(qdl) == QD_INCOMING);
qd_router_t *qdr = (qd_router_t*) qd_link_get_node_context(qdl);
assert(qdr != 0);
while (true) {
@@ -1914,22 +1915,17 @@
if (!pdlv)
return 0;
- bool restart_rx = false;
bool q3_stalled = false;
qd_message_t *msg_out = qdr_delivery_message(dlv);
- qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &restart_rx, &q3_stalled);
+ qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &q3_stalled);
if (q3_stalled) {
qd_link_q3_block(qlink);
qdr_link_stalled_outbound(link);
}
- if (restart_rx) {
- qd_link_restart_rx(qd_message_get_receiving_link(msg_out));
- }
-
bool send_complete = qdr_delivery_send_complete(dlv);
if (send_complete) {
@@ -2059,7 +2055,10 @@
// and if it is blocked by Q2 holdoff, get the link rolling again.
//
qd_message_Q2_holdoff_disable(msg);
- qd_link_restart_rx(link);
+
+ qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp);
+ set_safe_ptr_qd_link_t(link, safe_ptr);
+ qd_connection_invoke_deferred(qd_conn, deferred_AMQP_rx_handler, safe_ptr);
}
}
}
@@ -2131,10 +2130,13 @@
}
-// called when Q2 holdoff is deactivated so we can receive more message buffers
+// invoked by an I/O thread when enough buffers have been released deactivate
+// the Q2 block. Note that this method will likely be running on a worker
+// thread that is not the same thread that "owns" the qd_link_t passed in.
//
-void qd_link_restart_rx(qd_link_t *in_link)
+void qd_link_q2_restart_receive(qd_alloc_safe_ptr_t context)
{
+ qd_link_t *in_link = (qd_link_t*) qd_alloc_deref_safe_ptr(&context);
if (!in_link)
return;
@@ -2142,8 +2144,8 @@
qd_connection_t *in_conn = qd_link_connection(in_link);
if (in_conn) {
- qd_link_t_sp *safe_ptr = NEW(qd_link_t_sp);
- set_safe_ptr_qd_link_t(in_link, safe_ptr);
+ qd_link_t_sp *safe_ptr = NEW(qd_alloc_safe_ptr_t);
+ *safe_ptr = context; // use original to keep old sequence counter
qd_connection_invoke_deferred(in_conn, deferred_AMQP_rx_handler, safe_ptr);
}
}
diff --git a/tests/message_test.c b/tests/message_test.c
index 7da011c..18a22b0 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -738,7 +738,7 @@
// the buffers in 'field' are inserted into message 'msg'.
qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
qd_compose_insert_binary_buffers(field, &mule_content->buffers);
- qd_message_extend(msg, field);
+ qd_message_extend(msg, field, 0);
// Clean up temporary resources
free(buf2);
@@ -979,13 +979,20 @@
// snapshot the message buffer count to use as a baseline
const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(msg)->buffers);
- int depth = qd_message_stream_data_append(msg, &bin_data);
+ bool blocked;
+ int depth = qd_message_stream_data_append(msg, &bin_data, &blocked);
if (depth <= body_bufct) {
// expected to add extra buffer(s) for meta-data
result = "append length is incorrect";
goto exit;
}
+ // expected that the append has triggered Q2 blocking:
+ if (!blocked) {
+ result = "expected Q2 block event did not occur!";
+ goto exit;
+ }
+
// And while we're at it, stuff in a footer
field = qd_compose(QD_PERFORMATIVE_FOOTER, 0);
qd_compose_start_map(field);
@@ -994,7 +1001,7 @@
qd_compose_insert_symbol(field, "Key2");
qd_compose_insert_string(field, "Value2");
qd_compose_end_map(field);
- qd_message_extend(msg, field);
+ qd_message_extend(msg, field, 0);
qd_compose_free(field);
qd_message_set_receive_complete(msg);
@@ -1123,7 +1130,7 @@
memset(buffer, '5', 1001);
qd_compose_insert_binary(field, buffer, 5);
- qd_message_extend(in_msg, field);
+ qd_message_extend(in_msg, field, 0);
qd_compose_free(field);
qd_message_set_receive_complete(in_msg);
@@ -1232,6 +1239,7 @@
const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(in_msg)->buffers);
// Append a footer
+ bool q2_blocked;
field = qd_compose(QD_PERFORMATIVE_FOOTER, 0);
qd_compose_start_map(field);
qd_compose_insert_symbol(field, "Key1");
@@ -1239,9 +1247,16 @@
qd_compose_insert_symbol(field, "Key2");
qd_compose_insert_string(field, "Value2");
qd_compose_end_map(field);
- qd_message_extend(in_msg, field);
+ qd_message_extend(in_msg, field, &q2_blocked);
qd_compose_free(field);
+ // this small message should not have triggered Q2
+ assert(DEQ_SIZE(MSG_CONTENT(in_msg)->buffers) < QD_QLIMIT_Q2_UPPER);
+ if (q2_blocked) {
+ result = "Unexpected Q2 block on message extend";
+ goto exit;
+ }
+
qd_message_set_receive_complete(in_msg);
// "fan out" the message