| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "qpid/dispatch/message.h" |
| |
| #include "aprintf.h" |
| #include "compose_private.h" |
| #include "connection_manager_private.h" |
| #include "message_private.h" |
| #include "policy.h" |
| #include "buffer_field_api.h" |
| |
| #include "qpid/dispatch/amqp.h" |
| #include "qpid/dispatch/ctools.h" |
| #include "qpid/dispatch/error.h" |
| #include "qpid/dispatch/iterator.h" |
| #include "qpid/dispatch/log.h" |
| #include "qpid/dispatch/threading.h" |
| |
| #include <proton/object.h> |
| |
| #include <assert.h> |
| #include <ctype.h> |
| #include <inttypes.h> |
| #include <stdio.h> |
| #include <string.h> |
| #include <time.h> |
| |
| |
| #define LOCK sys_mutex_lock |
| #define UNLOCK sys_mutex_unlock |
| |
| const char *STR_AMQP_NULL = "null"; |
| const char *STR_AMQP_TRUE = "T"; |
| const char *STR_AMQP_FALSE = "F"; |
| |
| static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70"; |
| static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70"; |
| static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71"; |
| static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71"; |
| static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"; |
| static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72"; |
| static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"; |
| static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73"; |
| static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"; |
| static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74"; |
| static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"; |
| static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75"; |
| static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76"; |
| static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76"; |
| static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77"; |
| static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77"; |
| static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78"; |
| static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78"; |
| static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0"; |
| static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1"; |
| static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0"; |
| static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0" |
| "\xa1\xb1\xa3\xb3\xe0\xf0" |
| "\x40\x56\x41\x42\x50\x60\x70\x52\x43\x80\x53\x44\x51\x61\x71\x54\x81\x55\x72\x82\x74\x84\x94\x73\x83\x98"; |
| |
| |
| static const char * const section_names[QD_DEPTH_ALL + 1] = { |
| [QD_DEPTH_NONE] = "none", |
| [QD_DEPTH_HEADER] = "header", |
| [QD_DEPTH_DELIVERY_ANNOTATIONS] = "delivery annotations", |
| [QD_DEPTH_MESSAGE_ANNOTATIONS] = "message annotations", |
| [QD_DEPTH_PROPERTIES] = "properties", |
| [QD_DEPTH_APPLICATION_PROPERTIES] = "application properties", |
| [QD_DEPTH_BODY] = "body", |
| [QD_DEPTH_ALL] = "footer" |
| }; |
| |
| PN_HANDLE(PN_DELIVERY_CTX) |
| |
| ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0); |
| ALLOC_DEFINE(qd_message_content_t); |
| ALLOC_DEFINE(qd_message_stream_data_t); |
| |
| typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length); |
| |
| qd_log_source_t* log_source = 0; |
| |
| qd_log_source_t* qd_message_log_source() |
| { |
| if(log_source) |
| return log_source; |
| else { |
| qd_message_initialize(); |
| return log_source; |
| } |
| } |
| |
| void qd_message_initialize() { |
| log_source = qd_log_source("MESSAGE"); |
| } |
| |
| int qd_message_repr_len() { return qd_log_max_len(); } |
| |
| /** |
| * Quote non-printable characters suitable for log messages. Output in buffer. |
| */ |
| static void quote(char* bytes, int n, char **begin, char *end) { |
| for (char* p = bytes; p < bytes+n; ++p) { |
| if (isprint(*p) || isspace(*p)) |
| aprintf(begin, end, "%c", (int)*p); |
| else |
| aprintf(begin, end, "\\%02hhx", *p); |
| } |
| } |
| |
| /** |
| * Populates the buffer with formatted epoch_time |
| */ |
| static void format_time(pn_timestamp_t epoch_time, char *format, char *buffer, size_t len) |
| { |
| struct timeval local_timeval; |
| local_timeval.tv_sec = epoch_time/1000; |
| local_timeval.tv_usec = (epoch_time%1000) * 1000; |
| |
| time_t local_time_t; |
| local_time_t = local_timeval.tv_sec; |
| |
| struct tm *local_tm; |
| char fmt[100]; |
| local_tm = localtime(&local_time_t); |
| |
| if (local_tm != NULL) { |
| strftime(fmt, sizeof fmt, format, local_tm); |
| snprintf(buffer, len, fmt, local_timeval.tv_usec / 1000); |
| } |
| } |
| |
| /** |
| * Print the bytes of a parsed_field as characters, with pre/post quotes. |
| */ |
| static void print_parsed_field_string(qd_parsed_field_t *parsed_field, |
| const char *pre, const char *post, |
| char **begin, char *end) { |
| qd_iterator_t *i = qd_parse_raw(parsed_field); |
| if (i) { |
| aprintf(begin, end, "%s", pre); |
| while (end - *begin > 1 && !qd_iterator_end(i)) { |
| char c = qd_iterator_octet(i); |
| quote(&c, 1, begin, end); |
| } |
| aprintf(begin, end, "%s", post); |
| } |
| } |
| |
| /** |
| * Tries to print the string representation of the parsed field content based on |
| * the tag of the parsed field. Some tag types have not been dealt with. Add |
| * code as and when required. |
| */ |
| static void print_parsed_field(qd_parsed_field_t *parsed_field, char **begin, char *end) |
| { |
| uint8_t tag = qd_parse_tag(parsed_field); |
| switch (tag) { |
| case QD_AMQP_NULL: |
| aprintf(begin, end, "%s", STR_AMQP_NULL); |
| break; |
| |
| case QD_AMQP_BOOLEAN: |
| case QD_AMQP_TRUE: |
| case QD_AMQP_FALSE: |
| aprintf(begin, end, "%s", qd_parse_as_uint(parsed_field) ? STR_AMQP_TRUE: STR_AMQP_FALSE); |
| break; |
| |
| case QD_AMQP_BYTE: |
| case QD_AMQP_SHORT: |
| case QD_AMQP_INT: |
| case QD_AMQP_SMALLINT: { |
| char str[11]; |
| int32_t int32_val = qd_parse_as_int(parsed_field); |
| snprintf(str, 10, "%"PRId32"", int32_val); |
| aprintf(begin, end, "%s", str); |
| break; |
| } |
| |
| case QD_AMQP_UBYTE: |
| case QD_AMQP_USHORT: |
| case QD_AMQP_UINT: |
| case QD_AMQP_SMALLUINT: |
| case QD_AMQP_UINT0: { |
| char str[11]; |
| uint32_t uint32_val = qd_parse_as_uint(parsed_field); |
| snprintf(str, 11, "%"PRIu32"", uint32_val); |
| aprintf(begin, end, "%s", str); |
| break; |
| } |
| case QD_AMQP_ULONG: |
| case QD_AMQP_SMALLULONG: |
| case QD_AMQP_ULONG0: { |
| char str[21]; |
| uint64_t uint64_val = qd_parse_as_ulong(parsed_field); |
| snprintf(str, 20, "%"PRIu64"", uint64_val); |
| aprintf(begin, end, "%s", str); |
| break; |
| } |
| case QD_AMQP_TIMESTAMP: { |
| char creation_time[100]; //string representation of creation time. |
| pn_timestamp_t creation_timestamp = qd_parse_as_ulong(parsed_field); |
| if (creation_timestamp > 0) { |
| format_time(creation_timestamp, "%Y-%m-%d %H:%M:%S.%%03lu %z", creation_time, 100); |
| aprintf(begin, end, "\"%s\"", creation_time); |
| } |
| break; |
| } |
| case QD_AMQP_LONG: |
| case QD_AMQP_SMALLLONG: { |
| char str[21]; |
| int64_t int64_val = qd_parse_as_long(parsed_field); |
| snprintf(str, 20, "%"PRId64"", int64_val); |
| aprintf(begin, end, "%s", str); |
| break; |
| } |
| case QD_AMQP_FLOAT: |
| case QD_AMQP_DOUBLE: |
| case QD_AMQP_DECIMAL32: |
| case QD_AMQP_DECIMAL64: |
| case QD_AMQP_DECIMAL128: |
| case QD_AMQP_UTF32: |
| case QD_AMQP_UUID: |
| break; //TODO |
| |
| case QD_AMQP_VBIN8: |
| case QD_AMQP_VBIN32: |
| print_parsed_field_string(parsed_field, "b\"", "\"", begin, end); |
| break; |
| |
| case QD_AMQP_STR8_UTF8: |
| case QD_AMQP_STR32_UTF8: |
| print_parsed_field_string(parsed_field, "\"", "\"", begin, end); |
| break; |
| |
| case QD_AMQP_SYM8: |
| case QD_AMQP_SYM32: |
| print_parsed_field_string(parsed_field, ":\"", "\"", begin, end); |
| break; |
| |
| case QD_AMQP_MAP8: |
| case QD_AMQP_MAP32: { |
| uint32_t count = qd_parse_sub_count(parsed_field); |
| if (count > 0) { |
| aprintf(begin, end, "%s", "{"); |
| } |
| for (uint32_t idx = 0; idx < count; idx++) { |
| qd_parsed_field_t *sub_key = qd_parse_sub_key(parsed_field, idx); |
| // The keys of this map are restricted to be of type string |
| // (which excludes the possibility of a null key) |
| print_parsed_field(sub_key, begin, end); |
| |
| aprintf(begin, end, "%s", "="); |
| |
| qd_parsed_field_t *sub_value = qd_parse_sub_value(parsed_field, idx); |
| |
| print_parsed_field(sub_value, begin, end); |
| |
| if ((idx + 1) < count) |
| aprintf(begin, end, "%s", ", "); |
| } |
| if (count > 0) { |
| aprintf(begin, end, "%s", "}"); |
| } |
| break; |
| } |
| case QD_AMQP_LIST0: |
| case QD_AMQP_LIST8: |
| case QD_AMQP_LIST32: { |
| uint32_t count = qd_parse_sub_count(parsed_field); |
| if (count > 0) { |
| aprintf(begin, end, "%s", "["); |
| } |
| for (uint32_t idx = 0; idx < count; idx++) { |
| qd_parsed_field_t *sub_value = qd_parse_sub_value(parsed_field, idx); |
| print_parsed_field(sub_value, begin, end); |
| if ((idx + 1) < count) |
| aprintf(begin, end, "%s", ", "); |
| } |
| |
| if (count > 0) { |
| aprintf(begin, end, "%s", "]"); |
| } |
| |
| break; |
| } |
| default: |
| break; |
| } |
| } |
| |
| /* Print field if enabled by log bits, leading comma if !*first */ |
| static void print_field( |
| qd_message_t *msg, int field, const char *name, |
| qd_log_bits flags, bool *first, char **begin, char *end) |
| { |
| if (is_log_component_enabled(flags, name)) { |
| qd_iterator_t* iter = (field == QD_FIELD_APPLICATION_PROPERTIES) ? |
| qd_message_field_iterator(msg, field) : |
| qd_message_field_iterator_typed(msg, field); |
| if (iter) { |
| qd_parsed_field_t *parsed_field = qd_parse(iter); |
| if (qd_parse_ok(parsed_field)) { |
| if (*first) { |
| *first = false; |
| aprintf(begin, end, "%s=", name); |
| } else { |
| aprintf(begin, end, ", %s=", name); |
| } |
| print_parsed_field(parsed_field, begin, end); |
| } |
| qd_parse_free(parsed_field); |
| qd_iterator_free(iter); |
| } |
| } |
| } |
| |
| static const char REPR_END[] = "}\0"; |
| |
| char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits flags) { |
| if (flags == 0 |
| || qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES) != QD_MESSAGE_DEPTH_OK |
| || !((qd_message_pvt_t *)msg)->content->section_application_properties.parsed) { |
| return NULL; |
| } |
| char *begin = buffer; |
| char *end = buffer + len - sizeof(REPR_END); /* Save space for ending */ |
| bool first = true; |
| aprintf(&begin, end, "Message{", msg); |
| print_field(msg, QD_FIELD_MESSAGE_ID, "message-id", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_USER_ID, "user-id", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_TO, "to", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_SUBJECT, "subject", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_REPLY_TO, "reply-to", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_CORRELATION_ID, "correlation-id", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_CONTENT_TYPE, "content-type", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_CONTENT_ENCODING, "content-encoding", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_ABSOLUTE_EXPIRY_TIME, "absolute-expiry-time", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_CREATION_TIME, "creation-time", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_GROUP_ID, "group-id", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_GROUP_SEQUENCE, "group-sequence", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_REPLY_TO_GROUP_ID, "reply-to-group-id", flags, &first, &begin, end); |
| print_field(msg, QD_FIELD_APPLICATION_PROPERTIES, "app-properties", flags, &first, &begin, end); |
| |
| aprintf(&begin, end, "%s", REPR_END); /* We saved space at the beginning. */ |
| return buffer; |
| } |
| |
| |
| /** |
| * Return true if there is at least one consumable octet in the buffer chain |
| * starting at *cursor. If the cursor is beyond the end of the buffer, and there |
| * is another buffer in the chain, move the cursor and buffer pointers to reference |
| * the first octet in the next buffer. Note that this movement does NOT constitute |
| * advancement of the cursor in the buffer chain. |
| */ |
| static bool can_advance(unsigned char **cursor, qd_buffer_t **buffer) |
| { |
| if (qd_buffer_cursor(*buffer) > *cursor) |
| return true; |
| |
| if (DEQ_NEXT(*buffer)) { |
| *buffer = DEQ_NEXT(*buffer); |
| *cursor = qd_buffer_base(*buffer); |
| } |
| |
| return qd_buffer_cursor(*buffer) > *cursor; |
| } |
| |
| |
| /** |
| * Advance cursor through buffer chain by 'consume' bytes. |
| * Cursor and buffer args are advanced to point to new position in buffer chain. |
| * - if the number of bytes in the buffer chain is less than or equal to |
| * the consume number then return false |
| * - the original buffer chain is not changed or freed. |
| * |
| * @param cursor Pointer into current buffer content |
| * @param buffer pointer to current buffer |
| * @param consume number of bytes to advance |
| * @return true if all bytes consumed, false if not enough bytes available |
| */ |
| static bool advance(unsigned char **cursor, qd_buffer_t **buffer, int consume) |
| { |
| if (!can_advance(cursor, buffer)) |
| return false; |
| |
| unsigned char *local_cursor = *cursor; |
| qd_buffer_t *local_buffer = *buffer; |
| |
| int remaining = qd_buffer_cursor(local_buffer) - local_cursor; |
| while (consume > 0) { |
| if (consume <= remaining) { |
| local_cursor += consume; |
| consume = 0; |
| } else { |
| if (!local_buffer->next) |
| return false; |
| |
| consume -= remaining; |
| local_buffer = local_buffer->next; |
| local_cursor = qd_buffer_base(local_buffer); |
| remaining = qd_buffer_size(local_buffer); |
| } |
| } |
| |
| *cursor = local_cursor; |
| *buffer = local_buffer; |
| |
| return true; |
| } |
| |
| |
| /** |
| * Advance cursor through buffer chain by 'consume' bytes. |
| * Cursor and buffer args are advanced to point to new position in buffer chain. |
| * Buffer content that is consumed is optionally passed to handler. |
| * - if the number of bytes in the buffer chain is less than or equal to |
| * the consume number then return the last buffer in the chain |
| * and a cursor pointing to the first unused byte in the buffer. |
| * - if the number of bytes in the buffer chain is greater than the consume |
| * number the returned buffer/cursor will point to the next available |
| * octet of data. |
| * - the original buffer chain is not changed or freed. |
| * |
| * @param cursor pointer into current buffer content |
| * @param buffer pointer to current buffer |
| * @param consume number of bytes to advance |
| * @param handler pointer to processor function |
| * @param context opaque argument for handler |
| */ |
| static void advance_guarded(const uint8_t **cursor, qd_buffer_t **buffer, int consume, buffer_process_t handler, void *context) |
| { |
| const uint8_t *local_cursor = *cursor; |
| qd_buffer_t *local_buffer = *buffer; |
| |
| int remaining = qd_buffer_cursor(local_buffer) - local_cursor; |
| while (consume > 0) { |
| if (consume < remaining) { |
| if (handler) |
| handler(context, local_cursor, consume); |
| local_cursor += consume; |
| consume = 0; |
| } else { |
| if (handler) |
| handler(context, local_cursor, remaining); |
| consume -= remaining; |
| if (!DEQ_NEXT(local_buffer)) { |
| local_cursor = qd_buffer_cursor(local_buffer); |
| break; |
| } |
| local_buffer = DEQ_NEXT(local_buffer); |
| local_cursor = qd_buffer_base(local_buffer); |
| remaining = qd_buffer_size(local_buffer); |
| } |
| } |
| |
| *cursor = local_cursor; |
| *buffer = local_buffer; |
| } |
| |
| |
| /** |
| * If there is an octet to be consumed, put it in octet and return true, else return false. |
| */ |
| static bool next_octet(unsigned char **cursor, qd_buffer_t **buffer, unsigned char *octet) |
| { |
| if (can_advance(cursor, buffer)) { |
| *octet = **cursor; |
| advance(cursor, buffer, 1); |
| return true; |
| } |
| return false; |
| } |
| |
| |
| static bool traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field) |
| { |
| qd_buffer_t *start_buffer = *buffer; |
| unsigned char *start_cursor = *cursor; |
| unsigned char tag; |
| unsigned char octet; |
| |
| if (!next_octet(cursor, buffer, &tag)) |
| return false; |
| |
| int consume = 0; |
| size_t hdr_length = 1; |
| |
| switch (tag & 0xF0) { |
| case 0x40 : |
| consume = 0; |
| break; |
| case 0x50 : |
| consume = 1; |
| break; |
| case 0x60 : |
| consume = 2; |
| break; |
| case 0x70 : |
| consume = 4; |
| break; |
| case 0x80 : |
| consume = 8; |
| break; |
| case 0x90 : |
| consume = 16; |
| break; |
| |
| case 0xB0 : |
| case 0xD0 : |
| case 0xF0 : |
| hdr_length += 3; |
| if (!next_octet(cursor, buffer, &octet)) |
| return false; |
| consume |= ((int) octet) << 24; |
| |
| if (!next_octet(cursor, buffer, &octet)) |
| return false; |
| consume |= ((int) octet) << 16; |
| |
| if (!next_octet(cursor, buffer, &octet)) |
| return false; |
| consume |= ((int) octet) << 8; |
| |
| // Fall through to the next case... |
| |
| case 0xA0 : |
| case 0xC0 : |
| case 0xE0 : |
| hdr_length++; |
| if (!next_octet(cursor, buffer, &octet)) |
| return false; |
| consume |= (int) octet; |
| break; |
| } |
| |
| if (!advance(cursor, buffer, consume)) |
| return false; |
| |
| if (field && !field->parsed) { |
| field->buffer = start_buffer; |
| field->offset = start_cursor - qd_buffer_base(start_buffer); |
| field->length = consume; |
| field->hdr_length = hdr_length; |
| field->parsed = true; |
| field->tag = tag; |
| } |
| |
| return true; |
| } |
| |
| |
| static int get_list_count(unsigned char **cursor, qd_buffer_t **buffer) |
| { |
| unsigned char tag; |
| unsigned char octet; |
| |
| if (!next_octet(cursor, buffer, &tag)) |
| return 0; |
| |
| int count = 0; |
| |
| switch (tag) { |
| case 0x45 : // list0 |
| break; |
| case 0xd0 : // list32 |
| // |
| // Advance past the list length |
| // |
| if (!advance(cursor, buffer, 4)) |
| return 0; |
| |
| if (!next_octet(cursor, buffer, &octet)) |
| return 0; |
| count |= ((int) octet) << 24; |
| |
| if (!next_octet(cursor, buffer, &octet)) |
| return 0; |
| count |= ((int) octet) << 16; |
| |
| if (!next_octet(cursor, buffer, &octet)) |
| return 0; |
| count |= ((int) octet) << 8; |
| |
| if (!next_octet(cursor, buffer, &octet)) |
| return 0; |
| count |= (int) octet; |
| |
| break; |
| |
| case 0xc0 : // list8 |
| // |
| // Advance past the list length |
| // |
| if (!advance(cursor, buffer, 1)) |
| return 0; |
| |
| if (!next_octet(cursor, buffer, &octet)) |
| return 0; |
| count |= (int) octet; |
| break; |
| } |
| |
| return count; |
| } |
| |
| |
| // Validate a message section (header, body, etc). This determines whether or |
| // not a given section is present and complete at the start of the buffer chain. |
| // |
| // The section is identified by a 'pattern' (a descriptor identifier, such as |
| // "MESSAGE_ANNOTATION_LONG" above). The descriptor also provides a type |
| // 'tag', which MUST match else the section is invalid. |
| // |
| // Non-Body message sections are optional. So if the pattern does NOT match |
| // then the section that the pattern represents is not present. Whether or not |
| // this is acceptable is left to the caller. |
| // |
| // If the pattern and tag match, extract the length and verify that the entire |
| // section is present in the buffer chain. If this is the case then store the |
| // start of the section in 'location' and advance '*buffer' and '*cursor' to |
| // the next section. |
| // |
| // if there is not enough of the section present in the buffer chain we need to |
| // wait until more data arrives and try again. |
| // |
| // |
| typedef enum { |
| QD_SECTION_INVALID, // invalid section (tag mismatch, duplicate section, etc). |
| QD_SECTION_MATCH, |
| QD_SECTION_NO_MATCH, |
| QD_SECTION_NEED_MORE // not enough data in the buffer chain - try again |
| } qd_section_status_t; |
| |
| static qd_section_status_t message_section_check_LH(qd_message_content_t *content, |
| qd_buffer_t **buffer, |
| unsigned char **cursor, |
| const unsigned char *pattern, |
| int pattern_length, |
| const unsigned char *expected_tags, |
| qd_field_location_t *location, |
| bool dup_ok, |
| bool protect_buffer) |
| { |
| if (!*cursor || !can_advance(cursor, buffer)) |
| return QD_SECTION_NEED_MORE; |
| |
| qd_buffer_t *test_buffer = *buffer; |
| unsigned char *test_cursor = *cursor; |
| unsigned char *end_of_buffer = qd_buffer_cursor(test_buffer); |
| int idx = 0; |
| |
| while (idx < pattern_length && *test_cursor == pattern[idx]) { |
| idx++; |
| test_cursor++; |
| if (test_cursor == end_of_buffer) { |
| test_buffer = test_buffer->next; |
| if (test_buffer == 0) |
| return QD_SECTION_NEED_MORE; |
| test_cursor = qd_buffer_base(test_buffer); |
| end_of_buffer = test_cursor + qd_buffer_size(test_buffer); |
| } |
| } |
| |
| if (idx < pattern_length) |
| return QD_SECTION_NO_MATCH; |
| |
| // |
| // Pattern matched, check the tag |
| // |
| while (*expected_tags && *test_cursor != *expected_tags) |
| expected_tags++; |
| if (*expected_tags == 0) |
| return QD_SECTION_INVALID; // Error: Unexpected tag |
| |
| if (location->parsed && !dup_ok) |
| return QD_SECTION_INVALID; // Error: Duplicate section |
| |
| // |
| // Pattern matched and tag is expected. Mark the beginning of the section. |
| // |
| location->buffer = *buffer; |
| location->offset = *cursor - qd_buffer_base(*buffer); |
| location->length = 0; |
| location->hdr_length = pattern_length; |
| |
| // |
| // Check that the full section is present, if so advance the pointers to |
| // consume the whole section. |
| // |
| int pre_consume = 1; // Count the already extracted tag |
| uint32_t consume = 0; |
| unsigned char tag; |
| unsigned char octet; |
| |
| if (!next_octet(&test_cursor, &test_buffer, &tag)) |
| return QD_SECTION_NEED_MORE; |
| |
| unsigned char tag_subcat = tag & 0xF0; |
| |
| // if there is no more data the only valid data type is a null type (0x40), |
| // size is implied as 0 |
| if (!can_advance(&test_cursor, &test_buffer) && tag_subcat != 0x40) |
| return QD_SECTION_NEED_MORE; |
| |
| switch (tag_subcat) { |
| // fixed sizes: |
| case 0x40: /* null */ break; |
| case 0x50: consume = 1; break; |
| case 0x60: consume = 2; break; |
| case 0x70: consume = 4; break; |
| case 0x80: consume = 8; break; |
| case 0x90: consume = 16; break; |
| |
| case 0xB0: |
| case 0xD0: |
| case 0xF0: |
| // uint32_t size field: |
| pre_consume += 3; |
| if (!next_octet(&test_cursor, &test_buffer, &octet)) |
| return QD_SECTION_NEED_MORE; |
| consume |= ((uint32_t) octet) << 24; |
| |
| if (!next_octet(&test_cursor, &test_buffer, &octet)) |
| return QD_SECTION_NEED_MORE; |
| consume |= ((uint32_t) octet) << 16; |
| |
| if (!next_octet(&test_cursor, &test_buffer, &octet)) |
| return QD_SECTION_NEED_MORE; |
| consume |= ((uint32_t) octet) << 8; |
| |
| // Fall through to the next case... |
| |
| case 0xA0: |
| case 0xC0: |
| case 0xE0: |
| // uint8_t size field |
| pre_consume += 1; |
| if (!next_octet(&test_cursor, &test_buffer, &octet)) |
| return QD_SECTION_NEED_MORE; |
| consume |= (uint32_t) octet; |
| break; |
| } |
| |
| location->length = pre_consume + consume; |
| if (consume) { |
| if (!advance(&test_cursor, &test_buffer, consume)) { |
| return QD_SECTION_NEED_MORE; // whole section not fully received |
| } |
| } |
| |
| if (protect_buffer) { |
| // |
| // increment the reference count of the parsed section as location now |
| // references it. Note that the cursor may have advanced to the octet after |
| // the parsed section, so be careful not to include an extra buffer past |
| // the end. And cursor + buffer will be null if the parsed section ends at |
| // the end of the buffer chain, so be careful of that, too! |
| // |
| bool buffers_protected = false; |
| qd_buffer_t *start = *buffer; |
| qd_buffer_t *last = test_buffer; |
| if (last && last != start) { |
| if (test_cursor == qd_buffer_base(last)) { |
| // last does not include octets for the current section |
| last = DEQ_PREV(last); |
| } |
| } |
| |
| while (start) { |
| qd_buffer_inc_fanout(start); |
| buffers_protected = true; |
| if (start == last) |
| break; |
| start = DEQ_NEXT(start); |
| } |
| |
| // DISPATCH-2191: protected buffers are never released - even after |
| // being sent - because they are referenced by the content->section_xxx |
| // location fields and remain valid for the life of the content |
| // instance. Since these buffers are never freed they must not be |
| // included in the Q2 threshold check! |
| if (buffers_protected) { |
| content->protected_buffers = 0; |
| start = DEQ_HEAD(content->buffers); |
| while (start) { |
| ++content->protected_buffers; |
| if (start == last) |
| break; |
| start = DEQ_NEXT(start); |
| } |
| } |
| } |
| |
| location->parsed = 1; |
| |
| *cursor = test_cursor; |
| *buffer = test_buffer; |
| return QD_SECTION_MATCH; |
| } |
| |
| |
| // translate a field into its proper section of the message |
| static qd_message_field_t qd_field_section(qd_message_field_t field) |
| { |
| switch (field) { |
| |
| case QD_FIELD_HEADER: |
| case QD_FIELD_DELIVERY_ANNOTATION: |
| case QD_FIELD_MESSAGE_ANNOTATION: |
| case QD_FIELD_PROPERTIES: |
| case QD_FIELD_APPLICATION_PROPERTIES: |
| case QD_FIELD_BODY: |
| case QD_FIELD_FOOTER: |
| return field; |
| |
| case QD_FIELD_DURABLE: |
| case QD_FIELD_PRIORITY: |
| case QD_FIELD_TTL: |
| case QD_FIELD_FIRST_ACQUIRER: |
| case QD_FIELD_DELIVERY_COUNT: |
| return QD_FIELD_HEADER; |
| |
| case QD_FIELD_MESSAGE_ID: |
| case QD_FIELD_USER_ID: |
| case QD_FIELD_TO: |
| case QD_FIELD_SUBJECT: |
| case QD_FIELD_REPLY_TO: |
| case QD_FIELD_CORRELATION_ID: |
| case QD_FIELD_CONTENT_TYPE: |
| case QD_FIELD_CONTENT_ENCODING: |
| case QD_FIELD_ABSOLUTE_EXPIRY_TIME: |
| case QD_FIELD_CREATION_TIME: |
| case QD_FIELD_GROUP_ID: |
| case QD_FIELD_GROUP_SEQUENCE: |
| case QD_FIELD_REPLY_TO_GROUP_ID: |
| return QD_FIELD_PROPERTIES; |
| |
| default: |
| assert(false); // TBD: add new fields here |
| return QD_FIELD_NONE; |
| } |
| } |
| |
| |
| // get the field location of a field in the message properties (if it exists, |
| // else 0). |
| static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_message_field_t field) |
| { |
| static const intptr_t offsets[] = { |
| // position of the field's qd_field_location_t in the message content |
| // object |
| (intptr_t) &((qd_message_content_t*) 0)->field_message_id, |
| (intptr_t) &((qd_message_content_t*) 0)->field_user_id, |
| (intptr_t) &((qd_message_content_t*) 0)->field_to, |
| (intptr_t) &((qd_message_content_t*) 0)->field_subject, |
| (intptr_t) &((qd_message_content_t*) 0)->field_reply_to, |
| (intptr_t) &((qd_message_content_t*) 0)->field_correlation_id, |
| (intptr_t) &((qd_message_content_t*) 0)->field_content_type, |
| (intptr_t) &((qd_message_content_t*) 0)->field_content_encoding, |
| (intptr_t) &((qd_message_content_t*) 0)->field_absolute_expiry_time, |
| (intptr_t) &((qd_message_content_t*) 0)->field_creation_time, |
| (intptr_t) &((qd_message_content_t*) 0)->field_group_id, |
| (intptr_t) &((qd_message_content_t*) 0)->field_group_sequence, |
| (intptr_t) &((qd_message_content_t*) 0)->field_reply_to_group_id |
| }; |
| // update table above if new fields need to be accessed: |
| assert(QD_FIELD_MESSAGE_ID <= field && field <= QD_FIELD_REPLY_TO_GROUP_ID); |
| |
| qd_message_content_t *content = MSG_CONTENT(msg); |
| if (!content->section_message_properties.parsed) { |
| if (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) != QD_MESSAGE_DEPTH_OK || !content->section_message_properties.parsed) |
| return 0; |
| } |
| |
| const int index = field - QD_FIELD_MESSAGE_ID; |
| qd_field_location_t *const location = (qd_field_location_t*) ((char*) content + offsets[index]); |
| if (location->parsed) |
| return location; |
| |
| // requested field not parsed out. Need to parse out up to the requested field: |
| qd_buffer_t *buffer = content->section_message_properties.buffer; |
| unsigned char *cursor = qd_buffer_base(buffer) + content->section_message_properties.offset; |
| if (!advance(&cursor, &buffer, content->section_message_properties.hdr_length)) |
| return 0; |
| if (index >= get_list_count(&cursor, &buffer)) |
| return 0; // properties list too short |
| |
| int position = 0; |
| while (position < index) { |
| qd_field_location_t *f = (qd_field_location_t*) ((char*) content + offsets[position]); |
| if (f->parsed) { |
| if (!advance(&cursor, &buffer, f->hdr_length + f->length)) |
| return 0; |
| } else // parse it out |
| if (!traverse_field(&cursor, &buffer, f)) |
| return 0; |
| position++; |
| } |
| |
| // all fields previous to the target have now been parsed and cursor/buffer |
| // are in the correct position, parse out the field: |
| if (traverse_field(&cursor, &buffer, location)) |
| return location; |
| |
| return 0; |
| } |
| |
| |
| static void qd_message_parse_priority(qd_message_t *in_msg) |
| { |
| qd_message_content_t *content = MSG_CONTENT(in_msg); |
| qd_iterator_t *iter = qd_message_field_iterator(in_msg, QD_FIELD_HEADER); |
| |
| SET_ATOMIC_FLAG(&content->priority_parsed); |
| |
| if (!!iter) { |
| qd_parsed_field_t *field = qd_parse(iter); |
| if (qd_parse_ok(field)) { |
| if (qd_parse_is_list(field) && qd_parse_sub_count(field) >= 2) { |
| qd_parsed_field_t *priority_field = qd_parse_sub_value(field, 1); |
| if (qd_parse_tag(priority_field) != QD_AMQP_NULL) { |
| uint32_t value = qd_parse_as_uint(priority_field); |
| value = MIN(value, QDR_MAX_PRIORITY); |
| sys_atomic_set(&content->priority, value); |
| } |
| } |
| } |
| qd_parse_free(field); |
| qd_iterator_free(iter); |
| } |
| } |
| |
| |
| // Get the field's location in the buffer. Return 0 if the field does not exist. |
| // Note that even if the field location is returned, it may contain a |
| // QD_AMQP_NULL value (qd_field_location->tag == QD_AMQP_NULL). |
| // |
| static qd_field_location_t *qd_message_field_location(qd_message_t *msg, qd_message_field_t field) |
| { |
| qd_message_content_t *content = MSG_CONTENT(msg); |
| qd_message_field_t section = qd_field_section(field); |
| |
| switch (section) { |
| case QD_FIELD_HEADER: |
| if (content->section_message_header.parsed || |
| (qd_message_check_depth(msg, QD_DEPTH_HEADER) == QD_MESSAGE_DEPTH_OK && content->section_message_header.parsed)) |
| return &content->section_message_header; |
| break; |
| |
| case QD_FIELD_PROPERTIES: |
| return qd_message_properties_field(msg, field); |
| |
| case QD_FIELD_DELIVERY_ANNOTATION: |
| if (content->section_delivery_annotation.parsed || |
| (qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS) == QD_MESSAGE_DEPTH_OK && content->section_delivery_annotation.parsed)) |
| return &content->section_delivery_annotation; |
| break; |
| |
| case QD_FIELD_MESSAGE_ANNOTATION: |
| if (content->section_message_annotation.parsed || |
| (qd_message_check_depth(msg, QD_DEPTH_MESSAGE_ANNOTATIONS) == QD_MESSAGE_DEPTH_OK && content->section_message_annotation.parsed)) |
| return &content->section_message_annotation; |
| break; |
| |
| case QD_FIELD_APPLICATION_PROPERTIES: |
| if (content->section_application_properties.parsed || |
| (qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES) == QD_MESSAGE_DEPTH_OK && content->section_application_properties.parsed)) |
| return &content->section_application_properties; |
| break; |
| |
| case QD_FIELD_BODY: |
| if (content->section_body.parsed || |
| (qd_message_check_depth(msg, QD_DEPTH_BODY) == QD_MESSAGE_DEPTH_OK && content->section_body.parsed)) |
| return &content->section_body; |
| break; |
| |
| case QD_FIELD_FOOTER: |
| if (content->section_footer.parsed || |
| (qd_message_check_depth(msg, QD_DEPTH_ALL) == QD_MESSAGE_DEPTH_OK && content->section_footer.parsed)) |
| return &content->section_footer; |
| break; |
| |
| default: |
| assert(false); // TBD: add support as needed |
| return 0; |
| } |
| |
| return 0; |
| } |
| |
| |
| qd_message_t *qd_message() |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) new_qd_message_t(); |
| if (!msg) |
| return 0; |
| |
| ZERO (msg); |
| |
| msg->content = new_qd_message_content_t(); |
| |
| if (msg->content == 0) { |
| free_qd_message_t((qd_message_t*) msg); |
| return 0; |
| } |
| |
| ZERO(msg->content); |
| msg->content->lock = sys_mutex(); |
| sys_atomic_init(&msg->content->aborted, 0); |
| sys_atomic_init(&msg->content->discard, 0); |
| sys_atomic_init(&msg->content->no_body, 0); |
| sys_atomic_init(&msg->content->oversize, 0); |
| sys_atomic_init(&msg->content->priority, QDR_DEFAULT_PRIORITY); |
| sys_atomic_init(&msg->content->priority_parsed, 0); |
| sys_atomic_init(&msg->content->receive_complete, 0); |
| sys_atomic_init(&msg->content->ref_count, 1); |
| msg->content->parse_depth = QD_DEPTH_NONE; |
| return (qd_message_t*) msg; |
| } |
| |
| |
| void qd_message_free(qd_message_t *in_msg) |
| { |
| if (!in_msg) return; |
| uint32_t rc; |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| qd_message_q2_unblocker_t q2_unblock = {0}; |
| |
| free(msg->ma_to_override); |
| |
| sys_atomic_destroy(&msg->send_complete); |
| |
| qd_message_content_t *content = msg->content; |
| |
| if (msg->is_fanout) { |
| // |
| // Adjust the content's fanout count and decrement all buffer fanout |
| // counts starting with the msg cursor. If the buffer count drops to |
| // zero we can free it. |
| // |
| LOCK(content->lock); |
| |
| // DISPATCH-2099: ensure all outstanding stream_data items associated |
| // with this message have been returned since the underlying buffers |
| // may be released |
| assert(DEQ_IS_EMPTY(msg->stream_data_list)); |
| |
| const bool was_blocked = !_Q2_holdoff_should_unblock_LH(content); |
| qd_buffer_t *buf = msg->cursor.buffer; |
| while (buf) { |
| qd_buffer_t *next_buf = DEQ_NEXT(buf); |
| if (qd_buffer_dec_fanout(buf) == 1) { |
| DEQ_REMOVE(content->buffers, buf); |
| qd_buffer_free(buf); |
| } |
| buf = next_buf; |
| } |
| --content->fanout; |
| |
| // |
| // it is possible that we've freed enough buffers to clear Q2 holdoff |
| // |
| if (content->q2_input_holdoff |
| && was_blocked |
| && _Q2_holdoff_should_unblock_LH(content)) { |
| content->q2_input_holdoff = false; |
| q2_unblock = content->q2_unblocker; |
| } |
| |
| UNLOCK(content->lock); |
| } |
| |
| // the Q2 handler must be invoked outside the lock |
| if (q2_unblock.handler) |
| q2_unblock.handler(q2_unblock.context); |
| |
| rc = sys_atomic_dec(&content->ref_count) - 1; |
| if (rc == 0) { |
| if (content->ma_field_iter_in) |
| qd_iterator_free(content->ma_field_iter_in); |
| if (content->ma_pf_ingress) |
| qd_parse_free(content->ma_pf_ingress); |
| if (content->ma_pf_to_override) |
| qd_parse_free(content->ma_pf_to_override); |
| if (content->ma_pf_trace) |
| qd_parse_free(content->ma_pf_trace); |
| |
| qd_buffer_list_free_buffers(&content->buffers); |
| |
| if (content->pending) |
| qd_buffer_free(content->pending); |
| |
| sys_mutex_free(content->lock); |
| sys_atomic_destroy(&content->aborted); |
| sys_atomic_destroy(&content->discard); |
| sys_atomic_destroy(&content->no_body); |
| sys_atomic_destroy(&content->oversize); |
| sys_atomic_destroy(&content->priority); |
| sys_atomic_destroy(&content->priority_parsed); |
| sys_atomic_destroy(&content->receive_complete); |
| sys_atomic_destroy(&content->ref_count); |
| free_qd_message_content_t(content); |
| } |
| |
| free_qd_message_t((qd_message_t*) msg); |
| } |
| |
| |
| qd_message_t *qd_message_copy(qd_message_t *in_msg) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| qd_message_content_t *content = msg->content; |
| qd_message_pvt_t *copy = (qd_message_pvt_t*) new_qd_message_t(); |
| |
| if (!copy) |
| return 0; |
| |
| ZERO(copy); |
| |
| copy->strip_annotations_in = msg->strip_annotations_in; |
| |
| copy->content = content; |
| |
| copy->sent_depth = QD_DEPTH_NONE; |
| copy->cursor.buffer = 0; |
| copy->cursor.cursor = 0; |
| sys_atomic_init(©->send_complete, 0); |
| copy->tag_sent = false; |
| copy->is_fanout = false; |
| |
| if (!content->ma_disabled) { |
| if (msg->ma_to_override) |
| copy->ma_to_override = qd_strdup(msg->ma_to_override); |
| copy->ma_filter_trace = msg->ma_filter_trace; |
| copy->ma_filter_ingress = msg->ma_filter_ingress; |
| copy->ma_reset_trace = msg->ma_reset_trace; |
| copy->ma_reset_ingress = msg->ma_reset_ingress; |
| copy->ma_phase = msg->ma_phase; |
| copy->ma_streaming = msg->ma_streaming; |
| } |
| |
| sys_atomic_inc(&content->ref_count); |
| |
| return (qd_message_t*) copy; |
| } |
| |
| const char *qd_message_parse_annotations(qd_message_t *in_msg) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| qd_message_content_t *content = msg->content; |
| |
| assert(!content->ma_disabled); // should not be called when skipping MA processing |
| if (content->ma_parsed) |
| return 0; |
| content->ma_parsed = true; |
| |
| content->ma_field_iter_in = qd_message_field_iterator(in_msg, QD_FIELD_MESSAGE_ANNOTATION); |
| if (content->ma_field_iter_in == 0) |
| return 0; |
| |
| qd_parsed_field_t *ma_pf_stream = 0; |
| qd_parsed_field_t *ma_pf_phase = 0; |
| const char *err = qd_parse_annotations(msg->strip_annotations_in, |
| content->ma_field_iter_in, |
| &content->ma_pf_ingress, |
| &ma_pf_phase, |
| &content->ma_pf_to_override, |
| &content->ma_pf_trace, |
| &ma_pf_stream, |
| &content->ma_user_annotations, |
| &content->ma_user_count); |
| if (err) |
| return(err); |
| |
| // cache incoming values into the message |
| |
| if (ma_pf_phase) { |
| msg->ma_phase = qd_parse_as_int(ma_pf_phase); |
| qd_parse_free(ma_pf_phase); |
| } |
| |
| if (ma_pf_stream) { |
| msg->ma_streaming = true; |
| qd_parse_free(ma_pf_stream); |
| } |
| |
| return 0; |
| } |
| |
| |
| void qd_message_set_to_override_annotation(qd_message_t *in_msg, const char *to_field) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| free(msg->ma_to_override); |
| msg->ma_to_override = to_field ? qd_strdup(to_field) : 0; |
| } |
| |
| |
| void qd_message_set_phase_annotation(qd_message_t *in_msg, int phase) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| msg->ma_phase = phase; |
| } |
| |
| int qd_message_get_phase_annotation(const qd_message_t *in_msg) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| return msg->ma_phase; |
| } |
| |
| void qd_message_set_streaming_annotation(qd_message_t *in_msg) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| msg->ma_streaming = true; |
| } |
| |
| |
| void qd_message_disable_router_annotations(qd_message_t *msg) |
| { |
| qd_message_content_t *content = ((qd_message_pvt_t *)msg)->content; |
| content->ma_disabled = true; |
| content->ma_parsed = true; |
| } |
| |
| |
| bool qd_message_is_discard(qd_message_t *msg) |
| { |
| if (!msg) |
| return false; |
| qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg; |
| return IS_ATOMIC_FLAG_SET(&pvt_msg->content->discard); |
| } |
| |
| void qd_message_set_discard(qd_message_t *msg, bool discard) |
| { |
| if (!msg) |
| return; |
| |
| qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg; |
| SET_ATOMIC_BOOL(&pvt_msg->content->discard, discard); |
| } |
| |
| |
| // update the buffer reference counts for a new outgoing message |
| // |
| void qd_message_add_fanout(qd_message_t *out_msg) |
| { |
| assert(out_msg); |
| qd_message_pvt_t *msg = (qd_message_pvt_t *)out_msg; |
| msg->is_fanout = true; |
| |
| qd_message_content_t *content = msg->content; |
| |
| LOCK(content->lock); |
| ++content->fanout; |
| |
| qd_buffer_t *buf = DEQ_HEAD(content->buffers); |
| // DISPATCH-1590: content->buffers may not be set up yet if |
| // content->pending is the first buffer and it is not yet full. |
| if (!buf) { |
| assert(content->pending && qd_buffer_size(content->pending) > 0); |
| DEQ_INSERT_TAIL(content->buffers, content->pending); |
| content->pending = 0; |
| buf = DEQ_HEAD(content->buffers); |
| } |
| // DISPATCH-1330: since we're incrementing the refcount be sure to set |
| // the cursor to the head buf in case msg is discarded before all data |
| // is sent (we'll decref any unsent buffers at that time) |
| // |
| msg->cursor.buffer = buf; |
| while (buf) { |
| qd_buffer_inc_fanout(buf); |
| buf = DEQ_NEXT(buf); |
| } |
| |
| UNLOCK(content->lock); |
| } |
| |
| |
| /** |
| * There are two sources of priority information -- |
| * message and address. Address takes precedence, falling |
| * through when no address priority has been specified. |
| * This also means that messages must always have a priority, |
| * using default value if sender leaves it unspecified. |
| */ |
| uint8_t qd_message_get_priority(qd_message_t *msg) |
| { |
| qd_message_content_t *content = MSG_CONTENT(msg); |
| |
| if (!IS_ATOMIC_FLAG_SET(&content->priority_parsed)) |
| qd_message_parse_priority(msg); |
| |
| return sys_atomic_get(&content->priority); |
| } |
| |
| bool qd_message_receive_complete(qd_message_t *in_msg) |
| { |
| if (!in_msg) |
| return false; |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| return IS_ATOMIC_FLAG_SET(&msg->content->receive_complete); |
| } |
| |
| |
| bool qd_message_send_complete(qd_message_t *in_msg) |
| { |
| if (!in_msg) |
| return false; |
| |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| return IS_ATOMIC_FLAG_SET(&msg->send_complete); |
| } |
| |
| |
| void qd_message_set_send_complete(qd_message_t *in_msg) |
| { |
| if (!!in_msg) { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| SET_ATOMIC_FLAG(&msg->send_complete); |
| } |
| } |
| |
| |
| void qd_message_set_receive_complete(qd_message_t *in_msg) |
| { |
| if (!!in_msg) { |
| qd_message_content_t *content = MSG_CONTENT(in_msg); |
| qd_message_q2_unblocker_t q2_unblock = {0}; |
| |
| LOCK(content->lock); |
| |
| SET_ATOMIC_FLAG(&content->receive_complete); |
| if (content->q2_input_holdoff) { |
| content->q2_input_holdoff = false; |
| q2_unblock = content->q2_unblocker; |
| } |
| content->q2_unblocker.handler = 0; |
| qd_nullify_safe_ptr(&content->q2_unblocker.context); |
| |
| UNLOCK(content->lock); |
| |
| if (q2_unblock.handler) |
| q2_unblock.handler(q2_unblock.context); |
| } |
| } |
| |
| void qd_message_set_no_body(qd_message_t *in_msg) |
| { |
| if (!!in_msg) { |
| qd_message_content_t *content = MSG_CONTENT(in_msg); |
| SET_ATOMIC_FLAG(&content->no_body); |
| } |
| } |
| |
| bool qd_message_no_body(qd_message_t *in_msg) |
| { |
| if (!!in_msg) { |
| qd_message_content_t *content = MSG_CONTENT(in_msg); |
| return IS_ATOMIC_FLAG_SET(&content->no_body); |
| } |
| |
| return false; |
| } |
| |
| |
| |
| bool qd_message_tag_sent(qd_message_t *in_msg) |
| { |
| if (!in_msg) |
| return false; |
| |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| return msg->tag_sent; |
| } |
| |
| void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent) |
| { |
| if (!in_msg) |
| return; |
| |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| msg->tag_sent = tag_sent; |
| } |
| |
| |
| /** |
| * Receive and discard large messages for which there is no destination. |
| * Don't waste resources by putting the message into internal buffers. |
| * Message locking is not required since the message content buffers are untouched. |
| */ |
| qd_message_t *discard_receive(pn_delivery_t *delivery, |
| pn_link_t *link, |
| qd_message_t *msg_in) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*)msg_in; |
| while (1) { |
| #define DISCARD_BUFFER_SIZE (128 * 1024) |
| char dummy[DISCARD_BUFFER_SIZE]; |
| ssize_t rc = pn_link_recv(link, dummy, DISCARD_BUFFER_SIZE); |
| |
| if (rc == 0) { |
| // have read all available pn_link incoming bytes |
| break; |
| } else if (rc == PN_EOS || rc < 0) { |
| // End of message or error: finalize message_receive handling |
| if (pn_delivery_aborted(delivery)) { |
| SET_ATOMIC_FLAG(&msg->content->aborted); |
| } |
| pn_record_t *record = pn_delivery_attachments(delivery); |
| pn_record_set(record, PN_DELIVERY_CTX, 0); |
| if (IS_ATOMIC_FLAG_SET(&msg->content->oversize)) { |
| // Aborting the content disposes of downstream copies. |
| // This has no effect on the received message. |
| SET_ATOMIC_FLAG(&msg->content->aborted); |
| } |
| qd_message_set_receive_complete((qd_message_t*) msg); |
| break; |
| } else { |
| // rc was > 0. bytes were read and discarded. |
| } |
| } |
| |
| return msg_in; |
| } |
| |
| qd_message_t * qd_get_message_context(pn_delivery_t *delivery) |
| { |
| pn_record_t *record = pn_delivery_attachments(delivery); |
| if (record) |
| return (qd_message_t *) pn_record_get(record, PN_DELIVERY_CTX); |
| |
| return 0; |
| } |
| |
| bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg) |
| { |
| if (!msg) |
| return false; |
| |
| if (MSG_CONTENT(msg)) { |
| if (DEQ_SIZE(MSG_CONTENT(msg)->buffers) > 0) { |
| qd_buffer_t *buf = DEQ_HEAD(MSG_CONTENT(msg)->buffers); |
| if (buf && qd_buffer_size(buf) > 0) |
| return true; |
| } |
| if (MSG_CONTENT(msg)->pending && qd_buffer_size(MSG_CONTENT(msg)->pending) > 0) |
| return true; |
| } |
| |
| return false; |
| } |
| |
| |
| qd_message_t *qd_message_receive(pn_delivery_t *delivery) |
| { |
| pn_link_t *link = pn_delivery_link(delivery); |
| qd_link_t *qdl = (qd_link_t *)pn_link_get_context(link); |
| ssize_t rc; |
| |
| pn_record_t *record = pn_delivery_attachments(delivery); |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX); |
| |
| // |
| // If there is no message associated with the delivery then this is the |
| // first time we've received anything on this delivery. |
| // Allocate a message descriptor and link it and the delivery together. |
| // |
| if (!msg) { |
| msg = (qd_message_pvt_t*) qd_message(); |
| qd_connection_t *qdc = qd_link_connection(qdl); |
| qd_alloc_safe_ptr_t sp = QD_SAFE_PTR_INIT(qdl); |
| qd_message_set_q2_unblocked_handler((qd_message_t*) msg, qd_link_q2_restart_receive, sp); |
| msg->strip_annotations_in = qd_connection_strip_annotations_in(qdc); |
| pn_record_def(record, PN_DELIVERY_CTX, PN_VOID); |
| pn_record_set(record, PN_DELIVERY_CTX, (void*) msg); |
| msg->content->max_message_size = qd_connection_max_message_size(qdc); |
| qd_link_set_incoming_msg(qdl, (qd_message_t*) msg); |
| } |
| |
| // |
| // The discard flag indicates we should keep reading the input stream |
| // but not process the message for delivery. |
| // Oversize messages are also discarded. |
| // |
| if (IS_ATOMIC_FLAG_SET(&msg->content->discard)) { |
| return discard_receive(delivery, link, (qd_message_t *)msg); |
| } |
| |
| // if q2 holdoff has been disabled (disable_q2_holdoff=true), we keep receiving. |
| // if q2 holdoff has been enabled (disable_q2_holdoff=false), if input is in holdoff then just exit. |
| // When enough buffers |
| // have been processed and freed by outbound processing then |
| // message holdoff is cleared and receiving may continue. |
| // |
| LOCK(msg->content->lock); |
| if (!qd_link_is_q2_limit_unbounded(qdl) && !msg->content->disable_q2_holdoff) { |
| if (msg->content->q2_input_holdoff) { |
| UNLOCK(msg->content->lock); |
| return (qd_message_t*)msg; |
| } |
| } |
| UNLOCK(msg->content->lock); |
| |
| // Loop until msg is complete, error seen, or incoming bytes are consumed |
| qd_message_content_t *content = msg->content; |
| bool recv_error = false; |
| while (1) { |
| // |
| // handle EOS and clean up after pn receive errors |
| // |
| bool at_eos = (pn_delivery_partial(delivery) == false) && |
| (pn_delivery_aborted(delivery) == false) && |
| (pn_delivery_pending(delivery) == 0); |
| |
| if (at_eos || recv_error) { |
| // Message is complete |
| qd_buffer_t * pending_free = 0; // free empty pending buffer outside of lock |
| LOCK(content->lock); |
| { |
| // Append last buffer if any with data |
| if (content->pending) { |
| if (qd_buffer_size(content->pending) > 0) { |
| // pending buffer has bytes that are part of message |
| qd_buffer_set_fanout(content->pending, content->fanout); |
| DEQ_INSERT_TAIL(content->buffers, |
| content->pending); |
| } else { |
| // pending buffer is empty |
| pending_free = content->pending; |
| } |
| content->pending = 0; |
| } else { |
| // pending buffer is absent |
| } |
| |
| content->receive_complete = true; |
| content->q2_unblocker.handler = 0; |
| qd_nullify_safe_ptr(&content->q2_unblocker.context); |
| if (pn_delivery_aborted(delivery)) { |
| SET_ATOMIC_FLAG(&msg->content->aborted); |
| } |
| // unlink message and delivery |
| pn_record_set(record, PN_DELIVERY_CTX, 0); |
| } |
| UNLOCK(content->lock); |
| if (!!pending_free) { |
| qd_buffer_free(pending_free); |
| } |
| break; |
| } |
| |
| // |
| // Handle a missing or full pending buffer |
| // |
| if (!content->pending) { |
| // Pending buffer is absent: get a new one |
| content->pending = qd_buffer(); |
| } else { |
| // Pending buffer exists |
| if (qd_buffer_capacity(content->pending) == 0) { |
| // Pending buffer is full |
| LOCK(content->lock); |
| qd_buffer_set_fanout(content->pending, content->fanout); |
| DEQ_INSERT_TAIL(content->buffers, content->pending); |
| content->pending = 0; |
| if (_Q2_holdoff_should_block_LH(content)) { |
| if (!qd_link_is_q2_limit_unbounded(qdl)) { |
| content->q2_input_holdoff = true; |
| UNLOCK(content->lock); |
| break; |
| } |
| } |
| UNLOCK(content->lock); |
| content->pending = qd_buffer(); |
| } else { |
| // Pending buffer still has capacity |
| } |
| } |
| |
| // |
| // Try to fill the remaining space in the pending buffer. |
| // |
| rc = pn_link_recv(link, |
| (char*) qd_buffer_cursor(content->pending), |
| qd_buffer_capacity(content->pending)); |
| |
| if (rc < 0) { |
| // error or eos seen. next pass breaks out of loop |
| recv_error = true; |
| } else if (rc > 0) { |
| // |
| // We have received a positive number of bytes for the message. |
| // Advance the cursor in the buffer. |
| // |
| qd_buffer_insert(content->pending, rc); |
| |
| // Handle maxMessageSize violations |
| if (content->max_message_size) { |
| content->bytes_received += rc; |
| if (content->bytes_received > content->max_message_size) |
| { |
| qd_connection_t *conn = qd_link_connection(qdl); |
| qd_connection_log_policy_denial(qdl, "DENY AMQP Transfer maxMessageSize exceeded"); |
| qd_policy_count_max_size_event(link, conn); |
| SET_ATOMIC_FLAG(&content->discard); |
| SET_ATOMIC_FLAG(&content->oversize); |
| return discard_receive(delivery, link, (qd_message_t*)msg); |
| } |
| } |
| } else { |
| // |
| // We received zero bytes, and no PN_EOS. This means that we've received |
| // all of the data available up to this point, but it does not constitute |
| // the entire message. We'll be back later to finish it up. |
| // Return the message so that the caller can start sending out whatever we have received so far |
| // |
| // push what we do have for testing/processing |
| if (qd_buffer_size(content->pending) > 0) { |
| LOCK(content->lock); |
| qd_buffer_set_fanout(content->pending, content->fanout); |
| DEQ_INSERT_TAIL(content->buffers, content->pending); |
| content->pending = 0; |
| UNLOCK(content->lock); |
| content->pending = qd_buffer(); |
| } |
| break; |
| } |
| } |
| |
| return (qd_message_t*) msg; |
| } |
| |
| |
| static void send_handler(void *context, const unsigned char *start, int length) |
| { |
| pn_link_t *pnl = (pn_link_t*) context; |
| pn_link_send(pnl, (const char*) start, length); |
| } |
| |
| |
| // Restore MA to the original user-supplied MA values. This merely sets up the |
| // annotations section and map header to hold only the user supplied |
| // annotations. |
| // |
| // @return the length of the ma_header field in octets |
| // |
| static int restore_user_message_annotations(qd_message_pvt_t *msg, uint8_t *ma_header) |
| { |
| qd_message_content_t *content = msg->content; |
| if (content->ma_user_count) { |
| |
| // setup the MA descriptor: |
| ma_header[0] = 0; |
| ma_header[1] = QD_AMQP_SMALLULONG; |
| ma_header[2] = QD_PERFORMATIVE_MESSAGE_ANNOTATIONS; |
| |
| // setup the MA MAP header. The type of header (MAP32/8) depends on |
| // the size of the map contents. |
| const int map_hdr_len = qd_compose_map_header(&ma_header[3], |
| content->ma_user_annotations.remaining, |
| content->ma_user_count); |
| return map_hdr_len + 3; |
| } |
| return 0; |
| } |
| |
| |
| // Generate the MA section header and the router annotations. Any user |
| // annotations will be sent after ma_header and before ma_trailer. |
| // |
| // @return the length of the ma_header field in octets |
| // |
| static int compose_router_message_annotations(qd_message_pvt_t *msg, uint8_t *ma_header, |
| qd_buffer_list_t *ma_trailer) |
| { |
| qd_message_content_t *content = msg->content; |
| |
| // account for any user annotations to be sent before the router annotations |
| // |
| uint32_t mcount = content->ma_user_count; |
| uint32_t msize = content->ma_user_annotations.remaining; |
| |
| if (msg->ma_phase) { |
| assert(msg->ma_phase < 128); // smallint |
| mcount += 2; |
| |
| // key: |
| msize += QD_MA_PHASE_ENCODED_LEN; |
| qd_buffer_list_append(ma_trailer, QD_MA_PHASE_ENCODED, QD_MA_PHASE_ENCODED_LEN); |
| |
| // value: |
| msize += 2; // tag + 1 byte value |
| uint8_t ma_phase[2]; |
| ma_phase[0] = QD_AMQP_SMALLINT; |
| ma_phase[1] = msg->ma_phase; |
| qd_buffer_list_append(ma_trailer, ma_phase, 2); |
| } |
| |
| if (msg->ma_streaming) { |
| mcount += 2; |
| |
| // key: |
| msize += QD_MA_STREAM_ENCODED_LEN; |
| qd_buffer_list_append(ma_trailer, QD_MA_STREAM_ENCODED, QD_MA_STREAM_ENCODED_LEN); |
| |
| // value: historically sent as int value 1: |
| msize += 2; |
| const uint8_t streaming[2] = {QD_AMQP_SMALLINT, 1}; |
| qd_buffer_list_append(ma_trailer, streaming, 2); |
| } |
| |
| if (msg->ma_to_override || content->ma_pf_to_override) { |
| mcount += 2; |
| |
| // key: |
| msize += QD_MA_TO_ENCODED_LEN; |
| qd_buffer_list_append(ma_trailer, QD_MA_TO_ENCODED, QD_MA_TO_ENCODED_LEN); |
| |
| // value: message specific value takes precedence over value in |
| // original received message to allow overriding the to-override |
| uint8_t hdr[5]; // max length of encoded str8/32 header |
| if (msg->ma_to_override) { |
| const size_t str_len = strlen(msg->ma_to_override); |
| const int hdr_len = qd_compose_str_header(hdr, str_len); |
| |
| msize += hdr_len; |
| qd_buffer_list_append(ma_trailer, hdr, hdr_len); |
| |
| msize += str_len; |
| qd_buffer_list_append(ma_trailer, (uint8_t*) msg->ma_to_override, str_len); |
| |
| } else { |
| qd_buffer_field_t to = qd_parse_value(content->ma_pf_to_override); |
| const int hdr_len = qd_compose_str_header(hdr, to.remaining); |
| |
| msize += hdr_len; |
| qd_buffer_list_append(ma_trailer, hdr, hdr_len); |
| |
| msize += to.remaining; |
| qd_buffer_list_append_field(ma_trailer, &to); |
| } |
| } |
| |
| if (!msg->ma_filter_ingress) { |
| mcount += 2; |
| |
| // key |
| msize += QD_MA_INGRESS_ENCODED_LEN; |
| qd_buffer_list_append(ma_trailer, QD_MA_INGRESS_ENCODED, QD_MA_INGRESS_ENCODED_LEN); |
| |
| // value: use original value if present, else the local node is the |
| // ingress |
| if (content->ma_pf_ingress && !msg->ma_reset_ingress) { |
| uint8_t hdr[5]; // max size str8/32 header |
| qd_buffer_field_t ingress = qd_parse_value(content->ma_pf_ingress); |
| const int hdr_len = qd_compose_str_header(hdr, ingress.remaining); |
| |
| msize += hdr_len; |
| qd_buffer_list_append(ma_trailer, hdr, hdr_len); |
| |
| msize += ingress.remaining; |
| qd_buffer_list_append_field(ma_trailer, &ingress); |
| |
| } else { |
| size_t node_id_len; |
| const uint8_t *node_id = qd_router_id_encoded(&node_id_len); |
| msize += node_id_len; |
| qd_buffer_list_append(ma_trailer, node_id, node_id_len); |
| } |
| } |
| |
| if (!msg->ma_filter_trace) { |
| mcount += 2; |
| size_t node_id_len; |
| const uint8_t *node_id = qd_router_id_encoded(&node_id_len); |
| uint32_t trace_count = 1; // local node |
| uint32_t trace_len = node_id_len; |
| const bool use_incoming = content->ma_pf_trace && !msg->ma_reset_trace; |
| |
| // key |
| msize += QD_MA_TRACE_ENCODED_LEN; |
| qd_buffer_list_append(ma_trailer, QD_MA_TRACE_ENCODED, QD_MA_TRACE_ENCODED_LEN); |
| |
| // value: first compute trace list size and count since the list header |
| // must be written first |
| qd_buffer_field_t in_trace; |
| if (use_incoming) { |
| in_trace = qd_parse_value(content->ma_pf_trace); |
| trace_len += in_trace.remaining; |
| trace_count += qd_parse_sub_count(content->ma_pf_trace); |
| } |
| |
| uint8_t list_hdr[9]; // max len encoded list header |
| const int hdr_len = qd_compose_list_header(list_hdr, trace_len, trace_count); |
| |
| msize += hdr_len; |
| qd_buffer_list_append(ma_trailer, list_hdr, hdr_len); |
| |
| if (use_incoming) { |
| msize += in_trace.remaining; |
| qd_buffer_list_append_field(ma_trailer, &in_trace); |
| } |
| |
| msize += node_id_len; |
| qd_buffer_list_append(ma_trailer, node_id, node_id_len); |
| } |
| |
| if (msize) { |
| // setup the MA section descriptor: |
| ma_header[0] = 0; |
| ma_header[1] = QD_AMQP_SMALLULONG; |
| ma_header[2] = QD_PERFORMATIVE_MESSAGE_ANNOTATIONS; |
| |
| // setup the MA MAP header |
| const int hdr_size = qd_compose_map_header(&ma_header[3], msize, mcount); |
| return hdr_size + 3; |
| } |
| |
| return 0; |
| } |
| |
| |
| void qd_message_send(qd_message_t *in_msg, |
| qd_link_t *link, |
| bool strip_annotations, |
| bool *q3_stalled) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| qd_message_content_t *content = msg->content; |
| pn_link_t *pnl = qd_link_pn(link); |
| |
| *q3_stalled = false; |
| |
| if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) { |
| |
| if (IS_ATOMIC_FLAG_SET(&content->aborted)) { |
| // Message is aborted before any part of it has been sent. |
| // Declare the message to be sent, |
| SET_ATOMIC_FLAG(&msg->send_complete); |
| // If the outgoing delivery is not already aborted then abort it. |
| if (!pn_delivery_aborted(pn_link_current(pnl))) { |
| pn_delivery_abort(pn_link_current(pnl)); |
| } |
| return; |
| } |
| |
| msg->cursor.buffer = DEQ_HEAD(content->buffers); |
| msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer); |
| |
| // Since link-routed messages do not set router annotations they will |
| // skip the following (content->ma_disabled will be true) and unconditionally |
| // start sending from the first octet of the content. |
| |
| if (!content->ma_disabled) { |
| // |
| // Send header if present |
| // |
| const int header_consume = content->section_message_header.length + content->section_message_header.hdr_length; |
| if (header_consume > 0) { |
| assert(msg->cursor.cursor == content->section_message_header.offset + qd_buffer_base(msg->cursor.buffer)); |
| advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, header_consume, send_handler, (void*) pnl); |
| } |
| |
| // |
| // Send delivery annotation if present |
| // |
| const int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length; |
| if (da_consume > 0) { |
| assert(msg->cursor.cursor == content->section_delivery_annotation.offset + qd_buffer_base(msg->cursor.buffer)); |
| advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, da_consume, send_handler, (void*) pnl); |
| } |
| |
| // |
| // Send the message annotations section |
| // |
| |
| uint8_t ma_header[12]; // max length for MA section and map header |
| int ma_header_len; // size of ma_header content |
| qd_buffer_list_t ma_trailer = DEQ_EMPTY; |
| |
| if (strip_annotations) { |
| // send the original user message annotations only (if present) |
| ma_header_len = restore_user_message_annotations(msg, ma_header); |
| } else { |
| ma_header_len = compose_router_message_annotations(msg, ma_header, &ma_trailer); |
| } |
| |
| if (ma_header_len) { |
| // |
| // send annotation section and map header |
| // |
| pn_link_send(pnl, (char*) ma_header, ma_header_len); |
| |
| // |
| // Now send any annotation set by the original endpoint |
| // |
| if (content->ma_user_annotations.remaining) { |
| qd_buffer_t *buf2 = content->ma_user_annotations.buffer; |
| const uint8_t *cursor2 = content->ma_user_annotations.cursor; |
| advance_guarded(&cursor2, &buf2, |
| content->ma_user_annotations.remaining, |
| send_handler, (void*) pnl); |
| } |
| |
| // |
| // Next send router annotations |
| // |
| qd_buffer_t *ta_buf = DEQ_HEAD(ma_trailer); |
| while (ta_buf) { |
| char *to_send = (char*) qd_buffer_base(ta_buf); |
| pn_link_send(pnl, to_send, qd_buffer_size(ta_buf)); |
| ta_buf = DEQ_NEXT(ta_buf); |
| } |
| qd_buffer_list_free_buffers(&ma_trailer); |
| } |
| |
| // |
| // Skip over replaced message annotations |
| // |
| const int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length; |
| if (ma_consume > 0) { |
| assert(msg->cursor.cursor == content->section_message_annotation.offset + qd_buffer_base(msg->cursor.buffer)); |
| advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, ma_consume, 0, 0); |
| } |
| } |
| |
| msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS; |
| } |
| |
| qd_buffer_t *buf = msg->cursor.buffer; |
| |
| qd_message_q2_unblocker_t q2_unblock = {0}; |
| pn_session_t *pns = pn_link_session(pnl); |
| const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER; |
| |
| while (!IS_ATOMIC_FLAG_SET(&content->aborted) |
| && buf |
| && pn_session_outgoing_bytes(pns) < q3_upper) { |
| |
| // This will send the remaining data in the buffer if any. There may be |
| // zero bytes left to send if we stopped here last time and there was |
| // no next buf |
| // |
| size_t buf_size = qd_buffer_size(buf); |
| int num_bytes_to_send = buf_size - (msg->cursor.cursor - qd_buffer_base(buf)); |
| ssize_t bytes_sent = 0; |
| if (num_bytes_to_send > 0) { |
| bytes_sent = pn_link_send(pnl, (const char*)msg->cursor.cursor, num_bytes_to_send); |
| } |
| |
| LOCK(content->lock); |
| |
| if (bytes_sent < 0) { |
| // |
| // send error - likely the link has failed and we will eventually |
| // get a link detach event for this link |
| // |
| SET_ATOMIC_FLAG(&content->aborted); |
| SET_ATOMIC_FLAG(&msg->send_complete); |
| if (!pn_delivery_aborted(pn_link_current(pnl))) { |
| pn_delivery_abort(pn_link_current(pnl)); |
| } |
| |
| qd_log(qd_message_log_source(), |
| QD_LOG_WARNING, |
| "Sending data on link %s has failed (code=%zi)", |
| pn_link_name(pnl), bytes_sent); |
| |
| } else { |
| |
| msg->cursor.cursor += bytes_sent; |
| |
| if (bytes_sent == num_bytes_to_send) { |
| // |
| // sent the whole buffer. |
| // Can we move to the next buffer? Only if there is a next buffer |
| // or we are at the end and done sending this message |
| // |
| qd_buffer_t *next_buf = DEQ_NEXT(buf); |
| bool complete = qd_message_receive_complete(in_msg); |
| |
| if (next_buf || complete) { |
| // |
| // this buffer may be freed if there are no more references to it |
| // |
| uint32_t ref_count = (msg->is_fanout) ? qd_buffer_dec_fanout(buf) : 1; |
| if (ref_count == 1) { |
| |
| DEQ_REMOVE(content->buffers, buf); |
| qd_buffer_free(buf); |
| ++content->buffers_freed; |
| |
| // by freeing a buffer there now may be room to restart a |
| // stalled message receiver |
| if (content->q2_input_holdoff) { |
| if (_Q2_holdoff_should_unblock_LH(content)) { |
| // wake up receive side |
| // Note: clearing holdoff here is easy compared to |
| // clearing it in the deferred callback. Tracing |
| // shows that rx_handler may run and subsequently |
| // set input holdoff before the deferred handler |
| // runs. |
| content->q2_input_holdoff = false; |
| q2_unblock = content->q2_unblocker; |
| } |
| } |
| } // end free buffer |
| |
| msg->cursor.buffer = next_buf; |
| msg->cursor.cursor = (next_buf) ? qd_buffer_base(next_buf) : 0; |
| |
| SET_ATOMIC_BOOL(&msg->send_complete, (complete && !next_buf)); |
| } |
| |
| buf = next_buf; |
| |
| } else if (num_bytes_to_send && bytes_sent == 0) { |
| // |
| // the proton link cannot take anymore data, |
| // retry later... |
| // |
| buf = 0; |
| qd_log(qd_message_log_source(), QD_LOG_DEBUG, |
| "Link %s output limit reached", pn_link_name(pnl)); |
| } |
| } |
| |
| UNLOCK(content->lock); |
| } |
| |
| // the Q2 handler must be invoked outside the lock |
| if (q2_unblock.handler) |
| q2_unblock.handler(q2_unblock.context); |
| |
| if (IS_ATOMIC_FLAG_SET(&content->aborted)) { |
| if (pn_link_current(pnl)) { |
| SET_ATOMIC_FLAG(&msg->send_complete); |
| if (!pn_delivery_aborted(pn_link_current(pnl))) { |
| pn_delivery_abort(pn_link_current(pnl)); |
| } |
| } |
| } |
| |
| *q3_stalled = (pn_session_outgoing_bytes(pns) >= q3_upper); |
| } |
| |
| |
| static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *content, |
| qd_message_depth_t depth, |
| const unsigned char *long_pattern, |
| const unsigned char *short_pattern, |
| const unsigned char *expected_tags, |
| qd_field_location_t *location, |
| bool optional, |
| bool protect_buffer) |
| { |
| #define LONG 10 |
| #define SHORT 3 |
| if (depth <= content->parse_depth) |
| return QD_MESSAGE_DEPTH_OK; |
| |
| qd_section_status_t rc; |
| rc = message_section_check_LH(content, &content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location, false, protect_buffer); |
| if (rc == QD_SECTION_NO_MATCH) // try the alternative |
| rc = message_section_check_LH(content, &content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location, false, protect_buffer); |
| |
| if (rc == QD_SECTION_MATCH || (optional && rc == QD_SECTION_NO_MATCH)) { |
| content->parse_depth = depth; |
| return QD_MESSAGE_DEPTH_OK; |
| } |
| |
| if (rc == QD_SECTION_NEED_MORE) { |
| if (!IS_ATOMIC_FLAG_SET(&content->receive_complete)) |
| return QD_MESSAGE_DEPTH_INCOMPLETE; |
| |
| // no more data is going to come. OK if at the end and optional: |
| if (!can_advance(&content->parse_cursor, &content->parse_buffer) && optional) |
| return QD_MESSAGE_DEPTH_OK; |
| |
| // otherwise we've got an invalid (truncated) header |
| } |
| |
| // if QD_SECTION_NO_MATCH && !optional => INVALID; |
| // QD_SECTION_INVALID => INVALID; |
| |
| return QD_MESSAGE_DEPTH_INVALID; |
| } |
| |
| |
| static qd_message_depth_status_t qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth) |
| { |
| qd_error_clear(); |
| |
| if (depth <= content->parse_depth || depth == QD_DEPTH_NONE) |
| return QD_MESSAGE_DEPTH_OK; // We've already parsed at least this deep |
| |
| // Is there any data to check? This will also check for null messages, which |
| // are not valid: |
| // |
| qd_buffer_t *buffer = DEQ_HEAD(content->buffers); |
| if (!buffer || qd_buffer_size(buffer) == 0) { |
| return IS_ATOMIC_FLAG_SET(&content->receive_complete) ? QD_MESSAGE_DEPTH_INVALID : QD_MESSAGE_DEPTH_INCOMPLETE; |
| } |
| |
| if (content->buffers_freed) { |
| // this is likely a bug: the caller is attempting to access a |
| // section after the start of the message has already been sent and |
| // released, rendering the parse_buffer/cursor position invalid. |
| return QD_MESSAGE_DEPTH_INVALID; |
| } |
| |
| if (content->parse_buffer == 0) { |
| content->parse_buffer = buffer; |
| content->parse_cursor = qd_buffer_base(content->parse_buffer); |
| } |
| |
| qd_message_depth_status_t rc = QD_MESSAGE_DEPTH_OK; |
| int last_section = QD_DEPTH_NONE; |
| |
| switch (content->parse_depth + 1) { // start checking at the next unparsed section |
| case QD_DEPTH_HEADER: |
| // |
| // MESSAGE HEADER (optional) |
| // |
| last_section = QD_DEPTH_HEADER; |
| rc = message_check_depth_LH(content, QD_DEPTH_HEADER, |
| MSG_HDR_LONG, MSG_HDR_SHORT, TAGS_LIST, |
| &content->section_message_header, true, true); |
| if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_HEADER) |
| break; |
| |
| // fallthrough |
| |
| case QD_DEPTH_DELIVERY_ANNOTATIONS: |
| // |
| // DELIVERY ANNOTATIONS (optional) |
| // |
| last_section = QD_DEPTH_DELIVERY_ANNOTATIONS; |
| rc = message_check_depth_LH(content, QD_DEPTH_DELIVERY_ANNOTATIONS, |
| DELIVERY_ANNOTATION_LONG, DELIVERY_ANNOTATION_SHORT, TAGS_MAP, |
| &content->section_delivery_annotation, true, true); |
| if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_DELIVERY_ANNOTATIONS) |
| break; |
| |
| // fallthrough |
| |
| case QD_DEPTH_MESSAGE_ANNOTATIONS: |
| // |
| // MESSAGE ANNOTATION (optional) |
| // |
| last_section = QD_DEPTH_MESSAGE_ANNOTATIONS; |
| rc = message_check_depth_LH(content, QD_DEPTH_MESSAGE_ANNOTATIONS, |
| MESSAGE_ANNOTATION_LONG, MESSAGE_ANNOTATION_SHORT, TAGS_MAP, |
| &content->section_message_annotation, true, true); |
| if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_MESSAGE_ANNOTATIONS) |
| break; |
| |
| // fallthough |
| |
| case QD_DEPTH_PROPERTIES: |
| // |
| // PROPERTIES (optional) |
| // |
| last_section = QD_DEPTH_PROPERTIES; |
| rc = message_check_depth_LH(content, QD_DEPTH_PROPERTIES, |
| PROPERTIES_LONG, PROPERTIES_SHORT, TAGS_LIST, |
| &content->section_message_properties, true, true); |
| if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_PROPERTIES) |
| break; |
| |
| // fallthrough |
| |
| case QD_DEPTH_APPLICATION_PROPERTIES: |
| // |
| // APPLICATION PROPERTIES (optional) |
| // |
| last_section = QD_DEPTH_APPLICATION_PROPERTIES; |
| rc = message_check_depth_LH(content, QD_DEPTH_APPLICATION_PROPERTIES, |
| APPLICATION_PROPERTIES_LONG, APPLICATION_PROPERTIES_SHORT, TAGS_MAP, |
| &content->section_application_properties, true, true); |
| if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_APPLICATION_PROPERTIES) |
| break; |
| |
| // fallthrough |
| |
| case QD_DEPTH_BODY: |
| |
| // |
| // BODY (not optional, but proton allows it - see PROTON-2085) |
| // |
| // AMQP 1.0 defines 3 valid Body types: Binary, Sequence (list), or Value (any type) |
| // Since the body is mandatory, we need to match one of these. Setting |
| // the optional flag to false will force us to check each one until a match is found. |
| // |
| last_section = QD_DEPTH_BODY; |
| rc = message_check_depth_LH(content, QD_DEPTH_BODY, |
| BODY_VALUE_LONG, BODY_VALUE_SHORT, TAGS_ANY, |
| &content->section_body, false, false); |
| if (rc == QD_MESSAGE_DEPTH_INVALID) { // may be a different body type, need to check: |
| rc = message_check_depth_LH(content, QD_DEPTH_BODY, |
| BODY_DATA_LONG, BODY_DATA_SHORT, TAGS_BINARY, |
| &content->section_body, false, false); |
| if (rc == QD_MESSAGE_DEPTH_INVALID) { |
| rc = message_check_depth_LH(content, QD_DEPTH_BODY, |
| BODY_SEQUENCE_LONG, BODY_SEQUENCE_SHORT, TAGS_LIST, |
| &content->section_body, true, false); // PROTON-2085 |
| } |
| } |
| |
| if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_BODY) |
| break; |
| |
| // fallthrough |
| |
| case QD_DEPTH_ALL: |
| // |
| // FOOTER (optional) |
| // |
| last_section = QD_DEPTH_ALL; |
| rc = message_check_depth_LH(content, QD_DEPTH_ALL, |
| FOOTER_LONG, FOOTER_SHORT, TAGS_MAP, |
| &content->section_footer, true, false); |
| break; |
| |
| default: |
| assert(false); // should not happen! |
| qd_error(QD_ERROR_MESSAGE, "BUG! Invalid message depth specified: %d", |
| content->parse_depth + 1); |
| return QD_MESSAGE_DEPTH_INVALID; |
| } |
| |
| if (rc == QD_MESSAGE_DEPTH_INVALID) |
| qd_error(QD_ERROR_MESSAGE, "Invalid message: %s section invalid", |
| section_names[last_section]); |
| |
| return rc; |
| } |
| |
| |
| qd_message_depth_status_t qd_message_check_depth(const qd_message_t *in_msg, qd_message_depth_t depth) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| qd_message_content_t *content = msg->content; |
| qd_message_depth_status_t result; |
| |
| LOCK(content->lock); |
| result = qd_message_check_LH(content, depth); |
| UNLOCK(content->lock); |
| return result; |
| } |
| |
| |
| qd_iterator_t *qd_message_field_iterator_typed(qd_message_t *msg, qd_message_field_t field) |
| { |
| qd_field_location_t *loc = qd_message_field_location(msg, field); |
| |
| if (!loc) |
| return 0; |
| |
| if (loc->tag == QD_AMQP_NULL) |
| return 0; |
| |
| return qd_iterator_buffer(loc->buffer, loc->offset, loc->length + loc->hdr_length, ITER_VIEW_ALL); |
| } |
| |
| |
| qd_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t field) |
| { |
| qd_field_location_t *loc = qd_message_field_location(msg, field); |
| |
| if (!loc) |
| return 0; |
| |
| if (loc->tag == QD_AMQP_NULL) |
| return 0; |
| |
| qd_buffer_t *buffer = loc->buffer; |
| unsigned char *cursor = qd_buffer_base(loc->buffer) + loc->offset; |
| if (!advance(&cursor, &buffer, loc->hdr_length)) |
| return 0; |
| |
| return qd_iterator_buffer(buffer, cursor - qd_buffer_base(buffer), loc->length, ITER_VIEW_ALL); |
| } |
| |
| |
| ssize_t qd_message_field_length(qd_message_t *msg, qd_message_field_t field) |
| { |
| qd_field_location_t *loc = qd_message_field_location(msg, field); |
| if (!loc) |
| return -1; |
| |
| return loc->length; |
| } |
| |
| |
| ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char *buffer, size_t *hdr_length) |
| { |
| qd_field_location_t *loc = qd_message_field_location(msg, field); |
| if (!loc) |
| return -1; |
| |
| qd_buffer_t *buf = loc->buffer; |
| size_t bufsize = qd_buffer_size(buf) - loc->offset; |
| void *base = qd_buffer_base(buf) + loc->offset; |
| size_t remaining = loc->length + loc->hdr_length; |
| *hdr_length = loc->hdr_length; |
| |
| while (remaining > 0) { |
| if (bufsize > remaining) |
| bufsize = remaining; |
| memcpy(buffer, base, bufsize); |
| buffer += bufsize; |
| remaining -= bufsize; |
| if (remaining > 0) { |
| buf = buf->next; |
| base = qd_buffer_base(buf); |
| bufsize = qd_buffer_size(buf); |
| } |
| } |
| |
| return loc->length + loc->hdr_length; |
| } |
| |
| |
| // deprecated - use qd_message_compose() for creating locally generated messages |
| void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, bool receive_complete) |
| { |
| qd_message_content_t *content = MSG_CONTENT(msg); |
| |
| LOCK(content->lock); |
| |
| SET_ATOMIC_BOOL(&content->receive_complete, receive_complete); |
| qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1); |
| qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2); |
| |
| content->buffers = *field1_buffers; |
| DEQ_INIT(*field1_buffers); |
| DEQ_APPEND(content->buffers, (*field2_buffers)); |
| |
| // initialize the Q2 flag: |
| if (_Q2_holdoff_should_block_LH(content)) |
| content->q2_input_holdoff = true; |
| |
| UNLOCK(content->lock); |
| |
| // set up the locations of the message headers sent prior to the message |
| // annotations section. This is used when composing outgoing router |
| // annotations: |
| qd_message_parse_annotations(msg); |
| } |
| |
| |
| qd_message_t *qd_message_compose(qd_composed_field_t *f1, |
| qd_composed_field_t *f2, |
| qd_composed_field_t *f3, |
| bool receive_complete) |
| { |
| qd_message_t *msg = qd_message(); |
| if (!msg) |
| return 0; |
| |
| qd_composed_field_t *fields[4] = {f1, f2, f3, 0}; |
| qd_message_content_t *content = MSG_CONTENT(msg); |
| SET_ATOMIC_BOOL(&content->receive_complete, receive_complete); |
| |
| for (int idx = 0; fields[idx] != 0; ++idx) { |
| qd_buffer_list_t *bufs = qd_compose_buffers(fields[idx]); |
| DEQ_APPEND(content->buffers, (*bufs)); |
| qd_compose_free(fields[idx]); |
| } |
| |
| // initialize the Q2 flag: |
| if (_Q2_holdoff_should_block_LH(content)) |
| content->q2_input_holdoff = true; |
| |
| // set up the locations of the message headers sent prior to the message |
| // annotations section. This is used when composing outgoing router |
| // annotations: |
| qd_message_parse_annotations(msg); |
| |
| return msg; |
| } |
| |
| |
| int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked) |
| { |
| qd_message_content_t *content = MSG_CONTENT(msg); |
| int count; |
| qd_buffer_list_t *buffers = qd_compose_buffers(field); |
| qd_buffer_t *buf = DEQ_HEAD(*buffers); |
| |
| if (q2_blocked) |
| *q2_blocked = false; |
| |
| LOCK(content->lock); |
| while (buf) { |
| qd_buffer_set_fanout(buf, content->fanout); |
| buf = DEQ_NEXT(buf); |
| } |
| |
| DEQ_APPEND(content->buffers, (*buffers)); |
| count = DEQ_SIZE(content->buffers); |
| |
| // buffers added - must check for Q2: |
| if (_Q2_holdoff_should_block_LH(content)) { |
| content->q2_input_holdoff = true; |
| if (q2_blocked) |
| *q2_blocked = true; |
| } |
| |
| UNLOCK(content->lock); |
| return count; |
| } |
| |
| |
| /** |
| * find_last_buffer_LH |
| * |
| * Given a field location, find the following: |
| * |
| * - *cursor - The pointer to the octet _past_ the last octet in the field. If this is the last octet in |
| * the buffer, the cursor must point one octet past the buffer. |
| * - *buffer - The last buffer that contains content for this field. |
| * |
| * Important: If the last octet of the field is the last octet of a buffer and there are more buffers in the |
| * buffer list, *buffer _must_ refer to the buffer that contains the last octet of the field and *cursor must |
| * point at the octet following that octet, even if it points past the end of the buffer. |
| */ |
| static void find_last_buffer_LH(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer) |
| { |
| qd_buffer_t *buf = location->buffer; |
| size_t remaining = location->hdr_length + location->length; |
| |
| while (!!buf && remaining > 0) { |
| size_t this_buf_size = qd_buffer_size(buf) - (buf == location->buffer ? location->offset : 0); |
| if (remaining <= this_buf_size) { |
| *buffer = buf; |
| *cursor = qd_buffer_base(buf) + (buf == location->buffer ? location->offset : 0) + remaining; |
| return; |
| } |
| remaining -= this_buf_size; |
| buf = DEQ_NEXT(buf); |
| } |
| |
| assert(false); // The field should already have been validated as complete. |
| } |
| |
| |
| void trim_stream_data_headers_LH(qd_message_stream_data_t *stream_data, bool remove_vbin_header) |
| { |
| const qd_field_location_t *location = &stream_data->section; |
| qd_buffer_t *buffer = location->buffer; |
| unsigned char *cursor = qd_buffer_base(buffer) + location->offset; |
| |
| bool good = advance(&cursor, &buffer, location->hdr_length); |
| assert(good); |
| if (good) { |
| size_t vbin_hdr_len = 0; |
| unsigned char tag = 0; |
| |
| if (remove_vbin_header) { |
| vbin_hdr_len = 1; |
| // coverity[check_return] |
| next_octet(&cursor, &buffer, &tag); |
| if (tag == QD_AMQP_VBIN8) { |
| advance(&cursor, &buffer, 1); |
| vbin_hdr_len += 1; |
| } else if (tag == QD_AMQP_VBIN32) { |
| advance(&cursor, &buffer, 4); |
| vbin_hdr_len += 4; |
| } |
| } |
| |
| // coverity[check_return] |
| can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary |
| |
| stream_data->payload.buffer = buffer; |
| stream_data->payload.offset = cursor - qd_buffer_base(buffer); |
| stream_data->payload.length = location->length - vbin_hdr_len; |
| stream_data->payload.hdr_length = 0; |
| stream_data->payload.parsed = true; |
| stream_data->payload.tag = tag; |
| } |
| } |
| |
| |
| /** |
| * qd_message_stream_data_iterator |
| * |
| * Given a stream_data object, return an iterator that refers to the content of that body data. This iterator |
| * shall not refer to the 3-byte performative header or the header for the vbin{8,32} field. |
| * |
| * The iterator must be freed eventually by the caller. |
| */ |
| qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data) |
| { |
| const qd_field_location_t *location = &stream_data->payload; |
| |
| return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL); |
| } |
| |
| /** |
| * qd_message_stream_data_payload_length |
| * |
| * Given a stream_data object, return the length of the payload. |
| */ |
| size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data) |
| { |
| return stream_data->payload.length; |
| } |
| |
| |
| /** |
| * qd_message_stream_data_buffer_count |
| * |
| * Return the number of buffers contained in payload portion of the stream_data object. |
| */ |
| int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data) |
| { |
| if (stream_data->payload.length == 0) |
| return 0; |
| |
| int count = 1; |
| qd_buffer_t *buffer = stream_data->payload.buffer; |
| while (!!buffer && buffer != stream_data->last_buffer) { |
| buffer = DEQ_NEXT(buffer); |
| count++; |
| } |
| |
| return count; |
| } |
| |
| |
| /** |
| * qd_message_stream_data_buffers |
| * |
| * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the stream_data |
| * object. Don't fill more than count raw_buffers with data. Start at offset from the zero-th buffer in the |
| * stream_data. |
| */ |
| int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count) |
| { |
| qd_buffer_t *buffer = stream_data->payload.buffer; |
| size_t data_offset = stream_data->payload.offset; |
| size_t payload_len = stream_data->payload.length; |
| |
| qd_message_pvt_t *owning_message = stream_data->owning_message; |
| |
| |
| LOCK(owning_message->content->lock); |
| // |
| // Skip the buffer offset |
| // |
| if (offset > 0) { |
| assert(offset < qd_message_stream_data_buffer_count(stream_data)); |
| while (offset > 0 && payload_len > 0) { |
| payload_len -= qd_buffer_size(buffer) - data_offset; |
| offset--; |
| data_offset = 0; |
| buffer = DEQ_NEXT(buffer); |
| } |
| } |
| |
| // |
| // Fill the buffer array |
| // |
| int idx = 0; |
| while (idx < count && payload_len > 0) { |
| size_t buf_size = MIN(payload_len, qd_buffer_size(buffer) - data_offset); |
| buffers[idx].context = 0; // reserved for use by caller - do not modify! |
| buffers[idx].bytes = (char*) qd_buffer_base(buffer) + data_offset; |
| buffers[idx].capacity = BUFFER_SIZE; |
| buffers[idx].size = buf_size; |
| buffers[idx].offset = 0; |
| |
| data_offset = 0; |
| payload_len -= buf_size; |
| buffer = DEQ_NEXT(buffer); |
| idx++; |
| } |
| UNLOCK(owning_message->content->lock); |
| |
| return idx; |
| } |
| |
| void qd_message_stream_data_release_up_to(qd_message_stream_data_t *stream_data) |
| { |
| if (!stream_data) |
| return; |
| |
| qd_message_pvt_t *msg = stream_data->owning_message; |
| qd_message_stream_data_t *next = DEQ_HEAD(msg->stream_data_list); |
| qd_message_stream_data_t *current = NULL; |
| while (next && current != stream_data) { |
| current = next; |
| next = DEQ_NEXT(next); |
| qd_message_stream_data_release(current); |
| } |
| } |
| |
| /** |
| * qd_message_stream_data_release |
| * |
| * Decrement the fanout ref-counts for all of the buffers referred to in the stream_data. If any have reached zero, |
| * remove them from the buffer list and free them. |
| * |
| * Do not free buffers that overlap with other stream_data or the buffer pointed to by msg->body_buffer. |
| */ |
| void qd_message_stream_data_release(qd_message_stream_data_t *stream_data) |
| { |
| if (!stream_data) |
| return; |
| |
| qd_message_pvt_t *pvt = stream_data->owning_message; |
| qd_message_content_t *content = pvt->content; |
| qd_buffer_t *buf; |
| |
| // |
| // find the range of buffers that do not overlap other stream_data |
| // or msg->body_buffer |
| // |
| |
| qd_buffer_t *start_buf = stream_data->free_prev ? DEQ_PREV(stream_data->section.buffer) : stream_data->section.buffer; |
| if (DEQ_PREV(stream_data) && DEQ_PREV(stream_data)->last_buffer == start_buf) { |
| // overlap previous stream_data |
| if (start_buf == stream_data->last_buffer) { |
| // no buffers to free |
| DEQ_REMOVE(pvt->stream_data_list, stream_data); |
| free_qd_message_stream_data_t(stream_data); |
| return; |
| } |
| start_buf = DEQ_NEXT(start_buf); |
| } |
| |
| qd_buffer_t *stop_buf; |
| if (stream_data->last_buffer == pvt->body_buffer |
| || (DEQ_NEXT(stream_data) && DEQ_NEXT(stream_data)->section.buffer == stream_data->last_buffer)) { |
| stop_buf = stream_data->last_buffer; |
| } else { |
| stop_buf = DEQ_NEXT(stream_data->last_buffer); |
| } |
| |
| LOCK(content->lock); |
| |
| bool was_blocked = !_Q2_holdoff_should_unblock_LH(content); |
| qd_message_q2_unblocker_t q2_unblock = {0}; |
| |
| if (pvt->is_fanout) { |
| buf = start_buf; |
| while (buf != stop_buf) { |
| uint32_t old = qd_buffer_dec_fanout(buf); |
| (void)old; // avoid compiler unused var error |
| assert(old > 0); |
| buf = DEQ_NEXT(buf); |
| } |
| } |
| |
| // |
| // Free non-overlapping buffers with zero refcounts. |
| // |
| buf = start_buf; |
| while (buf != stop_buf) { |
| qd_buffer_t *next = DEQ_NEXT(buf); |
| if (qd_buffer_get_fanout(buf) == 0) { |
| DEQ_REMOVE(content->buffers, buf); |
| qd_buffer_free(buf); |
| } |
| buf = next; |
| } |
| |
| // |
| // it is possible that we've freed enough buffers to clear Q2 holdoff |
| // |
| if (content->q2_input_holdoff |
| && was_blocked |
| && _Q2_holdoff_should_unblock_LH(content)) { |
| content->q2_input_holdoff = false; |
| q2_unblock = content->q2_unblocker; |
| } |
| |
| UNLOCK(content->lock); |
| |
| DEQ_REMOVE(pvt->stream_data_list, stream_data); |
| free_qd_message_stream_data_t(stream_data); |
| |
| if (q2_unblock.handler) |
| q2_unblock.handler(q2_unblock.context); |
| } |
| |
| |
| qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg, qd_message_stream_data_t **out_stream_data) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| qd_message_content_t *content = msg->content; |
| qd_message_stream_data_t *stream_data = 0; |
| |
| *out_stream_data = 0; |
| if (!msg->body_cursor) { |
| // |
| // We haven't returned a body-data record for this message yet. Start |
| // by ensuring the message has been parsed up to the first body section |
| // |
| |
| qd_message_depth_status_t status = qd_message_check_depth(in_msg, QD_DEPTH_BODY); |
| if (status == QD_MESSAGE_DEPTH_OK) { |
| // Even if DEPTH_OK, body is optional. If there is no body then move to |
| // the footer |
| if (msg->content->section_body.buffer) { |
| msg->body_buffer = msg->content->section_body.buffer; |
| msg->body_cursor = qd_buffer_base(msg->body_buffer) + msg->content->section_body.offset; |
| } else { |
| // No body. Look for footer |
| status = qd_message_check_depth(in_msg, QD_DEPTH_ALL); |
| if (status == QD_MESSAGE_DEPTH_OK) { |
| if (msg->content->section_footer.buffer) { |
| // footer is also optional |
| msg->body_buffer = msg->content->section_footer.buffer; |
| msg->body_cursor = qd_buffer_base(msg->body_buffer) + msg->content->section_footer.offset; |
| } |
| } |
| } |
| } |
| |
| if (status == QD_MESSAGE_DEPTH_INCOMPLETE) |
| return QD_MESSAGE_STREAM_DATA_INCOMPLETE; |
| if (status == QD_MESSAGE_DEPTH_INVALID) |
| return QD_MESSAGE_STREAM_DATA_INVALID; |
| |
| // neither data not footer found |
| if (!msg->body_buffer) |
| return QD_MESSAGE_STREAM_DATA_NO_MORE; |
| } |
| |
| // parse out the body data section, or the footer if we're past the |
| // last data section |
| |
| qd_section_status_t section_status; |
| qd_field_location_t location; |
| ZERO(&location); |
| |
| qd_buffer_t * const old_body_buffer = msg->body_buffer; |
| bool is_footer = false; |
| qd_message_stream_data_result_t result = QD_MESSAGE_STREAM_DATA_NO_MORE; |
| |
| LOCK(content->lock); |
| |
| section_status = message_section_check_LH(content, |
| &msg->body_buffer, &msg->body_cursor, |
| BODY_DATA_SHORT, 3, TAGS_BINARY, |
| &location, |
| true, // allow duplicates |
| false); // do not inc buffer fanout |
| if (section_status == QD_SECTION_NO_MATCH) { |
| is_footer = true; |
| section_status = message_section_check_LH(content, |
| &msg->body_buffer, &msg->body_cursor, |
| FOOTER_SHORT, 3, TAGS_MAP, |
| &location, true, false); |
| } |
| |
| switch (section_status) { |
| case QD_SECTION_INVALID: |
| case QD_SECTION_NO_MATCH: |
| result = QD_MESSAGE_STREAM_DATA_INVALID; |
| break; |
| |
| case QD_SECTION_MATCH: |
| stream_data = new_qd_message_stream_data_t(); |
| ZERO(stream_data); |
| stream_data->owning_message = msg; |
| stream_data->section = location; |
| find_last_buffer_LH(&stream_data->section, &msg->body_cursor, &msg->body_buffer); |
| stream_data->last_buffer = msg->body_buffer; |
| trim_stream_data_headers_LH(stream_data, !is_footer); |
| DEQ_INSERT_TAIL(msg->stream_data_list, stream_data); |
| *out_stream_data = stream_data; |
| |
| // if the buffer pointed to by the old msg->body_buffer could not be |
| // freed when the previous stream_data was released, release it when |
| // this stream_data is released. Do not free it here as it may affect |
| // Q2 threshold, which is checked when the stream_data is released. |
| if (DEQ_HEAD(msg->stream_data_list) == stream_data) |
| if (old_body_buffer == DEQ_PREV(stream_data->section.buffer)) |
| stream_data->free_prev = true; |
| |
| result = is_footer ? QD_MESSAGE_STREAM_DATA_FOOTER_OK : QD_MESSAGE_STREAM_DATA_BODY_OK; |
| break; |
| |
| case QD_SECTION_NEED_MORE: |
| result = IS_ATOMIC_FLAG_SET(&msg->content->receive_complete) ? |
| QD_MESSAGE_STREAM_DATA_NO_MORE : QD_MESSAGE_STREAM_DATA_INCOMPLETE; |
| break; |
| } |
| |
| UNLOCK(content->lock); |
| return result; |
| } |
| |
| |
| qd_parsed_field_t *qd_message_get_ingress_router(qd_message_t *msg) |
| { |
| return ((qd_message_pvt_t*) msg)->content->ma_pf_ingress; |
| } |
| |
| |
| // used by exchange bindings to erase original ingress node id |
| void qd_message_reset_ingress_router_annotation(qd_message_t *in_msg) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| msg->ma_reset_ingress = true; |
| } |
| |
| |
| void qd_message_disable_ingress_router_annotation(qd_message_t *msg) |
| { |
| ((qd_message_pvt_t*) msg)->ma_filter_ingress = true; |
| } |
| |
| |
| qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg) |
| { |
| return ((qd_message_pvt_t*)msg)->content->ma_pf_to_override; |
| } |
| |
| |
| qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg) |
| { |
| return ((qd_message_pvt_t*) msg)->content->ma_pf_trace; |
| } |
| |
| |
| void qd_message_disable_trace_annotation(qd_message_t *msg) |
| { |
| ((qd_message_pvt_t*) msg)->ma_filter_trace = true; |
| } |
| |
| |
| // used by exchange bindings to erase old message trace list |
| void qd_message_reset_trace_annotation(qd_message_t *in_msg) |
| { |
| qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; |
| msg->ma_reset_trace = true; |
| } |
| |
| |
| int qd_message_is_streaming(const qd_message_t *msg) |
| { |
| const qd_message_pvt_t *msg_pvt = (const qd_message_pvt_t *)msg; |
| return msg_pvt->ma_streaming; |
| } |
| |
| |
| void qd_message_Q2_holdoff_disable(qd_message_t *msg) |
| { |
| if (!msg) |
| return; |
| qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg; |
| qd_message_content_t *content = msg_pvt->content; |
| qd_message_q2_unblocker_t q2_unblock = {0}; |
| |
| LOCK(content->lock); |
| if (!msg_pvt->content->disable_q2_holdoff) { |
| msg_pvt->content->disable_q2_holdoff = true; |
| if (content->q2_input_holdoff) { |
| content->q2_input_holdoff = false; |
| q2_unblock = content->q2_unblocker; |
| } |
| } |
| UNLOCK(content->lock); |
| |
| if (q2_unblock.handler) |
| q2_unblock.handler(q2_unblock.context); |
| } |
| |
| |
| bool _Q2_holdoff_should_block_LH(const qd_message_content_t *content) |
| { |
| const size_t buff_ct = DEQ_SIZE(content->buffers); |
| assert(buff_ct >= content->protected_buffers); |
| return !content->disable_q2_holdoff && (buff_ct - content->protected_buffers) >= QD_QLIMIT_Q2_UPPER; |
| } |
| |
| |
| bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content) |
| { |
| const size_t buff_ct = DEQ_SIZE(content->buffers); |
| assert(buff_ct >= content->protected_buffers); |
| return content->disable_q2_holdoff || (buff_ct - content->protected_buffers) < QD_QLIMIT_Q2_LOWER; |
| } |
| |
| |
| bool qd_message_is_Q2_blocked(const qd_message_t *msg) |
| { |
| qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg; |
| qd_message_content_t *content = msg_pvt->content; |
| |
| bool blocked; |
| LOCK(content->lock); |
| blocked = content->q2_input_holdoff; |
| UNLOCK(content->lock); |
| return blocked; |
| } |
| |
| |
| bool qd_message_aborted(const qd_message_t *msg) |
| { |
| assert(msg); |
| qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg; |
| return IS_ATOMIC_FLAG_SET(&msg_pvt->content->aborted); |
| } |
| |
| void qd_message_set_aborted(const qd_message_t *msg) |
| { |
| if (!msg) |
| return; |
| qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg; |
| SET_ATOMIC_FLAG(&msg_pvt->content->aborted); |
| } |
| |
| |
| bool qd_message_oversize(const qd_message_t *msg) |
| { |
| qd_message_content_t * mc = MSG_CONTENT(msg); |
| return IS_ATOMIC_FLAG_SET(&mc->oversize); |
| } |
| |
| |
| int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t *footer_props) |
| { |
| qd_composed_field_t *field = 0; |
| int rc = 0; |
| |
| field = qd_compose(QD_PERFORMATIVE_FOOTER, field); |
| |
| // Stick the buffers into the footer compose field. |
| qd_compose_insert_binary_buffers(field, footer_props); |
| |
| rc = qd_message_extend(message, field, 0); |
| |
| qd_compose_free(field); |
| return rc; |
| |
| } |
| |
| int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data, bool *q2_blocked) |
| { |
| unsigned int length = DEQ_SIZE(*data); |
| |
| qd_composed_field_t *field = 0; |
| int rc = 0; |
| |
| if (q2_blocked) |
| *q2_blocked = false; |
| |
| // DISPATCH-1803: ensure no body data section can exceed the |
| // QD_QLIMIT_Q2_LOWER. This allows the egress router to wait for an entire |
| // body data section to arrive and be validated before sending it out to |
| // the endpoint without preventing Q2 from being relieved (DISPATCH-2191). |
| // |
| const size_t buf_limit = QD_QLIMIT_Q2_LOWER - 2; // reserve 1 extra for performative header |
| assert(buf_limit); |
| while (length > buf_limit) { |
| qd_buffer_t *buf = DEQ_HEAD(*data); |
| for (int i = 0; i < buf_limit; ++i) { |
| buf = DEQ_NEXT(buf); |
| } |
| |
| // split the list at buf. buf becomes head of trailing list |
| |
| qd_buffer_list_t trailer = DEQ_EMPTY; |
| DEQ_HEAD(trailer) = buf; |
| DEQ_TAIL(trailer) = DEQ_TAIL(*data); |
| DEQ_TAIL(*data) = DEQ_PREV(buf); |
| DEQ_NEXT(DEQ_TAIL(*data)) = 0; |
| DEQ_PREV(buf) = 0; |
| DEQ_SIZE(trailer) = length - buf_limit; |
| DEQ_SIZE(*data) = buf_limit; |
| |
| field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); |
| qd_compose_insert_binary_buffers(field, data); |
| |
| DEQ_MOVE(trailer, *data); |
| length -= buf_limit; |
| } |
| |
| field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); |
| qd_compose_insert_binary_buffers(field, data); |
| |
| rc = qd_message_extend(message, field, q2_blocked); |
| qd_compose_free(field); |
| return rc; |
| } |
| |
| |
| void qd_message_set_q2_unblocked_handler(qd_message_t *msg, |
| qd_message_q2_unblocked_handler_t callback, |
| qd_alloc_safe_ptr_t context) |
| { |
| qd_message_content_t *content = MSG_CONTENT(msg); |
| |
| LOCK(content->lock); |
| |
| content->q2_unblocker.handler = callback; |
| content->q2_unblocker.context = context; |
| |
| UNLOCK(content->lock); |
| } |
| |
| |
| void qd_message_clear_q2_unblocked_handler(qd_message_t *msg) |
| { |
| if (msg) { |
| qd_message_content_t *content = MSG_CONTENT(msg); |
| |
| LOCK(content->lock); |
| |
| content->q2_unblocker.handler = 0; |
| qd_nullify_safe_ptr(&content->q2_unblocker.context); |
| |
| UNLOCK(content->lock); |
| } |
| } |