blob: 8f810b49f50649d994fe4ed1efda7bc6d9cf5c03 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "qpid/dispatch/message.h"
#include "aprintf.h"
#include "compose_private.h"
#include "connection_manager_private.h"
#include "message_private.h"
#include "policy.h"
#include "buffer_field_api.h"
#include "qpid/dispatch/amqp.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/error.h"
#include "qpid/dispatch/iterator.h"
#include "qpid/dispatch/log.h"
#include "qpid/dispatch/threading.h"
#include <proton/object.h>
#include <assert.h>
#include <ctype.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#define LOCK sys_mutex_lock
#define UNLOCK sys_mutex_unlock
const char *STR_AMQP_NULL = "null";
const char *STR_AMQP_TRUE = "T";
const char *STR_AMQP_FALSE = "F";
static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70";
static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71";
static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72";
static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73";
static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74";
static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75";
static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76";
static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77";
static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78";
static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0";
static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1";
static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0";
static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0"
"\xa1\xb1\xa3\xb3\xe0\xf0"
"\x40\x56\x41\x42\x50\x60\x70\x52\x43\x80\x53\x44\x51\x61\x71\x54\x81\x55\x72\x82\x74\x84\x94\x73\x83\x98";
static const char * const section_names[QD_DEPTH_ALL + 1] = {
[QD_DEPTH_NONE] = "none",
[QD_DEPTH_HEADER] = "header",
[QD_DEPTH_DELIVERY_ANNOTATIONS] = "delivery annotations",
[QD_DEPTH_MESSAGE_ANNOTATIONS] = "message annotations",
[QD_DEPTH_PROPERTIES] = "properties",
[QD_DEPTH_APPLICATION_PROPERTIES] = "application properties",
[QD_DEPTH_BODY] = "body",
[QD_DEPTH_ALL] = "footer"
};
PN_HANDLE(PN_DELIVERY_CTX)
ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0);
ALLOC_DEFINE(qd_message_content_t);
ALLOC_DEFINE(qd_message_stream_data_t);
typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
qd_log_source_t* log_source = 0;
qd_log_source_t* qd_message_log_source()
{
if(log_source)
return log_source;
else {
qd_message_initialize();
return log_source;
}
}
void qd_message_initialize() {
log_source = qd_log_source("MESSAGE");
}
int qd_message_repr_len() { return qd_log_max_len(); }
/**
* Quote non-printable characters suitable for log messages. Output in buffer.
*/
static void quote(char* bytes, int n, char **begin, char *end) {
for (char* p = bytes; p < bytes+n; ++p) {
if (isprint(*p) || isspace(*p))
aprintf(begin, end, "%c", (int)*p);
else
aprintf(begin, end, "\\%02hhx", *p);
}
}
/**
* Populates the buffer with formatted epoch_time
*/
static void format_time(pn_timestamp_t epoch_time, char *format, char *buffer, size_t len)
{
struct timeval local_timeval;
local_timeval.tv_sec = epoch_time/1000;
local_timeval.tv_usec = (epoch_time%1000) * 1000;
time_t local_time_t;
local_time_t = local_timeval.tv_sec;
struct tm *local_tm;
char fmt[100];
local_tm = localtime(&local_time_t);
if (local_tm != NULL) {
strftime(fmt, sizeof fmt, format, local_tm);
snprintf(buffer, len, fmt, local_timeval.tv_usec / 1000);
}
}
/**
* Print the bytes of a parsed_field as characters, with pre/post quotes.
*/
static void print_parsed_field_string(qd_parsed_field_t *parsed_field,
const char *pre, const char *post,
char **begin, char *end) {
qd_iterator_t *i = qd_parse_raw(parsed_field);
if (i) {
aprintf(begin, end, "%s", pre);
while (end - *begin > 1 && !qd_iterator_end(i)) {
char c = qd_iterator_octet(i);
quote(&c, 1, begin, end);
}
aprintf(begin, end, "%s", post);
}
}
/**
* Tries to print the string representation of the parsed field content based on
* the tag of the parsed field. Some tag types have not been dealt with. Add
* code as and when required.
*/
static void print_parsed_field(qd_parsed_field_t *parsed_field, char **begin, char *end)
{
uint8_t tag = qd_parse_tag(parsed_field);
switch (tag) {
case QD_AMQP_NULL:
aprintf(begin, end, "%s", STR_AMQP_NULL);
break;
case QD_AMQP_BOOLEAN:
case QD_AMQP_TRUE:
case QD_AMQP_FALSE:
aprintf(begin, end, "%s", qd_parse_as_uint(parsed_field) ? STR_AMQP_TRUE: STR_AMQP_FALSE);
break;
case QD_AMQP_BYTE:
case QD_AMQP_SHORT:
case QD_AMQP_INT:
case QD_AMQP_SMALLINT: {
char str[11];
int32_t int32_val = qd_parse_as_int(parsed_field);
snprintf(str, 10, "%"PRId32"", int32_val);
aprintf(begin, end, "%s", str);
break;
}
case QD_AMQP_UBYTE:
case QD_AMQP_USHORT:
case QD_AMQP_UINT:
case QD_AMQP_SMALLUINT:
case QD_AMQP_UINT0: {
char str[11];
uint32_t uint32_val = qd_parse_as_uint(parsed_field);
snprintf(str, 11, "%"PRIu32"", uint32_val);
aprintf(begin, end, "%s", str);
break;
}
case QD_AMQP_ULONG:
case QD_AMQP_SMALLULONG:
case QD_AMQP_ULONG0: {
char str[21];
uint64_t uint64_val = qd_parse_as_ulong(parsed_field);
snprintf(str, 20, "%"PRIu64"", uint64_val);
aprintf(begin, end, "%s", str);
break;
}
case QD_AMQP_TIMESTAMP: {
char creation_time[100]; //string representation of creation time.
pn_timestamp_t creation_timestamp = qd_parse_as_ulong(parsed_field);
if (creation_timestamp > 0) {
format_time(creation_timestamp, "%Y-%m-%d %H:%M:%S.%%03lu %z", creation_time, 100);
aprintf(begin, end, "\"%s\"", creation_time);
}
break;
}
case QD_AMQP_LONG:
case QD_AMQP_SMALLLONG: {
char str[21];
int64_t int64_val = qd_parse_as_long(parsed_field);
snprintf(str, 20, "%"PRId64"", int64_val);
aprintf(begin, end, "%s", str);
break;
}
case QD_AMQP_FLOAT:
case QD_AMQP_DOUBLE:
case QD_AMQP_DECIMAL32:
case QD_AMQP_DECIMAL64:
case QD_AMQP_DECIMAL128:
case QD_AMQP_UTF32:
case QD_AMQP_UUID:
break; //TODO
case QD_AMQP_VBIN8:
case QD_AMQP_VBIN32:
print_parsed_field_string(parsed_field, "b\"", "\"", begin, end);
break;
case QD_AMQP_STR8_UTF8:
case QD_AMQP_STR32_UTF8:
print_parsed_field_string(parsed_field, "\"", "\"", begin, end);
break;
case QD_AMQP_SYM8:
case QD_AMQP_SYM32:
print_parsed_field_string(parsed_field, ":\"", "\"", begin, end);
break;
case QD_AMQP_MAP8:
case QD_AMQP_MAP32: {
uint32_t count = qd_parse_sub_count(parsed_field);
if (count > 0) {
aprintf(begin, end, "%s", "{");
}
for (uint32_t idx = 0; idx < count; idx++) {
qd_parsed_field_t *sub_key = qd_parse_sub_key(parsed_field, idx);
// The keys of this map are restricted to be of type string
// (which excludes the possibility of a null key)
print_parsed_field(sub_key, begin, end);
aprintf(begin, end, "%s", "=");
qd_parsed_field_t *sub_value = qd_parse_sub_value(parsed_field, idx);
print_parsed_field(sub_value, begin, end);
if ((idx + 1) < count)
aprintf(begin, end, "%s", ", ");
}
if (count > 0) {
aprintf(begin, end, "%s", "}");
}
break;
}
case QD_AMQP_LIST0:
case QD_AMQP_LIST8:
case QD_AMQP_LIST32: {
uint32_t count = qd_parse_sub_count(parsed_field);
if (count > 0) {
aprintf(begin, end, "%s", "[");
}
for (uint32_t idx = 0; idx < count; idx++) {
qd_parsed_field_t *sub_value = qd_parse_sub_value(parsed_field, idx);
print_parsed_field(sub_value, begin, end);
if ((idx + 1) < count)
aprintf(begin, end, "%s", ", ");
}
if (count > 0) {
aprintf(begin, end, "%s", "]");
}
break;
}
default:
break;
}
}
/* Print field if enabled by log bits, leading comma if !*first */
static void print_field(
qd_message_t *msg, int field, const char *name,
qd_log_bits flags, bool *first, char **begin, char *end)
{
if (is_log_component_enabled(flags, name)) {
qd_iterator_t* iter = (field == QD_FIELD_APPLICATION_PROPERTIES) ?
qd_message_field_iterator(msg, field) :
qd_message_field_iterator_typed(msg, field);
if (iter) {
qd_parsed_field_t *parsed_field = qd_parse(iter);
if (qd_parse_ok(parsed_field)) {
if (*first) {
*first = false;
aprintf(begin, end, "%s=", name);
} else {
aprintf(begin, end, ", %s=", name);
}
print_parsed_field(parsed_field, begin, end);
}
qd_parse_free(parsed_field);
qd_iterator_free(iter);
}
}
}
static const char REPR_END[] = "}\0";
char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits flags) {
if (flags == 0
|| qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES) != QD_MESSAGE_DEPTH_OK
|| !((qd_message_pvt_t *)msg)->content->section_application_properties.parsed) {
return NULL;
}
char *begin = buffer;
char *end = buffer + len - sizeof(REPR_END); /* Save space for ending */
bool first = true;
aprintf(&begin, end, "Message{");
print_field(msg, QD_FIELD_MESSAGE_ID, "message-id", flags, &first, &begin, end);
print_field(msg, QD_FIELD_USER_ID, "user-id", flags, &first, &begin, end);
print_field(msg, QD_FIELD_TO, "to", flags, &first, &begin, end);
print_field(msg, QD_FIELD_SUBJECT, "subject", flags, &first, &begin, end);
print_field(msg, QD_FIELD_REPLY_TO, "reply-to", flags, &first, &begin, end);
print_field(msg, QD_FIELD_CORRELATION_ID, "correlation-id", flags, &first, &begin, end);
print_field(msg, QD_FIELD_CONTENT_TYPE, "content-type", flags, &first, &begin, end);
print_field(msg, QD_FIELD_CONTENT_ENCODING, "content-encoding", flags, &first, &begin, end);
print_field(msg, QD_FIELD_ABSOLUTE_EXPIRY_TIME, "absolute-expiry-time", flags, &first, &begin, end);
print_field(msg, QD_FIELD_CREATION_TIME, "creation-time", flags, &first, &begin, end);
print_field(msg, QD_FIELD_GROUP_ID, "group-id", flags, &first, &begin, end);
print_field(msg, QD_FIELD_GROUP_SEQUENCE, "group-sequence", flags, &first, &begin, end);
print_field(msg, QD_FIELD_REPLY_TO_GROUP_ID, "reply-to-group-id", flags, &first, &begin, end);
print_field(msg, QD_FIELD_APPLICATION_PROPERTIES, "app-properties", flags, &first, &begin, end);
aprintf(&begin, end, "%s", REPR_END); /* We saved space at the beginning. */
return buffer;
}
/**
* Return true if there is at least one consumable octet in the buffer chain
* starting at *cursor. If the cursor is beyond the end of the buffer, and there
* is another buffer in the chain, move the cursor and buffer pointers to reference
* the first octet in the next buffer. Note that this movement does NOT constitute
* advancement of the cursor in the buffer chain.
*/
static bool can_advance(unsigned char **cursor, qd_buffer_t **buffer)
{
if (qd_buffer_cursor(*buffer) > *cursor)
return true;
if (DEQ_NEXT(*buffer)) {
*buffer = DEQ_NEXT(*buffer);
*cursor = qd_buffer_base(*buffer);
}
return qd_buffer_cursor(*buffer) > *cursor;
}
/**
* Advance cursor through buffer chain by 'consume' bytes.
* Cursor and buffer args are advanced to point to new position in buffer chain.
* - if the number of bytes in the buffer chain is less than or equal to
* the consume number then return false
* - the original buffer chain is not changed or freed.
*
* @param cursor Pointer into current buffer content
* @param buffer pointer to current buffer
* @param consume number of bytes to advance
* @return true if all bytes consumed, false if not enough bytes available
*/
static bool advance(unsigned char **cursor, qd_buffer_t **buffer, int consume)
{
if (!can_advance(cursor, buffer))
return false;
unsigned char *local_cursor = *cursor;
qd_buffer_t *local_buffer = *buffer;
int remaining = qd_buffer_cursor(local_buffer) - local_cursor;
while (consume > 0) {
if (consume <= remaining) {
local_cursor += consume;
consume = 0;
} else {
if (!local_buffer->next)
return false;
consume -= remaining;
local_buffer = local_buffer->next;
local_cursor = qd_buffer_base(local_buffer);
remaining = qd_buffer_size(local_buffer);
}
}
*cursor = local_cursor;
*buffer = local_buffer;
return true;
}
/**
* Advance cursor through buffer chain by 'consume' bytes.
* Cursor and buffer args are advanced to point to new position in buffer chain.
* Buffer content that is consumed is optionally passed to handler.
* - if the number of bytes in the buffer chain is less than or equal to
* the consume number then return the last buffer in the chain
* and a cursor pointing to the first unused byte in the buffer.
* - if the number of bytes in the buffer chain is greater than the consume
* number the returned buffer/cursor will point to the next available
* octet of data.
* - the original buffer chain is not changed or freed.
*
* @param cursor pointer into current buffer content
* @param buffer pointer to current buffer
* @param consume number of bytes to advance
* @param handler pointer to processor function
* @param context opaque argument for handler
*/
static void advance_guarded(const uint8_t **cursor, qd_buffer_t **buffer, int consume, buffer_process_t handler, void *context)
{
const uint8_t *local_cursor = *cursor;
qd_buffer_t *local_buffer = *buffer;
int remaining = qd_buffer_cursor(local_buffer) - local_cursor;
while (consume > 0) {
if (consume < remaining) {
if (handler)
handler(context, local_cursor, consume);
local_cursor += consume;
consume = 0;
} else {
if (handler)
handler(context, local_cursor, remaining);
consume -= remaining;
if (!DEQ_NEXT(local_buffer)) {
local_cursor = qd_buffer_cursor(local_buffer);
break;
}
local_buffer = DEQ_NEXT(local_buffer);
local_cursor = qd_buffer_base(local_buffer);
remaining = qd_buffer_size(local_buffer);
}
}
*cursor = local_cursor;
*buffer = local_buffer;
}
/**
* If there is an octet to be consumed, put it in octet and return true, else return false.
*/
static bool next_octet(unsigned char **cursor, qd_buffer_t **buffer, unsigned char *octet)
{
if (can_advance(cursor, buffer)) {
*octet = **cursor;
advance(cursor, buffer, 1);
return true;
}
return false;
}
static bool traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field)
{
qd_buffer_t *start_buffer = *buffer;
unsigned char *start_cursor = *cursor;
unsigned char tag;
unsigned char octet;
if (!next_octet(cursor, buffer, &tag))
return false;
int consume = 0;
size_t hdr_length = 1;
switch (tag & 0xF0) {
case 0x40 :
consume = 0;
break;
case 0x50 :
consume = 1;
break;
case 0x60 :
consume = 2;
break;
case 0x70 :
consume = 4;
break;
case 0x80 :
consume = 8;
break;
case 0x90 :
consume = 16;
break;
case 0xB0 :
case 0xD0 :
case 0xF0 :
hdr_length += 3;
if (!next_octet(cursor, buffer, &octet))
return false;
consume |= ((int) octet) << 24;
if (!next_octet(cursor, buffer, &octet))
return false;
consume |= ((int) octet) << 16;
if (!next_octet(cursor, buffer, &octet))
return false;
consume |= ((int) octet) << 8;
// Fall through to the next case...
case 0xA0 :
case 0xC0 :
case 0xE0 :
hdr_length++;
if (!next_octet(cursor, buffer, &octet))
return false;
consume |= (int) octet;
break;
}
if (!advance(cursor, buffer, consume))
return false;
if (field && !field->parsed) {
field->buffer = start_buffer;
field->offset = start_cursor - qd_buffer_base(start_buffer);
field->length = consume;
field->hdr_length = hdr_length;
field->parsed = true;
field->tag = tag;
}
return true;
}
static int get_list_count(unsigned char **cursor, qd_buffer_t **buffer)
{
unsigned char tag;
unsigned char octet;
if (!next_octet(cursor, buffer, &tag))
return 0;
int count = 0;
switch (tag) {
case 0x45 : // list0
break;
case 0xd0 : // list32
//
// Advance past the list length
//
if (!advance(cursor, buffer, 4))
return 0;
if (!next_octet(cursor, buffer, &octet))
return 0;
count |= ((int) octet) << 24;
if (!next_octet(cursor, buffer, &octet))
return 0;
count |= ((int) octet) << 16;
if (!next_octet(cursor, buffer, &octet))
return 0;
count |= ((int) octet) << 8;
if (!next_octet(cursor, buffer, &octet))
return 0;
count |= (int) octet;
break;
case 0xc0 : // list8
//
// Advance past the list length
//
if (!advance(cursor, buffer, 1))
return 0;
if (!next_octet(cursor, buffer, &octet))
return 0;
count |= (int) octet;
break;
}
return count;
}
// Validate a message section (header, body, etc). This determines whether or
// not a given section is present and complete at the start of the buffer chain.
//
// The section is identified by a 'pattern' (a descriptor identifier, such as
// "MESSAGE_ANNOTATION_LONG" above). The descriptor also provides a type
// 'tag', which MUST match else the section is invalid.
//
// Non-Body message sections are optional. So if the pattern does NOT match
// then the section that the pattern represents is not present. Whether or not
// this is acceptable is left to the caller.
//
// If the pattern and tag match, extract the length and verify that the entire
// section is present in the buffer chain. If this is the case then store the
// start of the section in 'location' and advance '*buffer' and '*cursor' to
// the next section.
//
// if there is not enough of the section present in the buffer chain we need to
// wait until more data arrives and try again.
//
//
typedef enum {
QD_SECTION_INVALID, // invalid section (tag mismatch, duplicate section, etc).
QD_SECTION_MATCH,
QD_SECTION_NO_MATCH,
QD_SECTION_NEED_MORE // not enough data in the buffer chain - try again
} qd_section_status_t;
static qd_section_status_t message_section_check_LH(qd_message_content_t *content,
qd_buffer_t **buffer,
unsigned char **cursor,
const unsigned char *pattern,
int pattern_length,
const unsigned char *expected_tags,
qd_field_location_t *location,
bool dup_ok,
bool protect_buffer)
{
if (!*cursor || !can_advance(cursor, buffer))
return QD_SECTION_NEED_MORE;
qd_buffer_t *test_buffer = *buffer;
unsigned char *test_cursor = *cursor;
unsigned char *end_of_buffer = qd_buffer_cursor(test_buffer);
int idx = 0;
while (idx < pattern_length && *test_cursor == pattern[idx]) {
idx++;
test_cursor++;
if (test_cursor == end_of_buffer) {
test_buffer = test_buffer->next;
if (test_buffer == 0)
return QD_SECTION_NEED_MORE;
test_cursor = qd_buffer_base(test_buffer);
end_of_buffer = test_cursor + qd_buffer_size(test_buffer);
}
}
if (idx < pattern_length)
return QD_SECTION_NO_MATCH;
//
// Pattern matched, check the tag
//
while (*expected_tags && *test_cursor != *expected_tags)
expected_tags++;
if (*expected_tags == 0)
return QD_SECTION_INVALID; // Error: Unexpected tag
if (location->parsed && !dup_ok)
return QD_SECTION_INVALID; // Error: Duplicate section
//
// Pattern matched and tag is expected. Mark the beginning of the section.
//
location->buffer = *buffer;
location->offset = *cursor - qd_buffer_base(*buffer);
location->length = 0;
location->hdr_length = pattern_length;
//
// Check that the full section is present, if so advance the pointers to
// consume the whole section.
//
int pre_consume = 1; // Count the already extracted tag
uint32_t consume = 0;
unsigned char tag;
unsigned char octet;
if (!next_octet(&test_cursor, &test_buffer, &tag))
return QD_SECTION_NEED_MORE;
unsigned char tag_subcat = tag & 0xF0;
// if there is no more data the only valid data type is a null type (0x40),
// size is implied as 0
if (!can_advance(&test_cursor, &test_buffer) && tag_subcat != 0x40)
return QD_SECTION_NEED_MORE;
switch (tag_subcat) {
// fixed sizes:
case 0x40: /* null */ break;
case 0x50: consume = 1; break;
case 0x60: consume = 2; break;
case 0x70: consume = 4; break;
case 0x80: consume = 8; break;
case 0x90: consume = 16; break;
case 0xB0:
case 0xD0:
case 0xF0:
// uint32_t size field:
pre_consume += 3;
if (!next_octet(&test_cursor, &test_buffer, &octet))
return QD_SECTION_NEED_MORE;
consume |= ((uint32_t) octet) << 24;
if (!next_octet(&test_cursor, &test_buffer, &octet))
return QD_SECTION_NEED_MORE;
consume |= ((uint32_t) octet) << 16;
if (!next_octet(&test_cursor, &test_buffer, &octet))
return QD_SECTION_NEED_MORE;
consume |= ((uint32_t) octet) << 8;
// Fall through to the next case...
case 0xA0:
case 0xC0:
case 0xE0:
// uint8_t size field
pre_consume += 1;
if (!next_octet(&test_cursor, &test_buffer, &octet))
return QD_SECTION_NEED_MORE;
consume |= (uint32_t) octet;
break;
}
location->length = pre_consume + consume;
if (consume) {
if (!advance(&test_cursor, &test_buffer, consume)) {
return QD_SECTION_NEED_MORE; // whole section not fully received
}
}
if (protect_buffer) {
//
// increment the reference count of the parsed section as location now
// references it. Note that the cursor may have advanced to the octet after
// the parsed section, so be careful not to include an extra buffer past
// the end. And cursor + buffer will be null if the parsed section ends at
// the end of the buffer chain, so be careful of that, too!
//
bool buffers_protected = false;
qd_buffer_t *start = *buffer;
qd_buffer_t *last = test_buffer;
if (last && last != start) {
if (test_cursor == qd_buffer_base(last)) {
// last does not include octets for the current section
last = DEQ_PREV(last);
}
}
while (start) {
qd_buffer_inc_fanout(start);
buffers_protected = true;
if (start == last)
break;
start = DEQ_NEXT(start);
}
// DISPATCH-2191: protected buffers are never released - even after
// being sent - because they are referenced by the content->section_xxx
// location fields and remain valid for the life of the content
// instance. Since these buffers are never freed they must not be
// included in the Q2 threshold check!
if (buffers_protected) {
content->protected_buffers = 0;
start = DEQ_HEAD(content->buffers);
while (start) {
++content->protected_buffers;
if (start == last)
break;
start = DEQ_NEXT(start);
}
}
}
location->parsed = 1;
*cursor = test_cursor;
*buffer = test_buffer;
return QD_SECTION_MATCH;
}
// translate a field into its proper section of the message
static qd_message_field_t qd_field_section(qd_message_field_t field)
{
switch (field) {
case QD_FIELD_HEADER:
case QD_FIELD_DELIVERY_ANNOTATION:
case QD_FIELD_MESSAGE_ANNOTATION:
case QD_FIELD_PROPERTIES:
case QD_FIELD_APPLICATION_PROPERTIES:
case QD_FIELD_BODY:
case QD_FIELD_FOOTER:
return field;
case QD_FIELD_DURABLE:
case QD_FIELD_PRIORITY:
case QD_FIELD_TTL:
case QD_FIELD_FIRST_ACQUIRER:
case QD_FIELD_DELIVERY_COUNT:
return QD_FIELD_HEADER;
case QD_FIELD_MESSAGE_ID:
case QD_FIELD_USER_ID:
case QD_FIELD_TO:
case QD_FIELD_SUBJECT:
case QD_FIELD_REPLY_TO:
case QD_FIELD_CORRELATION_ID:
case QD_FIELD_CONTENT_TYPE:
case QD_FIELD_CONTENT_ENCODING:
case QD_FIELD_ABSOLUTE_EXPIRY_TIME:
case QD_FIELD_CREATION_TIME:
case QD_FIELD_GROUP_ID:
case QD_FIELD_GROUP_SEQUENCE:
case QD_FIELD_REPLY_TO_GROUP_ID:
return QD_FIELD_PROPERTIES;
default:
assert(false); // TBD: add new fields here
return QD_FIELD_NONE;
}
}
// get the field location of a field in the message properties (if it exists,
// else 0).
static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_message_field_t field)
{
static const intptr_t offsets[] = {
// position of the field's qd_field_location_t in the message content
// object
(intptr_t) &((qd_message_content_t*) 0)->field_message_id,
(intptr_t) &((qd_message_content_t*) 0)->field_user_id,
(intptr_t) &((qd_message_content_t*) 0)->field_to,
(intptr_t) &((qd_message_content_t*) 0)->field_subject,
(intptr_t) &((qd_message_content_t*) 0)->field_reply_to,
(intptr_t) &((qd_message_content_t*) 0)->field_correlation_id,
(intptr_t) &((qd_message_content_t*) 0)->field_content_type,
(intptr_t) &((qd_message_content_t*) 0)->field_content_encoding,
(intptr_t) &((qd_message_content_t*) 0)->field_absolute_expiry_time,
(intptr_t) &((qd_message_content_t*) 0)->field_creation_time,
(intptr_t) &((qd_message_content_t*) 0)->field_group_id,
(intptr_t) &((qd_message_content_t*) 0)->field_group_sequence,
(intptr_t) &((qd_message_content_t*) 0)->field_reply_to_group_id
};
// update table above if new fields need to be accessed:
assert(QD_FIELD_MESSAGE_ID <= field && field <= QD_FIELD_REPLY_TO_GROUP_ID);
qd_message_content_t *content = MSG_CONTENT(msg);
if (!content->section_message_properties.parsed) {
if (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) != QD_MESSAGE_DEPTH_OK || !content->section_message_properties.parsed)
return 0;
}
const int index = field - QD_FIELD_MESSAGE_ID;
qd_field_location_t *const location = (qd_field_location_t*) ((char*) content + offsets[index]);
if (location->parsed)
return location;
// requested field not parsed out. Need to parse out up to the requested field:
qd_buffer_t *buffer = content->section_message_properties.buffer;
unsigned char *cursor = qd_buffer_base(buffer) + content->section_message_properties.offset;
if (!advance(&cursor, &buffer, content->section_message_properties.hdr_length))
return 0;
if (index >= get_list_count(&cursor, &buffer))
return 0; // properties list too short
int position = 0;
while (position < index) {
qd_field_location_t *f = (qd_field_location_t*) ((char*) content + offsets[position]);
if (f->parsed) {
if (!advance(&cursor, &buffer, f->hdr_length + f->length))
return 0;
} else // parse it out
if (!traverse_field(&cursor, &buffer, f))
return 0;
position++;
}
// all fields previous to the target have now been parsed and cursor/buffer
// are in the correct position, parse out the field:
if (traverse_field(&cursor, &buffer, location))
return location;
return 0;
}
static void qd_message_parse_priority(qd_message_t *in_msg)
{
qd_message_content_t *content = MSG_CONTENT(in_msg);
qd_iterator_t *iter = qd_message_field_iterator(in_msg, QD_FIELD_HEADER);
SET_ATOMIC_FLAG(&content->priority_parsed);
if (!!iter) {
qd_parsed_field_t *field = qd_parse(iter);
if (qd_parse_ok(field)) {
if (qd_parse_is_list(field) && qd_parse_sub_count(field) >= 2) {
qd_parsed_field_t *priority_field = qd_parse_sub_value(field, 1);
if (qd_parse_tag(priority_field) != QD_AMQP_NULL) {
uint32_t value = qd_parse_as_uint(priority_field);
value = MIN(value, QDR_MAX_PRIORITY);
sys_atomic_set(&content->priority, value);
}
}
}
qd_parse_free(field);
qd_iterator_free(iter);
}
}
// Get the field's location in the buffer. Return 0 if the field does not exist.
// Note that even if the field location is returned, it may contain a
// QD_AMQP_NULL value (qd_field_location->tag == QD_AMQP_NULL).
//
static qd_field_location_t *qd_message_field_location(qd_message_t *msg, qd_message_field_t field)
{
qd_message_content_t *content = MSG_CONTENT(msg);
qd_message_field_t section = qd_field_section(field);
switch (section) {
case QD_FIELD_HEADER:
if (content->section_message_header.parsed ||
(qd_message_check_depth(msg, QD_DEPTH_HEADER) == QD_MESSAGE_DEPTH_OK && content->section_message_header.parsed))
return &content->section_message_header;
break;
case QD_FIELD_PROPERTIES:
return qd_message_properties_field(msg, field);
case QD_FIELD_DELIVERY_ANNOTATION:
if (content->section_delivery_annotation.parsed ||
(qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS) == QD_MESSAGE_DEPTH_OK && content->section_delivery_annotation.parsed))
return &content->section_delivery_annotation;
break;
case QD_FIELD_MESSAGE_ANNOTATION:
if (content->section_message_annotation.parsed ||
(qd_message_check_depth(msg, QD_DEPTH_MESSAGE_ANNOTATIONS) == QD_MESSAGE_DEPTH_OK && content->section_message_annotation.parsed))
return &content->section_message_annotation;
break;
case QD_FIELD_APPLICATION_PROPERTIES:
if (content->section_application_properties.parsed ||
(qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES) == QD_MESSAGE_DEPTH_OK && content->section_application_properties.parsed))
return &content->section_application_properties;
break;
case QD_FIELD_BODY:
if (content->section_body.parsed ||
(qd_message_check_depth(msg, QD_DEPTH_BODY) == QD_MESSAGE_DEPTH_OK && content->section_body.parsed))
return &content->section_body;
break;
case QD_FIELD_FOOTER:
if (content->section_footer.parsed ||
(qd_message_check_depth(msg, QD_DEPTH_ALL) == QD_MESSAGE_DEPTH_OK && content->section_footer.parsed))
return &content->section_footer;
break;
default:
assert(false); // TBD: add support as needed
return 0;
}
return 0;
}
qd_message_t *qd_message()
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) new_qd_message_t();
if (!msg)
return 0;
ZERO (msg);
msg->content = new_qd_message_content_t();
if (msg->content == 0) {
free_qd_message_t((qd_message_t*) msg);
return 0;
}
ZERO(msg->content);
msg->content->lock = sys_mutex();
sys_atomic_init(&msg->content->aborted, 0);
sys_atomic_init(&msg->content->discard, 0);
sys_atomic_init(&msg->content->no_body, 0);
sys_atomic_init(&msg->content->oversize, 0);
sys_atomic_init(&msg->content->priority, QDR_DEFAULT_PRIORITY);
sys_atomic_init(&msg->content->priority_parsed, 0);
sys_atomic_init(&msg->content->receive_complete, 0);
sys_atomic_init(&msg->content->ref_count, 1);
msg->content->parse_depth = QD_DEPTH_NONE;
return (qd_message_t*) msg;
}
void qd_message_free(qd_message_t *in_msg)
{
if (!in_msg) return;
uint32_t rc;
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_q2_unblocker_t q2_unblock = {0};
free(msg->ma_to_override);
sys_atomic_destroy(&msg->send_complete);
qd_message_content_t *content = msg->content;
if (msg->is_fanout) {
//
// Adjust the content's fanout count and decrement all buffer fanout
// counts starting with the msg cursor. If the buffer count drops to
// zero we can free it.
//
LOCK(content->lock);
// DISPATCH-2099: ensure all outstanding stream_data items associated
// with this message have been returned since the underlying buffers
// may be released
assert(DEQ_IS_EMPTY(msg->stream_data_list));
const bool was_blocked = !_Q2_holdoff_should_unblock_LH(content);
qd_buffer_t *buf = msg->cursor.buffer;
while (buf) {
qd_buffer_t *next_buf = DEQ_NEXT(buf);
if (qd_buffer_dec_fanout(buf) == 1) {
DEQ_REMOVE(content->buffers, buf);
qd_buffer_free(buf);
}
buf = next_buf;
}
--content->fanout;
//
// it is possible that we've freed enough buffers to clear Q2 holdoff
//
if (content->q2_input_holdoff
&& was_blocked
&& _Q2_holdoff_should_unblock_LH(content)) {
content->q2_input_holdoff = false;
q2_unblock = content->q2_unblocker;
}
UNLOCK(content->lock);
}
// the Q2 handler must be invoked outside the lock
if (q2_unblock.handler)
q2_unblock.handler(q2_unblock.context);
rc = sys_atomic_dec(&content->ref_count) - 1;
if (rc == 0) {
if (content->ma_field_iter_in)
qd_iterator_free(content->ma_field_iter_in);
if (content->ma_pf_ingress)
qd_parse_free(content->ma_pf_ingress);
if (content->ma_pf_to_override)
qd_parse_free(content->ma_pf_to_override);
if (content->ma_pf_trace)
qd_parse_free(content->ma_pf_trace);
qd_buffer_list_free_buffers(&content->buffers);
if (content->pending)
qd_buffer_free(content->pending);
sys_mutex_free(content->lock);
sys_atomic_destroy(&content->aborted);
sys_atomic_destroy(&content->discard);
sys_atomic_destroy(&content->no_body);
sys_atomic_destroy(&content->oversize);
sys_atomic_destroy(&content->priority);
sys_atomic_destroy(&content->priority_parsed);
sys_atomic_destroy(&content->receive_complete);
sys_atomic_destroy(&content->ref_count);
free_qd_message_content_t(content);
}
free_qd_message_t((qd_message_t*) msg);
}
qd_message_t *qd_message_copy(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
qd_message_pvt_t *copy = (qd_message_pvt_t*) new_qd_message_t();
if (!copy)
return 0;
ZERO(copy);
copy->strip_annotations_in = msg->strip_annotations_in;
copy->content = content;
copy->sent_depth = QD_DEPTH_NONE;
copy->cursor.buffer = 0;
copy->cursor.cursor = 0;
sys_atomic_init(&copy->send_complete, 0);
copy->tag_sent = false;
copy->is_fanout = false;
if (!content->ma_disabled) {
if (msg->ma_to_override)
copy->ma_to_override = qd_strdup(msg->ma_to_override);
copy->ma_filter_trace = msg->ma_filter_trace;
copy->ma_filter_ingress = msg->ma_filter_ingress;
copy->ma_reset_trace = msg->ma_reset_trace;
copy->ma_reset_ingress = msg->ma_reset_ingress;
copy->ma_phase = msg->ma_phase;
copy->ma_streaming = msg->ma_streaming;
}
sys_atomic_inc(&content->ref_count);
return (qd_message_t*) copy;
}
const char *qd_message_parse_annotations(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
assert(!content->ma_disabled); // should not be called when skipping MA processing
if (content->ma_parsed)
return 0;
content->ma_parsed = true;
content->ma_field_iter_in = qd_message_field_iterator(in_msg, QD_FIELD_MESSAGE_ANNOTATION);
if (content->ma_field_iter_in == 0)
return 0;
qd_parsed_field_t *ma_pf_stream = 0;
qd_parsed_field_t *ma_pf_phase = 0;
const char *err = qd_parse_annotations(msg->strip_annotations_in,
content->ma_field_iter_in,
&content->ma_pf_ingress,
&ma_pf_phase,
&content->ma_pf_to_override,
&content->ma_pf_trace,
&ma_pf_stream,
&content->ma_user_annotations,
&content->ma_user_count);
if (err)
return(err);
// cache incoming values into the message
if (ma_pf_phase) {
msg->ma_phase = qd_parse_as_int(ma_pf_phase);
qd_parse_free(ma_pf_phase);
}
if (ma_pf_stream) {
msg->ma_streaming = true;
qd_parse_free(ma_pf_stream);
}
return 0;
}
void qd_message_set_to_override_annotation(qd_message_t *in_msg, const char *to_field)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
free(msg->ma_to_override);
msg->ma_to_override = to_field ? qd_strdup(to_field) : 0;
}
void qd_message_set_phase_annotation(qd_message_t *in_msg, int phase)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
msg->ma_phase = phase;
}
int qd_message_get_phase_annotation(const qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
return msg->ma_phase;
}
void qd_message_set_streaming_annotation(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
msg->ma_streaming = true;
}
void qd_message_disable_router_annotations(qd_message_t *msg)
{
qd_message_content_t *content = ((qd_message_pvt_t *)msg)->content;
content->ma_disabled = true;
content->ma_parsed = true;
}
bool qd_message_is_discard(qd_message_t *msg)
{
if (!msg)
return false;
qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
return IS_ATOMIC_FLAG_SET(&pvt_msg->content->discard);
}
void qd_message_set_discard(qd_message_t *msg, bool discard)
{
if (!msg)
return;
qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
SET_ATOMIC_BOOL(&pvt_msg->content->discard, discard);
}
// update the buffer reference counts for a new outgoing message
//
void qd_message_add_fanout(qd_message_t *out_msg)
{
assert(out_msg);
qd_message_pvt_t *msg = (qd_message_pvt_t *)out_msg;
msg->is_fanout = true;
qd_message_content_t *content = msg->content;
LOCK(content->lock);
++content->fanout;
qd_buffer_t *buf = DEQ_HEAD(content->buffers);
// DISPATCH-1590: content->buffers may not be set up yet if
// content->pending is the first buffer and it is not yet full.
if (!buf) {
assert(content->pending && qd_buffer_size(content->pending) > 0);
DEQ_INSERT_TAIL(content->buffers, content->pending);
content->pending = 0;
buf = DEQ_HEAD(content->buffers);
}
// DISPATCH-1330: since we're incrementing the refcount be sure to set
// the cursor to the head buf in case msg is discarded before all data
// is sent (we'll decref any unsent buffers at that time)
//
msg->cursor.buffer = buf;
while (buf) {
qd_buffer_inc_fanout(buf);
buf = DEQ_NEXT(buf);
}
UNLOCK(content->lock);
}
/**
* There are two sources of priority information --
* message and address. Address takes precedence, falling
* through when no address priority has been specified.
* This also means that messages must always have a priority,
* using default value if sender leaves it unspecified.
*/
uint8_t qd_message_get_priority(qd_message_t *msg)
{
qd_message_content_t *content = MSG_CONTENT(msg);
if (!IS_ATOMIC_FLAG_SET(&content->priority_parsed))
qd_message_parse_priority(msg);
return sys_atomic_get(&content->priority);
}
bool qd_message_receive_complete(qd_message_t *in_msg)
{
if (!in_msg)
return false;
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
return IS_ATOMIC_FLAG_SET(&msg->content->receive_complete);
}
bool qd_message_send_complete(qd_message_t *in_msg)
{
if (!in_msg)
return false;
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
return IS_ATOMIC_FLAG_SET(&msg->send_complete);
}
void qd_message_set_send_complete(qd_message_t *in_msg)
{
if (!!in_msg) {
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
SET_ATOMIC_FLAG(&msg->send_complete);
}
}
void qd_message_set_receive_complete(qd_message_t *in_msg)
{
if (!!in_msg) {
qd_message_content_t *content = MSG_CONTENT(in_msg);
qd_message_q2_unblocker_t q2_unblock = {0};
LOCK(content->lock);
SET_ATOMIC_FLAG(&content->receive_complete);
if (content->q2_input_holdoff) {
content->q2_input_holdoff = false;
q2_unblock = content->q2_unblocker;
}
content->q2_unblocker.handler = 0;
qd_nullify_safe_ptr(&content->q2_unblocker.context);
UNLOCK(content->lock);
if (q2_unblock.handler)
q2_unblock.handler(q2_unblock.context);
}
}
void qd_message_set_no_body(qd_message_t *in_msg)
{
if (!!in_msg) {
qd_message_content_t *content = MSG_CONTENT(in_msg);
SET_ATOMIC_FLAG(&content->no_body);
}
}
bool qd_message_no_body(qd_message_t *in_msg)
{
if (!!in_msg) {
qd_message_content_t *content = MSG_CONTENT(in_msg);
return IS_ATOMIC_FLAG_SET(&content->no_body);
}
return false;
}
bool qd_message_tag_sent(qd_message_t *in_msg)
{
if (!in_msg)
return false;
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
return msg->tag_sent;
}
void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent)
{
if (!in_msg)
return;
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
msg->tag_sent = tag_sent;
}
/**
* Receive and discard large messages for which there is no destination.
* Don't waste resources by putting the message into internal buffers.
* Message locking is not required since the message content buffers are untouched.
*/
qd_message_t *discard_receive(pn_delivery_t *delivery,
pn_link_t *link,
qd_message_t *msg_in)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*)msg_in;
while (1) {
#define DISCARD_BUFFER_SIZE (128 * 1024)
char dummy[DISCARD_BUFFER_SIZE];
ssize_t rc = pn_link_recv(link, dummy, DISCARD_BUFFER_SIZE);
if (rc == 0) {
// have read all available pn_link incoming bytes
break;
} else if (rc == PN_EOS || rc < 0) {
// End of message or error: finalize message_receive handling
if (pn_delivery_aborted(delivery)) {
SET_ATOMIC_FLAG(&msg->content->aborted);
}
pn_record_t *record = pn_delivery_attachments(delivery);
pn_record_set(record, PN_DELIVERY_CTX, 0);
if (IS_ATOMIC_FLAG_SET(&msg->content->oversize)) {
// Aborting the content disposes of downstream copies.
// This has no effect on the received message.
SET_ATOMIC_FLAG(&msg->content->aborted);
}
qd_message_set_receive_complete((qd_message_t*) msg);
break;
} else {
// rc was > 0. bytes were read and discarded.
}
}
return msg_in;
}
qd_message_t * qd_get_message_context(pn_delivery_t *delivery)
{
pn_record_t *record = pn_delivery_attachments(delivery);
if (record)
return (qd_message_t *) pn_record_get(record, PN_DELIVERY_CTX);
return 0;
}
bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg)
{
if (!msg)
return false;
if (MSG_CONTENT(msg)) {
if (DEQ_SIZE(MSG_CONTENT(msg)->buffers) > 0) {
qd_buffer_t *buf = DEQ_HEAD(MSG_CONTENT(msg)->buffers);
if (buf && qd_buffer_size(buf) > 0)
return true;
}
if (MSG_CONTENT(msg)->pending && qd_buffer_size(MSG_CONTENT(msg)->pending) > 0)
return true;
}
return false;
}
qd_message_t *qd_message_receive(pn_delivery_t *delivery)
{
pn_link_t *link = pn_delivery_link(delivery);
qd_link_t *qdl = (qd_link_t *)pn_link_get_context(link);
ssize_t rc;
pn_record_t *record = pn_delivery_attachments(delivery);
qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX);
//
// If there is no message associated with the delivery then this is the
// first time we've received anything on this delivery.
// Allocate a message descriptor and link it and the delivery together.
//
if (!msg) {
msg = (qd_message_pvt_t*) qd_message();
qd_connection_t *qdc = qd_link_connection(qdl);
qd_alloc_safe_ptr_t sp = QD_SAFE_PTR_INIT(qdl);
qd_message_set_q2_unblocked_handler((qd_message_t*) msg, qd_link_q2_restart_receive, sp);
msg->strip_annotations_in = qd_connection_strip_annotations_in(qdc);
pn_record_def(record, PN_DELIVERY_CTX, PN_VOID);
pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
msg->content->max_message_size = qd_connection_max_message_size(qdc);
qd_link_set_incoming_msg(qdl, (qd_message_t*) msg);
}
//
// The discard flag indicates we should keep reading the input stream
// but not process the message for delivery.
// Oversize messages are also discarded.
//
if (IS_ATOMIC_FLAG_SET(&msg->content->discard)) {
return discard_receive(delivery, link, (qd_message_t *)msg);
}
// if q2 holdoff has been disabled (disable_q2_holdoff=true), we keep receiving.
// if q2 holdoff has been enabled (disable_q2_holdoff=false), if input is in holdoff then just exit.
// When enough buffers
// have been processed and freed by outbound processing then
// message holdoff is cleared and receiving may continue.
//
LOCK(msg->content->lock);
if (!qd_link_is_q2_limit_unbounded(qdl) && !msg->content->disable_q2_holdoff) {
if (msg->content->q2_input_holdoff) {
UNLOCK(msg->content->lock);
return (qd_message_t*)msg;
}
}
UNLOCK(msg->content->lock);
// Loop until msg is complete, error seen, or incoming bytes are consumed
qd_message_content_t *content = msg->content;
bool recv_error = false;
while (1) {
//
// handle EOS and clean up after pn receive errors
//
bool at_eos = (pn_delivery_partial(delivery) == false) &&
(pn_delivery_aborted(delivery) == false) &&
(pn_delivery_pending(delivery) == 0);
if (at_eos || recv_error) {
// Message is complete
qd_buffer_t * pending_free = 0; // free empty pending buffer outside of lock
LOCK(content->lock);
{
// Append last buffer if any with data
if (content->pending) {
if (qd_buffer_size(content->pending) > 0) {
// pending buffer has bytes that are part of message
qd_buffer_set_fanout(content->pending, content->fanout);
DEQ_INSERT_TAIL(content->buffers,
content->pending);
} else {
// pending buffer is empty
pending_free = content->pending;
}
content->pending = 0;
} else {
// pending buffer is absent
}
content->receive_complete = true;
content->q2_unblocker.handler = 0;
qd_nullify_safe_ptr(&content->q2_unblocker.context);
if (pn_delivery_aborted(delivery)) {
SET_ATOMIC_FLAG(&msg->content->aborted);
}
// unlink message and delivery
pn_record_set(record, PN_DELIVERY_CTX, 0);
}
UNLOCK(content->lock);
if (!!pending_free) {
qd_buffer_free(pending_free);
}
break;
}
//
// Handle a missing or full pending buffer
//
if (!content->pending) {
// Pending buffer is absent: get a new one
content->pending = qd_buffer();
} else {
// Pending buffer exists
if (qd_buffer_capacity(content->pending) == 0) {
// Pending buffer is full
LOCK(content->lock);
qd_buffer_set_fanout(content->pending, content->fanout);
DEQ_INSERT_TAIL(content->buffers, content->pending);
content->pending = 0;
if (_Q2_holdoff_should_block_LH(content)) {
if (!qd_link_is_q2_limit_unbounded(qdl)) {
content->q2_input_holdoff = true;
UNLOCK(content->lock);
break;
}
}
UNLOCK(content->lock);
content->pending = qd_buffer();
} else {
// Pending buffer still has capacity
}
}
//
// Try to fill the remaining space in the pending buffer.
//
rc = pn_link_recv(link,
(char*) qd_buffer_cursor(content->pending),
qd_buffer_capacity(content->pending));
if (rc < 0) {
// error or eos seen. next pass breaks out of loop
recv_error = true;
} else if (rc > 0) {
//
// We have received a positive number of bytes for the message.
// Advance the cursor in the buffer.
//
qd_buffer_insert(content->pending, rc);
// Handle maxMessageSize violations
if (content->max_message_size) {
content->bytes_received += rc;
if (content->bytes_received > content->max_message_size)
{
qd_connection_t *conn = qd_link_connection(qdl);
qd_connection_log_policy_denial(qdl, "DENY AMQP Transfer maxMessageSize exceeded");
qd_policy_count_max_size_event(link, conn);
SET_ATOMIC_FLAG(&content->discard);
SET_ATOMIC_FLAG(&content->oversize);
return discard_receive(delivery, link, (qd_message_t*)msg);
}
}
} else {
//
// We received zero bytes, and no PN_EOS. This means that we've received
// all of the data available up to this point, but it does not constitute
// the entire message. We'll be back later to finish it up.
// Return the message so that the caller can start sending out whatever we have received so far
//
// push what we do have for testing/processing
if (qd_buffer_size(content->pending) > 0) {
LOCK(content->lock);
qd_buffer_set_fanout(content->pending, content->fanout);
DEQ_INSERT_TAIL(content->buffers, content->pending);
content->pending = 0;
UNLOCK(content->lock);
content->pending = qd_buffer();
}
break;
}
}
return (qd_message_t*) msg;
}
static void send_handler(void *context, const unsigned char *start, int length)
{
pn_link_t *pnl = (pn_link_t*) context;
pn_link_send(pnl, (const char*) start, length);
}
// Restore MA to the original user-supplied MA values. This merely sets up the
// annotations section and map header to hold only the user supplied
// annotations.
//
// @return the length of the ma_header field in octets
//
static int restore_user_message_annotations(qd_message_pvt_t *msg, uint8_t *ma_header)
{
qd_message_content_t *content = msg->content;
if (content->ma_user_count) {
// setup the MA descriptor:
ma_header[0] = 0;
ma_header[1] = QD_AMQP_SMALLULONG;
ma_header[2] = QD_PERFORMATIVE_MESSAGE_ANNOTATIONS;
// setup the MA MAP header. The type of header (MAP32/8) depends on
// the size of the map contents.
const int map_hdr_len = qd_compose_map_header(&ma_header[3],
content->ma_user_annotations.remaining,
content->ma_user_count);
return map_hdr_len + 3;
}
return 0;
}
// Generate the MA section header and the router annotations. Any user
// annotations will be sent after ma_header and before ma_trailer.
//
// @return the length of the ma_header field in octets
//
static int compose_router_message_annotations(qd_message_pvt_t *msg, uint8_t *ma_header,
qd_buffer_list_t *ma_trailer)
{
qd_message_content_t *content = msg->content;
// account for any user annotations to be sent before the router annotations
//
uint32_t mcount = content->ma_user_count;
uint32_t msize = content->ma_user_annotations.remaining;
if (msg->ma_phase) {
assert(msg->ma_phase < 128); // smallint
mcount += 2;
// key:
msize += QD_MA_PHASE_ENCODED_LEN;
qd_buffer_list_append(ma_trailer, QD_MA_PHASE_ENCODED, QD_MA_PHASE_ENCODED_LEN);
// value:
msize += 2; // tag + 1 byte value
uint8_t ma_phase[2];
ma_phase[0] = QD_AMQP_SMALLINT;
ma_phase[1] = msg->ma_phase;
qd_buffer_list_append(ma_trailer, ma_phase, 2);
}
if (msg->ma_streaming) {
mcount += 2;
// key:
msize += QD_MA_STREAM_ENCODED_LEN;
qd_buffer_list_append(ma_trailer, QD_MA_STREAM_ENCODED, QD_MA_STREAM_ENCODED_LEN);
// value: historically sent as int value 1:
msize += 2;
const uint8_t streaming[2] = {QD_AMQP_SMALLINT, 1};
qd_buffer_list_append(ma_trailer, streaming, 2);
}
if (msg->ma_to_override || content->ma_pf_to_override) {
mcount += 2;
// key:
msize += QD_MA_TO_ENCODED_LEN;
qd_buffer_list_append(ma_trailer, QD_MA_TO_ENCODED, QD_MA_TO_ENCODED_LEN);
// value: message specific value takes precedence over value in
// original received message to allow overriding the to-override
uint8_t hdr[5]; // max length of encoded str8/32 header
if (msg->ma_to_override) {
const size_t str_len = strlen(msg->ma_to_override);
const int hdr_len = qd_compose_str_header(hdr, str_len);
msize += hdr_len;
qd_buffer_list_append(ma_trailer, hdr, hdr_len);
msize += str_len;
qd_buffer_list_append(ma_trailer, (uint8_t*) msg->ma_to_override, str_len);
} else {
qd_buffer_field_t to = qd_parse_value(content->ma_pf_to_override);
const int hdr_len = qd_compose_str_header(hdr, to.remaining);
msize += hdr_len;
qd_buffer_list_append(ma_trailer, hdr, hdr_len);
msize += to.remaining;
qd_buffer_list_append_field(ma_trailer, &to);
}
}
if (!msg->ma_filter_ingress) {
mcount += 2;
// key
msize += QD_MA_INGRESS_ENCODED_LEN;
qd_buffer_list_append(ma_trailer, QD_MA_INGRESS_ENCODED, QD_MA_INGRESS_ENCODED_LEN);
// value: use original value if present, else the local node is the
// ingress
if (content->ma_pf_ingress && !msg->ma_reset_ingress) {
uint8_t hdr[5]; // max size str8/32 header
qd_buffer_field_t ingress = qd_parse_value(content->ma_pf_ingress);
const int hdr_len = qd_compose_str_header(hdr, ingress.remaining);
msize += hdr_len;
qd_buffer_list_append(ma_trailer, hdr, hdr_len);
msize += ingress.remaining;
qd_buffer_list_append_field(ma_trailer, &ingress);
} else {
size_t node_id_len;
const uint8_t *node_id = qd_router_id_encoded(&node_id_len);
msize += node_id_len;
qd_buffer_list_append(ma_trailer, node_id, node_id_len);
}
}
if (!msg->ma_filter_trace) {
mcount += 2;
size_t node_id_len;
const uint8_t *node_id = qd_router_id_encoded(&node_id_len);
uint32_t trace_count = 1; // local node
uint32_t trace_len = node_id_len;
const bool use_incoming = content->ma_pf_trace && !msg->ma_reset_trace;
// key
msize += QD_MA_TRACE_ENCODED_LEN;
qd_buffer_list_append(ma_trailer, QD_MA_TRACE_ENCODED, QD_MA_TRACE_ENCODED_LEN);
// value: first compute trace list size and count since the list header
// must be written first
qd_buffer_field_t in_trace = {0};
if (use_incoming) {
in_trace = qd_parse_value(content->ma_pf_trace);
trace_len += in_trace.remaining;
trace_count += qd_parse_sub_count(content->ma_pf_trace);
}
uint8_t list_hdr[9]; // max len encoded list header
const int hdr_len = qd_compose_list_header(list_hdr, trace_len, trace_count);
msize += hdr_len;
qd_buffer_list_append(ma_trailer, list_hdr, hdr_len);
if (use_incoming) {
msize += in_trace.remaining;
qd_buffer_list_append_field(ma_trailer, &in_trace);
}
msize += node_id_len;
qd_buffer_list_append(ma_trailer, node_id, node_id_len);
}
if (msize) {
// setup the MA section descriptor:
ma_header[0] = 0;
ma_header[1] = QD_AMQP_SMALLULONG;
ma_header[2] = QD_PERFORMATIVE_MESSAGE_ANNOTATIONS;
// setup the MA MAP header
const int hdr_size = qd_compose_map_header(&ma_header[3], msize, mcount);
return hdr_size + 3;
}
return 0;
}
void qd_message_send(qd_message_t *in_msg,
qd_link_t *link,
bool strip_annotations,
bool *q3_stalled)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
pn_link_t *pnl = qd_link_pn(link);
*q3_stalled = false;
if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
// Message is aborted before any part of it has been sent.
// Declare the message to be sent,
SET_ATOMIC_FLAG(&msg->send_complete);
// If the outgoing delivery is not already aborted then abort it.
if (!pn_delivery_aborted(pn_link_current(pnl))) {
pn_delivery_abort(pn_link_current(pnl));
}
return;
}
msg->cursor.buffer = DEQ_HEAD(content->buffers);
msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer);
// Since link-routed messages do not set router annotations they will
// skip the following (content->ma_disabled will be true) and unconditionally
// start sending from the first octet of the content.
if (!content->ma_disabled) {
//
// Send header if present
//
const int header_consume = content->section_message_header.length + content->section_message_header.hdr_length;
if (header_consume > 0) {
assert(msg->cursor.cursor == content->section_message_header.offset + qd_buffer_base(msg->cursor.buffer));
advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, header_consume, send_handler, (void*) pnl);
}
//
// Send delivery annotation if present
//
const int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length;
if (da_consume > 0) {
assert(msg->cursor.cursor == content->section_delivery_annotation.offset + qd_buffer_base(msg->cursor.buffer));
advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, da_consume, send_handler, (void*) pnl);
}
//
// Send the message annotations section
//
uint8_t ma_header[12]; // max length for MA section and map header
int ma_header_len; // size of ma_header content
qd_buffer_list_t ma_trailer = DEQ_EMPTY;
if (strip_annotations) {
// send the original user message annotations only (if present)
ma_header_len = restore_user_message_annotations(msg, ma_header);
} else {
ma_header_len = compose_router_message_annotations(msg, ma_header, &ma_trailer);
}
if (ma_header_len) {
//
// send annotation section and map header
//
pn_link_send(pnl, (char*) ma_header, ma_header_len);
//
// Now send any annotation set by the original endpoint
//
if (content->ma_user_annotations.remaining) {
qd_buffer_t *buf2 = content->ma_user_annotations.buffer;
const uint8_t *cursor2 = content->ma_user_annotations.cursor;
advance_guarded(&cursor2, &buf2,
content->ma_user_annotations.remaining,
send_handler, (void*) pnl);
}
//
// Next send router annotations
//
qd_buffer_t *ta_buf = DEQ_HEAD(ma_trailer);
while (ta_buf) {
char *to_send = (char*) qd_buffer_base(ta_buf);
pn_link_send(pnl, to_send, qd_buffer_size(ta_buf));
ta_buf = DEQ_NEXT(ta_buf);
}
qd_buffer_list_free_buffers(&ma_trailer);
}
//
// Skip over replaced message annotations
//
const int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length;
if (ma_consume > 0) {
assert(msg->cursor.cursor == content->section_message_annotation.offset + qd_buffer_base(msg->cursor.buffer));
advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, ma_consume, 0, 0);
}
}
msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS;
}
qd_buffer_t *buf = msg->cursor.buffer;
qd_message_q2_unblocker_t q2_unblock = {0};
pn_session_t *pns = pn_link_session(pnl);
const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;
while (!IS_ATOMIC_FLAG_SET(&content->aborted)
&& buf
&& pn_session_outgoing_bytes(pns) < q3_upper) {
// This will send the remaining data in the buffer if any. There may be
// zero bytes left to send if we stopped here last time and there was
// no next buf
//
size_t buf_size = qd_buffer_size(buf);
int num_bytes_to_send = buf_size - (msg->cursor.cursor - qd_buffer_base(buf));
ssize_t bytes_sent = 0;
if (num_bytes_to_send > 0) {
bytes_sent = pn_link_send(pnl, (const char*)msg->cursor.cursor, num_bytes_to_send);
}
LOCK(content->lock);
if (bytes_sent < 0) {
//
// send error - likely the link has failed and we will eventually
// get a link detach event for this link
//
SET_ATOMIC_FLAG(&content->aborted);
SET_ATOMIC_FLAG(&msg->send_complete);
if (!pn_delivery_aborted(pn_link_current(pnl))) {
pn_delivery_abort(pn_link_current(pnl));
}
qd_log(qd_message_log_source(),
QD_LOG_WARNING,
"Sending data on link %s has failed (code=%zi)",
pn_link_name(pnl), bytes_sent);
} else {
msg->cursor.cursor += bytes_sent;
if (bytes_sent == num_bytes_to_send) {
//
// sent the whole buffer.
// Can we move to the next buffer? Only if there is a next buffer
// or we are at the end and done sending this message
//
qd_buffer_t *next_buf = DEQ_NEXT(buf);
bool complete = qd_message_receive_complete(in_msg);
if (next_buf || complete) {
//
// this buffer may be freed if there are no more references to it
//
uint32_t ref_count = (msg->is_fanout) ? qd_buffer_dec_fanout(buf) : 1;
if (ref_count == 1) {
DEQ_REMOVE(content->buffers, buf);
qd_buffer_free(buf);
++content->buffers_freed;
// by freeing a buffer there now may be room to restart a
// stalled message receiver
if (content->q2_input_holdoff) {
if (_Q2_holdoff_should_unblock_LH(content)) {
// wake up receive side
// Note: clearing holdoff here is easy compared to
// clearing it in the deferred callback. Tracing
// shows that rx_handler may run and subsequently
// set input holdoff before the deferred handler
// runs.
content->q2_input_holdoff = false;
q2_unblock = content->q2_unblocker;
}
}
} // end free buffer
msg->cursor.buffer = next_buf;
msg->cursor.cursor = (next_buf) ? qd_buffer_base(next_buf) : 0;
SET_ATOMIC_BOOL(&msg->send_complete, (complete && !next_buf));
}
buf = next_buf;
} else if (num_bytes_to_send && bytes_sent == 0) {
//
// the proton link cannot take anymore data,
// retry later...
//
buf = 0;
qd_log(qd_message_log_source(), QD_LOG_DEBUG,
"Link %s output limit reached", pn_link_name(pnl));
}
}
UNLOCK(content->lock);
}
// the Q2 handler must be invoked outside the lock
if (q2_unblock.handler)
q2_unblock.handler(q2_unblock.context);
if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
if (pn_link_current(pnl)) {
SET_ATOMIC_FLAG(&msg->send_complete);
if (!pn_delivery_aborted(pn_link_current(pnl))) {
pn_delivery_abort(pn_link_current(pnl));
}
}
}
*q3_stalled = (pn_session_outgoing_bytes(pns) >= q3_upper);
}
static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *content,
qd_message_depth_t depth,
const unsigned char *long_pattern,
const unsigned char *short_pattern,
const unsigned char *expected_tags,
qd_field_location_t *location,
bool optional,
bool protect_buffer)
{
#define LONG 10
#define SHORT 3
if (depth <= content->parse_depth)
return QD_MESSAGE_DEPTH_OK;
qd_section_status_t rc;
rc = message_section_check_LH(content, &content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location, false, protect_buffer);
if (rc == QD_SECTION_NO_MATCH) // try the alternative
rc = message_section_check_LH(content, &content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location, false, protect_buffer);
if (rc == QD_SECTION_MATCH || (optional && rc == QD_SECTION_NO_MATCH)) {
content->parse_depth = depth;
return QD_MESSAGE_DEPTH_OK;
}
if (rc == QD_SECTION_NEED_MORE) {
if (!IS_ATOMIC_FLAG_SET(&content->receive_complete))
return QD_MESSAGE_DEPTH_INCOMPLETE;
// no more data is going to come. OK if at the end and optional:
if (!can_advance(&content->parse_cursor, &content->parse_buffer) && optional)
return QD_MESSAGE_DEPTH_OK;
// otherwise we've got an invalid (truncated) header
}
// if QD_SECTION_NO_MATCH && !optional => INVALID;
// QD_SECTION_INVALID => INVALID;
return QD_MESSAGE_DEPTH_INVALID;
}
static qd_message_depth_status_t qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth)
{
qd_error_clear();
if (depth <= content->parse_depth || depth == QD_DEPTH_NONE)
return QD_MESSAGE_DEPTH_OK; // We've already parsed at least this deep
// Is there any data to check? This will also check for null messages, which
// are not valid:
//
qd_buffer_t *buffer = DEQ_HEAD(content->buffers);
if (!buffer || qd_buffer_size(buffer) == 0) {
return IS_ATOMIC_FLAG_SET(&content->receive_complete) ? QD_MESSAGE_DEPTH_INVALID : QD_MESSAGE_DEPTH_INCOMPLETE;
}
if (content->buffers_freed) {
// this is likely a bug: the caller is attempting to access a
// section after the start of the message has already been sent and
// released, rendering the parse_buffer/cursor position invalid.
return QD_MESSAGE_DEPTH_INVALID;
}
if (content->parse_buffer == 0) {
content->parse_buffer = buffer;
content->parse_cursor = qd_buffer_base(content->parse_buffer);
}
qd_message_depth_status_t rc = QD_MESSAGE_DEPTH_OK;
int last_section = QD_DEPTH_NONE;
switch (content->parse_depth + 1) { // start checking at the next unparsed section
case QD_DEPTH_HEADER:
//
// MESSAGE HEADER (optional)
//
last_section = QD_DEPTH_HEADER;
rc = message_check_depth_LH(content, QD_DEPTH_HEADER,
MSG_HDR_LONG, MSG_HDR_SHORT, TAGS_LIST,
&content->section_message_header, true, true);
if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_HEADER)
break;
// fallthrough
case QD_DEPTH_DELIVERY_ANNOTATIONS:
//
// DELIVERY ANNOTATIONS (optional)
//
last_section = QD_DEPTH_DELIVERY_ANNOTATIONS;
rc = message_check_depth_LH(content, QD_DEPTH_DELIVERY_ANNOTATIONS,
DELIVERY_ANNOTATION_LONG, DELIVERY_ANNOTATION_SHORT, TAGS_MAP,
&content->section_delivery_annotation, true, true);
if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_DELIVERY_ANNOTATIONS)
break;
// fallthrough
case QD_DEPTH_MESSAGE_ANNOTATIONS:
//
// MESSAGE ANNOTATION (optional)
//
last_section = QD_DEPTH_MESSAGE_ANNOTATIONS;
rc = message_check_depth_LH(content, QD_DEPTH_MESSAGE_ANNOTATIONS,
MESSAGE_ANNOTATION_LONG, MESSAGE_ANNOTATION_SHORT, TAGS_MAP,
&content->section_message_annotation, true, true);
if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_MESSAGE_ANNOTATIONS)
break;
// fallthough
case QD_DEPTH_PROPERTIES:
//
// PROPERTIES (optional)
//
last_section = QD_DEPTH_PROPERTIES;
rc = message_check_depth_LH(content, QD_DEPTH_PROPERTIES,
PROPERTIES_LONG, PROPERTIES_SHORT, TAGS_LIST,
&content->section_message_properties, true, true);
if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_PROPERTIES)
break;
// fallthrough
case QD_DEPTH_APPLICATION_PROPERTIES:
//
// APPLICATION PROPERTIES (optional)
//
last_section = QD_DEPTH_APPLICATION_PROPERTIES;
rc = message_check_depth_LH(content, QD_DEPTH_APPLICATION_PROPERTIES,
APPLICATION_PROPERTIES_LONG, APPLICATION_PROPERTIES_SHORT, TAGS_MAP,
&content->section_application_properties, true, true);
if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_APPLICATION_PROPERTIES)
break;
// fallthrough
case QD_DEPTH_BODY:
//
// BODY (not optional, but proton allows it - see PROTON-2085)
//
// AMQP 1.0 defines 3 valid Body types: Binary, Sequence (list), or Value (any type)
// Since the body is mandatory, we need to match one of these. Setting
// the optional flag to false will force us to check each one until a match is found.
//
last_section = QD_DEPTH_BODY;
rc = message_check_depth_LH(content, QD_DEPTH_BODY,
BODY_VALUE_LONG, BODY_VALUE_SHORT, TAGS_ANY,
&content->section_body, false, false);
if (rc == QD_MESSAGE_DEPTH_INVALID) { // may be a different body type, need to check:
rc = message_check_depth_LH(content, QD_DEPTH_BODY,
BODY_DATA_LONG, BODY_DATA_SHORT, TAGS_BINARY,
&content->section_body, false, false);
if (rc == QD_MESSAGE_DEPTH_INVALID) {
rc = message_check_depth_LH(content, QD_DEPTH_BODY,
BODY_SEQUENCE_LONG, BODY_SEQUENCE_SHORT, TAGS_LIST,
&content->section_body, true, false); // PROTON-2085
}
}
if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_BODY)
break;
// fallthrough
case QD_DEPTH_ALL:
//
// FOOTER (optional)
//
last_section = QD_DEPTH_ALL;
rc = message_check_depth_LH(content, QD_DEPTH_ALL,
FOOTER_LONG, FOOTER_SHORT, TAGS_MAP,
&content->section_footer, true, false);
break;
default:
assert(false); // should not happen!
qd_error(QD_ERROR_MESSAGE, "BUG! Invalid message depth specified: %d",
content->parse_depth + 1);
return QD_MESSAGE_DEPTH_INVALID;
}
if (rc == QD_MESSAGE_DEPTH_INVALID)
qd_error(QD_ERROR_MESSAGE, "Invalid message: %s section invalid",
section_names[last_section]);
return rc;
}
qd_message_depth_status_t qd_message_check_depth(const qd_message_t *in_msg, qd_message_depth_t depth)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
qd_message_depth_status_t result;
LOCK(content->lock);
result = qd_message_check_LH(content, depth);
UNLOCK(content->lock);
return result;
}
qd_iterator_t *qd_message_field_iterator_typed(qd_message_t *msg, qd_message_field_t field)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return 0;
if (loc->tag == QD_AMQP_NULL)
return 0;
return qd_iterator_buffer(loc->buffer, loc->offset, loc->length + loc->hdr_length, ITER_VIEW_ALL);
}
qd_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t field)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return 0;
if (loc->tag == QD_AMQP_NULL)
return 0;
qd_buffer_t *buffer = loc->buffer;
unsigned char *cursor = qd_buffer_base(loc->buffer) + loc->offset;
if (!advance(&cursor, &buffer, loc->hdr_length))
return 0;
return qd_iterator_buffer(buffer, cursor - qd_buffer_base(buffer), loc->length, ITER_VIEW_ALL);
}
ssize_t qd_message_field_length(qd_message_t *msg, qd_message_field_t field)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return -1;
return loc->length;
}
ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char *buffer, size_t *hdr_length)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return -1;
qd_buffer_t *buf = loc->buffer;
size_t bufsize = qd_buffer_size(buf) - loc->offset;
void *base = qd_buffer_base(buf) + loc->offset;
size_t remaining = loc->length + loc->hdr_length;
*hdr_length = loc->hdr_length;
while (remaining > 0) {
if (bufsize > remaining)
bufsize = remaining;
memcpy(buffer, base, bufsize);
buffer += bufsize;
remaining -= bufsize;
if (remaining > 0) {
buf = buf->next;
base = qd_buffer_base(buf);
bufsize = qd_buffer_size(buf);
}
}
return loc->length + loc->hdr_length;
}
// deprecated - use qd_message_compose() for creating locally generated messages
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, bool receive_complete)
{
qd_message_content_t *content = MSG_CONTENT(msg);
LOCK(content->lock);
SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1);
qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2);
content->buffers = *field1_buffers;
DEQ_INIT(*field1_buffers);
DEQ_APPEND(content->buffers, (*field2_buffers));
// initialize the Q2 flag:
if (_Q2_holdoff_should_block_LH(content))
content->q2_input_holdoff = true;
UNLOCK(content->lock);
// set up the locations of the message headers sent prior to the message
// annotations section. This is used when composing outgoing router
// annotations:
qd_message_parse_annotations(msg);
}
qd_message_t *qd_message_compose(qd_composed_field_t *f1,
qd_composed_field_t *f2,
qd_composed_field_t *f3,
bool receive_complete)
{
qd_message_t *msg = qd_message();
if (!msg)
return 0;
qd_composed_field_t *fields[4] = {f1, f2, f3, 0};
qd_message_content_t *content = MSG_CONTENT(msg);
SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
for (int idx = 0; fields[idx] != 0; ++idx) {
qd_buffer_list_t *bufs = qd_compose_buffers(fields[idx]);
DEQ_APPEND(content->buffers, (*bufs));
qd_compose_free(fields[idx]);
}
// initialize the Q2 flag:
if (_Q2_holdoff_should_block_LH(content))
content->q2_input_holdoff = true;
// set up the locations of the message headers sent prior to the message
// annotations section. This is used when composing outgoing router
// annotations:
qd_message_parse_annotations(msg);
return msg;
}
int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked)
{
qd_message_content_t *content = MSG_CONTENT(msg);
int count;
qd_buffer_list_t *buffers = qd_compose_buffers(field);
qd_buffer_t *buf = DEQ_HEAD(*buffers);
if (q2_blocked)
*q2_blocked = false;
LOCK(content->lock);
while (buf) {
qd_buffer_set_fanout(buf, content->fanout);
buf = DEQ_NEXT(buf);
}
DEQ_APPEND(content->buffers, (*buffers));
count = DEQ_SIZE(content->buffers);
// buffers added - must check for Q2:
if (_Q2_holdoff_should_block_LH(content)) {
content->q2_input_holdoff = true;
if (q2_blocked)
*q2_blocked = true;
}
UNLOCK(content->lock);
return count;
}
/**
* find_last_buffer_LH
*
* Given a field location, find the following:
*
* - *cursor - The pointer to the octet _past_ the last octet in the field. If this is the last octet in
* the buffer, the cursor must point one octet past the buffer.
* - *buffer - The last buffer that contains content for this field.
*
* Important: If the last octet of the field is the last octet of a buffer and there are more buffers in the
* buffer list, *buffer _must_ refer to the buffer that contains the last octet of the field and *cursor must
* point at the octet following that octet, even if it points past the end of the buffer.
*/
static void find_last_buffer_LH(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer)
{
qd_buffer_t *buf = location->buffer;
size_t remaining = location->hdr_length + location->length;
while (!!buf && remaining > 0) {
size_t this_buf_size = qd_buffer_size(buf) - (buf == location->buffer ? location->offset : 0);
if (remaining <= this_buf_size) {
*buffer = buf;
*cursor = qd_buffer_base(buf) + (buf == location->buffer ? location->offset : 0) + remaining;
return;
}
remaining -= this_buf_size;
buf = DEQ_NEXT(buf);
}
assert(false); // The field should already have been validated as complete.
}
void trim_stream_data_headers_LH(qd_message_stream_data_t *stream_data, bool remove_vbin_header)
{
const qd_field_location_t *location = &stream_data->section;
qd_buffer_t *buffer = location->buffer;
unsigned char *cursor = qd_buffer_base(buffer) + location->offset;
bool good = advance(&cursor, &buffer, location->hdr_length);
assert(good);
if (good) {
size_t vbin_hdr_len = 0;
unsigned char tag = 0;
if (remove_vbin_header) {
vbin_hdr_len = 1;
// coverity[check_return]
next_octet(&cursor, &buffer, &tag);
if (tag == QD_AMQP_VBIN8) {
advance(&cursor, &buffer, 1);
vbin_hdr_len += 1;
} else if (tag == QD_AMQP_VBIN32) {
advance(&cursor, &buffer, 4);
vbin_hdr_len += 4;
}
}
// coverity[check_return]
can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary
stream_data->payload.buffer = buffer;
stream_data->payload.offset = cursor - qd_buffer_base(buffer);
stream_data->payload.length = location->length - vbin_hdr_len;
stream_data->payload.hdr_length = 0;
stream_data->payload.parsed = true;
stream_data->payload.tag = tag;
}
}
/**
* qd_message_stream_data_iterator
*
* Given a stream_data object, return an iterator that refers to the content of that body data. This iterator
* shall not refer to the 3-byte performative header or the header for the vbin{8,32} field.
*
* The iterator must be freed eventually by the caller.
*/
qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data)
{
const qd_field_location_t *location = &stream_data->payload;
return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL);
}
/**
* qd_message_stream_data_payload_length
*
* Given a stream_data object, return the length of the payload.
*/
size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data)
{
return stream_data->payload.length;
}
/**
* qd_message_stream_data_buffer_count
*
* Return the number of buffers contained in payload portion of the stream_data object.
*/
int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data)
{
if (stream_data->payload.length == 0)
return 0;
int count = 1;
qd_buffer_t *buffer = stream_data->payload.buffer;
while (!!buffer && buffer != stream_data->last_buffer) {
buffer = DEQ_NEXT(buffer);
count++;
}
return count;
}
/**
* qd_message_stream_data_buffers
*
* Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the stream_data
* object. Don't fill more than count raw_buffers with data. Start at offset from the zero-th buffer in the
* stream_data.
*/
int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count)
{
qd_buffer_t *buffer = stream_data->payload.buffer;
size_t data_offset = stream_data->payload.offset;
size_t payload_len = stream_data->payload.length;
qd_message_pvt_t *owning_message = stream_data->owning_message;
LOCK(owning_message->content->lock);
//
// Skip the buffer offset
//
if (offset > 0) {
assert(offset < qd_message_stream_data_buffer_count(stream_data));
while (offset > 0 && payload_len > 0) {
payload_len -= qd_buffer_size(buffer) - data_offset;
offset--;
data_offset = 0;
buffer = DEQ_NEXT(buffer);
}
}
//
// Fill the buffer array
//
int idx = 0;
while (idx < count && payload_len > 0) {
size_t buf_size = MIN(payload_len, qd_buffer_size(buffer) - data_offset);
buffers[idx].context = 0; // reserved for use by caller - do not modify!
buffers[idx].bytes = (char*) qd_buffer_base(buffer) + data_offset;
buffers[idx].capacity = BUFFER_SIZE;
buffers[idx].size = buf_size;
buffers[idx].offset = 0;
data_offset = 0;
payload_len -= buf_size;
buffer = DEQ_NEXT(buffer);
idx++;
}
UNLOCK(owning_message->content->lock);
return idx;
}
void qd_message_stream_data_release_up_to(qd_message_stream_data_t *stream_data)
{
if (!stream_data)
return;
qd_message_pvt_t *msg = stream_data->owning_message;
qd_message_stream_data_t *next = DEQ_HEAD(msg->stream_data_list);
qd_message_stream_data_t *current = NULL;
while (next && current != stream_data) {
current = next;
next = DEQ_NEXT(next);
qd_message_stream_data_release(current);
}
}
/**
* qd_message_stream_data_release
*
* Decrement the fanout ref-counts for all of the buffers referred to in the stream_data. If any have reached zero,
* remove them from the buffer list and free them.
*
* Do not free buffers that overlap with other stream_data or the buffer pointed to by msg->body_buffer.
*/
void qd_message_stream_data_release(qd_message_stream_data_t *stream_data)
{
if (!stream_data)
return;
qd_message_pvt_t *pvt = stream_data->owning_message;
qd_message_content_t *content = pvt->content;
qd_buffer_t *buf;
//
// find the range of buffers that do not overlap other stream_data
// or msg->body_buffer
//
qd_buffer_t *start_buf = stream_data->free_prev ? DEQ_PREV(stream_data->section.buffer) : stream_data->section.buffer;
if (DEQ_PREV(stream_data) && DEQ_PREV(stream_data)->last_buffer == start_buf) {
// overlap previous stream_data
if (start_buf == stream_data->last_buffer) {
// no buffers to free
DEQ_REMOVE(pvt->stream_data_list, stream_data);
free_qd_message_stream_data_t(stream_data);
return;
}
start_buf = DEQ_NEXT(start_buf);
}
qd_buffer_t *stop_buf;
if (stream_data->last_buffer == pvt->body_buffer
|| (DEQ_NEXT(stream_data) && DEQ_NEXT(stream_data)->section.buffer == stream_data->last_buffer)) {
stop_buf = stream_data->last_buffer;
} else {
stop_buf = DEQ_NEXT(stream_data->last_buffer);
}
LOCK(content->lock);
bool was_blocked = !_Q2_holdoff_should_unblock_LH(content);
qd_message_q2_unblocker_t q2_unblock = {0};
if (pvt->is_fanout) {
buf = start_buf;
while (buf != stop_buf) {
uint32_t old = qd_buffer_dec_fanout(buf);
(void)old; // avoid compiler unused var error
assert(old > 0);
buf = DEQ_NEXT(buf);
}
}
//
// Free non-overlapping buffers with zero refcounts.
//
buf = start_buf;
while (buf != stop_buf) {
qd_buffer_t *next = DEQ_NEXT(buf);
if (qd_buffer_get_fanout(buf) == 0) {
DEQ_REMOVE(content->buffers, buf);
qd_buffer_free(buf);
}
buf = next;
}
//
// it is possible that we've freed enough buffers to clear Q2 holdoff
//
if (content->q2_input_holdoff
&& was_blocked
&& _Q2_holdoff_should_unblock_LH(content)) {
content->q2_input_holdoff = false;
q2_unblock = content->q2_unblocker;
}
UNLOCK(content->lock);
DEQ_REMOVE(pvt->stream_data_list, stream_data);
free_qd_message_stream_data_t(stream_data);
if (q2_unblock.handler)
q2_unblock.handler(q2_unblock.context);
}
qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg, qd_message_stream_data_t **out_stream_data)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
qd_message_stream_data_t *stream_data = 0;
*out_stream_data = 0;
if (!msg->body_cursor) {
//
// We haven't returned a body-data record for this message yet. Start
// by ensuring the message has been parsed up to the first body section
//
qd_message_depth_status_t status = qd_message_check_depth(in_msg, QD_DEPTH_BODY);
if (status == QD_MESSAGE_DEPTH_OK) {
// Even if DEPTH_OK, body is optional. If there is no body then move to
// the footer
if (msg->content->section_body.buffer) {
msg->body_buffer = msg->content->section_body.buffer;
msg->body_cursor = qd_buffer_base(msg->body_buffer) + msg->content->section_body.offset;
} else {
// No body. Look for footer
status = qd_message_check_depth(in_msg, QD_DEPTH_ALL);
if (status == QD_MESSAGE_DEPTH_OK) {
if (msg->content->section_footer.buffer) {
// footer is also optional
msg->body_buffer = msg->content->section_footer.buffer;
msg->body_cursor = qd_buffer_base(msg->body_buffer) + msg->content->section_footer.offset;
}
}
}
}
if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
return QD_MESSAGE_STREAM_DATA_INCOMPLETE;
if (status == QD_MESSAGE_DEPTH_INVALID)
return QD_MESSAGE_STREAM_DATA_INVALID;
// neither data not footer found
if (!msg->body_buffer)
return QD_MESSAGE_STREAM_DATA_NO_MORE;
}
// parse out the body data section, or the footer if we're past the
// last data section
qd_section_status_t section_status;
qd_field_location_t location;
ZERO(&location);
qd_buffer_t * const old_body_buffer = msg->body_buffer;
bool is_footer = false;
qd_message_stream_data_result_t result = QD_MESSAGE_STREAM_DATA_NO_MORE;
LOCK(content->lock);
section_status = message_section_check_LH(content,
&msg->body_buffer, &msg->body_cursor,
BODY_DATA_SHORT, 3, TAGS_BINARY,
&location,
true, // allow duplicates
false); // do not inc buffer fanout
if (section_status == QD_SECTION_NO_MATCH) {
is_footer = true;
section_status = message_section_check_LH(content,
&msg->body_buffer, &msg->body_cursor,
FOOTER_SHORT, 3, TAGS_MAP,
&location, true, false);
}
switch (section_status) {
case QD_SECTION_INVALID:
case QD_SECTION_NO_MATCH:
result = QD_MESSAGE_STREAM_DATA_INVALID;
break;
case QD_SECTION_MATCH:
stream_data = new_qd_message_stream_data_t();
ZERO(stream_data);
stream_data->owning_message = msg;
stream_data->section = location;
find_last_buffer_LH(&stream_data->section, &msg->body_cursor, &msg->body_buffer);
stream_data->last_buffer = msg->body_buffer;
trim_stream_data_headers_LH(stream_data, !is_footer);
DEQ_INSERT_TAIL(msg->stream_data_list, stream_data);
*out_stream_data = stream_data;
// if the buffer pointed to by the old msg->body_buffer could not be
// freed when the previous stream_data was released, release it when
// this stream_data is released. Do not free it here as it may affect
// Q2 threshold, which is checked when the stream_data is released.
if (DEQ_HEAD(msg->stream_data_list) == stream_data)
if (old_body_buffer == DEQ_PREV(stream_data->section.buffer))
stream_data->free_prev = true;
result = is_footer ? QD_MESSAGE_STREAM_DATA_FOOTER_OK : QD_MESSAGE_STREAM_DATA_BODY_OK;
break;
case QD_SECTION_NEED_MORE:
result = IS_ATOMIC_FLAG_SET(&msg->content->receive_complete) ?
QD_MESSAGE_STREAM_DATA_NO_MORE : QD_MESSAGE_STREAM_DATA_INCOMPLETE;
break;
}
UNLOCK(content->lock);
return result;
}
qd_parsed_field_t *qd_message_get_ingress_router(qd_message_t *msg)
{
return ((qd_message_pvt_t*) msg)->content->ma_pf_ingress;
}
// used by exchange bindings to erase original ingress node id
void qd_message_reset_ingress_router_annotation(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
msg->ma_reset_ingress = true;
}
void qd_message_disable_ingress_router_annotation(qd_message_t *msg)
{
((qd_message_pvt_t*) msg)->ma_filter_ingress = true;
}
qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg)
{
return ((qd_message_pvt_t*)msg)->content->ma_pf_to_override;
}
qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg)
{
return ((qd_message_pvt_t*) msg)->content->ma_pf_trace;
}
void qd_message_disable_trace_annotation(qd_message_t *msg)
{
((qd_message_pvt_t*) msg)->ma_filter_trace = true;
}
// used by exchange bindings to erase old message trace list
void qd_message_reset_trace_annotation(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
msg->ma_reset_trace = true;
}
int qd_message_is_streaming(const qd_message_t *msg)
{
const qd_message_pvt_t *msg_pvt = (const qd_message_pvt_t *)msg;
return msg_pvt->ma_streaming;
}
void qd_message_Q2_holdoff_disable(qd_message_t *msg)
{
if (!msg)
return;
qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg;
qd_message_content_t *content = msg_pvt->content;
qd_message_q2_unblocker_t q2_unblock = {0};
LOCK(content->lock);
if (!msg_pvt->content->disable_q2_holdoff) {
msg_pvt->content->disable_q2_holdoff = true;
if (content->q2_input_holdoff) {
content->q2_input_holdoff = false;
q2_unblock = content->q2_unblocker;
}
}
UNLOCK(content->lock);
if (q2_unblock.handler)
q2_unblock.handler(q2_unblock.context);
}
bool _Q2_holdoff_should_block_LH(const qd_message_content_t *content)
{
const size_t buff_ct = DEQ_SIZE(content->buffers);
assert(buff_ct >= content->protected_buffers);
return !content->disable_q2_holdoff && (buff_ct - content->protected_buffers) >= QD_QLIMIT_Q2_UPPER;
}
bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content)
{
const size_t buff_ct = DEQ_SIZE(content->buffers);
assert(buff_ct >= content->protected_buffers);
return content->disable_q2_holdoff || (buff_ct - content->protected_buffers) < QD_QLIMIT_Q2_LOWER;
}
bool qd_message_is_Q2_blocked(const qd_message_t *msg)
{
qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg;
qd_message_content_t *content = msg_pvt->content;
bool blocked;
LOCK(content->lock);
blocked = content->q2_input_holdoff;
UNLOCK(content->lock);
return blocked;
}
bool qd_message_aborted(const qd_message_t *msg)
{
assert(msg);
qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg;
return IS_ATOMIC_FLAG_SET(&msg_pvt->content->aborted);
}
void qd_message_set_aborted(const qd_message_t *msg)
{
if (!msg)
return;
qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg;
SET_ATOMIC_FLAG(&msg_pvt->content->aborted);
}
bool qd_message_oversize(const qd_message_t *msg)
{
qd_message_content_t * mc = MSG_CONTENT(msg);
return IS_ATOMIC_FLAG_SET(&mc->oversize);
}
int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t *footer_props)
{
qd_composed_field_t *field = 0;
int rc = 0;
field = qd_compose(QD_PERFORMATIVE_FOOTER, field);
// Stick the buffers into the footer compose field.
qd_compose_insert_binary_buffers(field, footer_props);
rc = qd_message_extend(message, field, 0);
qd_compose_free(field);
return rc;
}
int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data, bool *q2_blocked)
{
unsigned int length = DEQ_SIZE(*data);
qd_composed_field_t *field = 0;
int rc = 0;
if (q2_blocked)
*q2_blocked = false;
// DISPATCH-1803: ensure no body data section can exceed the
// QD_QLIMIT_Q2_LOWER. This allows the egress router to wait for an entire
// body data section to arrive and be validated before sending it out to
// the endpoint without preventing Q2 from being relieved (DISPATCH-2191).
//
const size_t buf_limit = QD_QLIMIT_Q2_LOWER - 2; // reserve 1 extra for performative header
assert(buf_limit);
while (length > buf_limit) {
qd_buffer_t *buf = DEQ_HEAD(*data);
for (int i = 0; i < buf_limit; ++i) {
buf = DEQ_NEXT(buf);
}
// split the list at buf. buf becomes head of trailing list
qd_buffer_list_t trailer = DEQ_EMPTY;
DEQ_HEAD(trailer) = buf;
DEQ_TAIL(trailer) = DEQ_TAIL(*data);
DEQ_TAIL(*data) = DEQ_PREV(buf);
DEQ_NEXT(DEQ_TAIL(*data)) = 0;
DEQ_PREV(buf) = 0;
DEQ_SIZE(trailer) = length - buf_limit;
DEQ_SIZE(*data) = buf_limit;
field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
qd_compose_insert_binary_buffers(field, data);
DEQ_MOVE(trailer, *data);
length -= buf_limit;
}
field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
qd_compose_insert_binary_buffers(field, data);
rc = qd_message_extend(message, field, q2_blocked);
qd_compose_free(field);
return rc;
}
void qd_message_set_q2_unblocked_handler(qd_message_t *msg,
qd_message_q2_unblocked_handler_t callback,
qd_alloc_safe_ptr_t context)
{
qd_message_content_t *content = MSG_CONTENT(msg);
LOCK(content->lock);
content->q2_unblocker.handler = callback;
content->q2_unblocker.context = context;
UNLOCK(content->lock);
}
void qd_message_clear_q2_unblocked_handler(qd_message_t *msg)
{
if (msg) {
qd_message_content_t *content = MSG_CONTENT(msg);
LOCK(content->lock);
content->q2_unblocker.handler = 0;
qd_nullify_safe_ptr(&content->q2_unblocker.context);
UNLOCK(content->lock);
}
}