| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <qpid/dispatch/ctools.h> |
| #include <qpid/dispatch/threading.h> |
| #include "message_private.h" |
| #include <string.h> |
| #include <stdio.h> |
| |
| /* Allocator definition for dx_message_t items, configured with sizeof(dx_message_pvt_t), the private struct this file casts message handles to. Presumably this provides the new_dx_message_t()/free_dx_message_t() used in this file -- confirm against the alloc header. */ ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0); |
| /* Allocator definition for dx_message_content_t, the content object referenced through msg->content. Presumably provides new_/free_dx_message_content_t() -- confirm against the alloc header. */ ALLOC_DEFINE(dx_message_content_t); |
| |
| |
| static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume) |
| { |
| unsigned char *local_cursor = *cursor; |
| dx_buffer_t *local_buffer = *buffer; |
| |
| int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); |
| while (consume > 0) { |
| if (consume < remaining) { |
| local_cursor += consume; |
| consume = 0; |
| } else { |
| consume -= remaining; |
| local_buffer = local_buffer->next; |
| if (local_buffer == 0){ |
| local_cursor = 0; |
| break; |
| } |
| local_cursor = dx_buffer_base(local_buffer); |
| remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); |
| } |
| } |
| |
| *cursor = local_cursor; |
| *buffer = local_buffer; |
| } |
| |
| |
| static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer) |
| { |
| unsigned char result = **cursor; |
| advance(cursor, buffer, 1); |
| return result; |
| } |
| |
| |
| static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field_location_t *field) |
| { |
| unsigned char tag = next_octet(cursor, buffer); |
| if (!(*cursor)) return 0; |
| int consume = 0; |
| switch (tag & 0xF0) { |
| case 0x40 : consume = 0; break; |
| case 0x50 : consume = 1; break; |
| case 0x60 : consume = 2; break; |
| case 0x70 : consume = 4; break; |
| case 0x80 : consume = 8; break; |
| case 0x90 : consume = 16; break; |
| |
| case 0xB0 : |
| case 0xD0 : |
| case 0xF0 : |
| consume |= ((int) next_octet(cursor, buffer)) << 24; |
| if (!(*cursor)) return 0; |
| consume |= ((int) next_octet(cursor, buffer)) << 16; |
| if (!(*cursor)) return 0; |
| consume |= ((int) next_octet(cursor, buffer)) << 8; |
| if (!(*cursor)) return 0; |
| // Fall through to the next case... |
| |
| case 0xA0 : |
| case 0xC0 : |
| case 0xE0 : |
| consume |= (int) next_octet(cursor, buffer); |
| if (!(*cursor)) return 0; |
| break; |
| } |
| |
| if (field) { |
| field->buffer = *buffer; |
| field->offset = *cursor - dx_buffer_base(*buffer); |
| field->length = consume; |
| field->parsed = 1; |
| } |
| |
| advance(cursor, buffer, consume); |
| return 1; |
| } |
| |
| |
| static int start_list(unsigned char **cursor, dx_buffer_t **buffer) |
| { |
| unsigned char tag = next_octet(cursor, buffer); |
| if (!(*cursor)) return 0; |
| int length = 0; |
| int count = 0; |
| |
| switch (tag) { |
| case 0x45 : // list0 |
| break; |
| case 0xd0 : // list32 |
| length |= ((int) next_octet(cursor, buffer)) << 24; |
| if (!(*cursor)) return 0; |
| length |= ((int) next_octet(cursor, buffer)) << 16; |
| if (!(*cursor)) return 0; |
| length |= ((int) next_octet(cursor, buffer)) << 8; |
| if (!(*cursor)) return 0; |
| length |= (int) next_octet(cursor, buffer); |
| if (!(*cursor)) return 0; |
| |
| count |= ((int) next_octet(cursor, buffer)) << 24; |
| if (!(*cursor)) return 0; |
| count |= ((int) next_octet(cursor, buffer)) << 16; |
| if (!(*cursor)) return 0; |
| count |= ((int) next_octet(cursor, buffer)) << 8; |
| if (!(*cursor)) return 0; |
| count |= (int) next_octet(cursor, buffer); |
| if (!(*cursor)) return 0; |
| |
| break; |
| |
| case 0xc0 : // list8 |
| length |= (int) next_octet(cursor, buffer); |
| if (!(*cursor)) return 0; |
| |
| count |= (int) next_octet(cursor, buffer); |
| if (!(*cursor)) return 0; |
| break; |
| } |
| |
| return count; |
| } |
| |
| |
| // |
| // Check the buffer chain, starting at cursor to see if it matches the pattern. |
| // If the pattern matches, check the next tag to see if it's in the set of expected |
| // tags. If not, return zero. If so, set the location descriptor to the good |
| // tag and advance the cursor (and buffer, if needed) to the end of the matched section. |
| // |
| // If there is no match, don't advance the cursor. |
| // |
| // Return 0 if the pattern matches but the following tag is unexpected |
| // Return 0 if the pattern matches and the location already has a pointer (duplicate section) |
| // Return 1 if the pattern matches and we've advanced the cursor/buffer |
| // Return 1 if the pattern does not match |
| // |
| static int dx_check_and_advance(dx_buffer_t **buffer, |
| unsigned char **cursor, |
| unsigned char *pattern, |
| int pattern_length, |
| unsigned char *expected_tags, |
| dx_field_location_t *location) |
| { |
| dx_buffer_t *test_buffer = *buffer; |
| unsigned char *test_cursor = *cursor; |
| |
| if (!test_cursor) |
| return 1; // no match |
| |
| unsigned char *end_of_buffer = dx_buffer_base(test_buffer) + dx_buffer_size(test_buffer); |
| int idx = 0; |
| |
| while (idx < pattern_length && *test_cursor == pattern[idx]) { |
| idx++; |
| test_cursor++; |
| if (test_cursor == end_of_buffer) { |
| test_buffer = test_buffer->next; |
| if (test_buffer == 0) |
| return 1; // Pattern didn't match |
| test_cursor = dx_buffer_base(test_buffer); |
| end_of_buffer = test_cursor + dx_buffer_size(test_buffer); |
| } |
| } |
| |
| if (idx < pattern_length) |
| return 1; // Pattern didn't match |
| |
| // |
| // Pattern matched, check the tag |
| // |
| while (*expected_tags && *test_cursor != *expected_tags) |
| expected_tags++; |
| if (*expected_tags == 0) |
| return 0; // Unexpected tag |
| |
| if (location->parsed) |
| return 0; // Duplicate section |
| |
| // |
| // Pattern matched and tag is expected. Mark the beginning of the section. |
| // |
| location->parsed = 1; |
| location->buffer = test_buffer; |
| location->offset = test_cursor - dx_buffer_base(test_buffer); |
| location->length = 0; |
| |
| // |
| // Advance the pointers to consume the whole section. |
| // |
| int consume = 0; |
| unsigned char tag = next_octet(&test_cursor, &test_buffer); |
| if (!test_cursor) return 0; |
| switch (tag) { |
| case 0x45 : // list0 |
| break; |
| |
| case 0xd0 : // list32 |
| case 0xd1 : // map32 |
| case 0xb0 : // vbin32 |
| consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24; |
| if (!test_cursor) return 0; |
| consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16; |
| if (!test_cursor) return 0; |
| consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8; |
| if (!test_cursor) return 0; |
| // Fall through to the next case... |
| |
| case 0xc0 : // list8 |
| case 0xc1 : // map8 |
| case 0xa0 : // vbin8 |
| consume |= (int) next_octet(&test_cursor, &test_buffer); |
| if (!test_cursor) return 0; |
| break; |
| } |
| |
| if (consume) |
| advance(&test_cursor, &test_buffer, consume); |
| |
| *cursor = test_cursor; |
| *buffer = test_buffer; |
| return 1; |
| } |
| |
| |
| static void dx_insert(dx_message_content_t *msg, const uint8_t *seq, size_t len) |
| { |
| dx_buffer_t *buf = DEQ_TAIL(msg->buffers); |
| |
| while (len > 0) { |
| if (buf == 0 || dx_buffer_capacity(buf) == 0) { |
| buf = dx_allocate_buffer(); |
| if (buf == 0) |
| return; |
| DEQ_INSERT_TAIL(msg->buffers, buf); |
| } |
| |
| size_t to_copy = dx_buffer_capacity(buf); |
| if (to_copy > len) |
| to_copy = len; |
| memcpy(dx_buffer_cursor(buf), seq, to_copy); |
| dx_buffer_insert(buf, to_copy); |
| len -= to_copy; |
| seq += to_copy; |
| msg->length += to_copy; |
| } |
| } |
| |
| |
| static void dx_insert_8(dx_message_content_t *msg, uint8_t value) |
| { |
| dx_insert(msg, &value, 1); |
| } |
| |
| |
| static void dx_insert_32(dx_message_content_t *msg, uint32_t value) |
| { |
| uint8_t buf[4]; |
| buf[0] = (uint8_t) ((value & 0xFF000000) >> 24); |
| buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16); |
| buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8); |
| buf[3] = (uint8_t) (value & 0x000000FF); |
| dx_insert(msg, buf, 4); |
| } |
| |
| |
| static void dx_insert_64(dx_message_content_t *msg, uint64_t value) |
| { |
| uint8_t buf[8]; |
| buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56); |
| buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48); |
| buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40); |
| buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32); |
| buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24); |
| buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16); |
| buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8); |
| buf[7] = (uint8_t) (value & 0x00000000000000FFL); |
| dx_insert(msg, buf, 8); |
| } |
| |
| |
| static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value) |
| { |
| while (*buf) { |
| if (*cursor >= dx_buffer_size(*buf)) { |
| *buf = (*buf)->next; |
| *cursor = 0; |
| } else { |
| dx_buffer_base(*buf)[*cursor] = value; |
| (*cursor)++; |
| return; |
| } |
| } |
| } |
| |
| |
| static void dx_overwrite_32(dx_field_location_t *field, uint32_t value) |
| { |
| dx_buffer_t *buf = field->buffer; |
| size_t cursor = field->offset; |
| |
| dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24)); |
| dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24)); |
| dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24)); |
| dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF)); |
| } |
| |
| |
| static void dx_start_list_performative(dx_message_content_t *msg, uint8_t code) |
| { |
| // |
| // Insert the short-form performative tag |
| // |
| dx_insert(msg, (const uint8_t*) "\x00\x53", 2); |
| dx_insert_8(msg, code); |
| |
| // |
| // Open the list with a list32 tag |
| // |
| dx_insert_8(msg, 0xd0); |
| |
| // |
| // Mark the current location to later overwrite the length |
| // |
| msg->compose_length.buffer = DEQ_TAIL(msg->buffers); |
| msg->compose_length.offset = dx_buffer_size(msg->compose_length.buffer); |
| msg->compose_length.length = 4; |
| msg->compose_length.parsed = 1; |
| |
| dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); |
| |
| // |
| // Mark the current location to later overwrite the count |
| // |
| msg->compose_count.buffer = DEQ_TAIL(msg->buffers); |
| msg->compose_count.offset = dx_buffer_size(msg->compose_count.buffer); |
| msg->compose_count.length = 4; |
| msg->compose_count.parsed = 1; |
| |
| dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); |
| |
| msg->length = 4; // Include the length of the count field |
| msg->count = 0; |
| } |
| |
| |
| static void dx_end_list(dx_message_content_t *msg) |
| { |
| dx_overwrite_32(&msg->compose_length, msg->length); |
| dx_overwrite_32(&msg->compose_count, msg->count); |
| } |
| |
| |
| static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| |
| switch (field) { |
| case DX_FIELD_TO: |
| while (1) { |
| if (content->field_to.parsed) |
| return &content->field_to; |
| |
| if (content->section_message_properties.parsed == 0) |
| break; |
| |
| dx_buffer_t *buffer = content->section_message_properties.buffer; |
| unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset; |
| |
| int count = start_list(&cursor, &buffer); |
| int result; |
| |
| if (count < 3) |
| break; |
| |
| result = traverse_field(&cursor, &buffer, 0); // message_id |
| if (!result) return 0; |
| result = traverse_field(&cursor, &buffer, 0); // user_id |
| if (!result) return 0; |
| result = traverse_field(&cursor, &buffer, &content->field_to); // to |
| if (!result) return 0; |
| } |
| break; |
| |
| case DX_FIELD_BODY: |
| while (1) { |
| if (content->body.parsed) |
| return &content->body; |
| |
| if (content->section_body.parsed == 0) |
| break; |
| |
| dx_buffer_t *buffer = content->section_body.buffer; |
| unsigned char *cursor = dx_buffer_base(buffer) + content->section_body.offset; |
| int result; |
| |
| result = traverse_field(&cursor, &buffer, &content->body); |
| if (!result) return 0; |
| } |
| break; |
| |
| default: |
| break; |
| } |
| |
| return 0; |
| } |
| |
| |
| dx_message_t *dx_allocate_message() |
| { |
| dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t(); |
| if (!msg) |
| return 0; |
| |
| DEQ_ITEM_INIT(msg); |
| msg->content = new_dx_message_content_t(); |
| msg->out_delivery = 0; |
| |
| if (msg->content == 0) { |
| free_dx_message_t((dx_message_t*) msg); |
| return 0; |
| } |
| |
| memset(msg->content, 0, sizeof(dx_message_content_t)); |
| msg->content->lock = sys_mutex(); |
| msg->content->ref_count = 1; |
| |
| return (dx_message_t*) msg; |
| } |
| |
| |
| void dx_free_message(dx_message_t *in_msg) |
| { |
| uint32_t rc; |
| dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; |
| dx_message_content_t *content = msg->content; |
| |
| sys_mutex_lock(content->lock); |
| rc = --content->ref_count; |
| sys_mutex_unlock(content->lock); |
| |
| if (rc == 0) { |
| dx_buffer_t *buf = DEQ_HEAD(content->buffers); |
| |
| while (buf) { |
| DEQ_REMOVE_HEAD(content->buffers); |
| dx_free_buffer(buf); |
| buf = DEQ_HEAD(content->buffers); |
| } |
| |
| sys_mutex_free(content->lock); |
| free_dx_message_content_t(content); |
| } |
| |
| free_dx_message_t((dx_message_t*) msg); |
| } |
| |
| |
| dx_message_t *dx_message_copy(dx_message_t *in_msg) |
| { |
| dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; |
| dx_message_content_t *content = msg->content; |
| dx_message_pvt_t *copy = (dx_message_pvt_t*) new_dx_message_t(); |
| |
| if (!copy) |
| return 0; |
| |
| DEQ_ITEM_INIT(copy); |
| copy->content = content; |
| copy->out_delivery = 0; |
| |
| sys_mutex_lock(content->lock); |
| content->ref_count++; |
| sys_mutex_unlock(content->lock); |
| |
| return (dx_message_t*) copy; |
| } |
| |
| |
| void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery) |
| { |
| ((dx_message_pvt_t*) msg)->out_delivery = delivery; |
| } |
| |
| |
| pn_delivery_t *dx_message_out_delivery(dx_message_t *msg) |
| { |
| return ((dx_message_pvt_t*) msg)->out_delivery; |
| } |
| |
| |
| void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| content->in_delivery = delivery; |
| } |
| |
| |
| pn_delivery_t *dx_message_in_delivery(dx_message_t *msg) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| return content->in_delivery; |
| } |
| |
| |
| dx_message_t *dx_message_receive(pn_delivery_t *delivery) |
| { |
| pn_link_t *link = pn_delivery_link(delivery); |
| dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery); |
| ssize_t rc; |
| dx_buffer_t *buf; |
| |
| // |
| // If there is no message associated with the delivery, this is the first time |
| // we've received anything on this delivery. Allocate a message descriptor and |
| // link it and the delivery together. |
| // |
| if (!msg) { |
| msg = (dx_message_pvt_t*) dx_allocate_message(); |
| pn_delivery_set_context(delivery, (void*) msg); |
| |
| // |
| // Record the incoming delivery only if it is not settled. If it is |
| // settled, it should not be recorded as no future operations on it are |
| // permitted. |
| // |
| if (!pn_delivery_settled(delivery)) |
| msg->content->in_delivery = delivery; |
| } |
| |
| // |
| // Get a reference to the tail buffer on the message. This is the buffer into which |
| // we will store incoming message data. If there is no buffer in the message, allocate |
| // an empty one and add it to the message. |
| // |
| buf = DEQ_TAIL(msg->content->buffers); |
| if (!buf) { |
| buf = dx_allocate_buffer(); |
| DEQ_INSERT_TAIL(msg->content->buffers, buf); |
| } |
| |
| while (1) { |
| // |
| // Try to receive enough data to fill the remaining space in the tail buffer. |
| // |
| rc = pn_link_recv(link, (char*) dx_buffer_cursor(buf), dx_buffer_capacity(buf)); |
| |
| // |
| // If we receive PN_EOS, we have come to the end of the message. |
| // |
| if (rc == PN_EOS) { |
| // |
| // If the last buffer in the list is empty, remove it and free it. This |
| // will only happen if the size of the message content is an exact multiple |
| // of the buffer size. |
| // |
| if (dx_buffer_size(buf) == 0) { |
| DEQ_REMOVE_TAIL(msg->content->buffers); |
| dx_free_buffer(buf); |
| } |
| return (dx_message_t*) msg; |
| } |
| |
| if (rc > 0) { |
| // |
| // We have received a positive number of bytes for the message. Advance |
| // the cursor in the buffer. |
| // |
| dx_buffer_insert(buf, rc); |
| |
| // |
| // If the buffer is full, allocate a new empty buffer and append it to the |
| // tail of the message's list. |
| // |
| if (dx_buffer_capacity(buf) == 0) { |
| buf = dx_allocate_buffer(); |
| DEQ_INSERT_TAIL(msg->content->buffers, buf); |
| } |
| } else |
| // |
| // We received zero bytes, and no PN_EOS. This means that we've received |
| // all of the data available up to this point, but it does not constitute |
| // the entire message. We'll be back later to finish it up. |
| // |
| break; |
| } |
| |
| return 0; |
| } |
| |
| |
| void dx_message_send(dx_message_t *in_msg, pn_link_t *link) |
| { |
| dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; |
| dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers); |
| |
| // TODO - Handle cases where annotations have been added or modified |
| while (buf) { |
| pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf)); |
| buf = DEQ_NEXT(buf); |
| } |
| } |
| |
| |
| int dx_message_check(dx_message_t *in_msg, dx_message_depth_t depth) |
| { |
| |
| #define LONG 10 |
| #define SHORT 3 |
| #define MSG_HDR_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70" |
| #define MSG_HDR_SHORT (unsigned char*) "\x00\x53\x70" |
| #define DELIVERY_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71" |
| #define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71" |
| #define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72" |
| #define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72" |
| #define PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73" |
| #define PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73" |
| #define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74" |
| #define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74" |
| #define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75" |
| #define BODY_DATA_SHORT (unsigned char*) "\x00\x53\x75" |
| #define BODY_SEQUENCE_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76" |
| #define BODY_SEQUENCE_SHORT (unsigned char*) "\x00\x53\x76" |
| #define FOOTER_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78" |
| #define FOOTER_SHORT (unsigned char*) "\x00\x53\x78" |
| #define TAGS_LIST (unsigned char*) "\x45\xc0\xd0" |
| #define TAGS_MAP (unsigned char*) "\xc1\xd1" |
| #define TAGS_BINARY (unsigned char*) "\xa0\xb0" |
| |
| dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; |
| dx_message_content_t *content = msg->content; |
| dx_buffer_t *buffer = DEQ_HEAD(content->buffers); |
| unsigned char *cursor; |
| |
| if (!buffer) |
| return 0; // Invalid - No data in the message |
| |
| if (depth == DX_DEPTH_NONE) |
| return 1; |
| |
| cursor = dx_buffer_base(buffer); |
| |
| // |
| // MESSAGE HEADER |
| // |
| if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header)) |
| return 0; |
| if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header)) |
| return 0; |
| |
| if (depth == DX_DEPTH_HEADER) |
| return 1; |
| |
| // |
| // DELIVERY ANNOTATION |
| // |
| if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation)) |
| return 0; |
| if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation)) |
| return 0; |
| |
| if (depth == DX_DEPTH_DELIVERY_ANNOTATIONS) |
| return 1; |
| |
| // |
| // MESSAGE ANNOTATION |
| // |
| if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation)) |
| return 0; |
| if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation)) |
| return 0; |
| |
| if (depth == DX_DEPTH_MESSAGE_ANNOTATIONS) |
| return 1; |
| |
| // |
| // PROPERTIES |
| // |
| if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties)) |
| return 0; |
| if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties)) |
| return 0; |
| |
| if (depth == DX_DEPTH_PROPERTIES) |
| return 1; |
| |
| // |
| // APPLICATION PROPERTIES |
| // |
| if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties)) |
| return 0; |
| if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties)) |
| return 0; |
| |
| if (depth == DX_DEPTH_APPLICATION_PROPERTIES) |
| return 1; |
| |
| // |
| // BODY (Note that this function expects a single data section or a single AMQP sequence) |
| // |
| if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body)) |
| return 0; |
| if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body)) |
| return 0; |
| if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body)) |
| return 0; |
| if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body)) |
| return 0; |
| |
| if (depth == DX_DEPTH_BODY) |
| return 1; |
| |
| // |
| // FOOTER |
| // |
| if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer)) |
| return 0; |
| if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer)) |
| return 0; |
| |
| return 1; |
| } |
| |
| |
| dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field) |
| { |
| dx_field_location_t *loc = dx_message_field_location(msg, field); |
| if (!loc) |
| return 0; |
| |
| return dx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL); |
| } |
| |
| |
| dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field) |
| { |
| dx_field_location_t *loc = dx_message_field_location(msg, field); |
| if (!loc) |
| return 0; |
| |
| // |
| // Count the number of buffers this field straddles |
| // |
| int bufcnt = 1; |
| dx_buffer_t *buf = loc->buffer; |
| size_t bufsize = dx_buffer_size(buf) - loc->offset; |
| ssize_t remaining = loc->length - bufsize; |
| |
| while (remaining > 0) { |
| bufcnt++; |
| buf = buf->next; |
| if (!buf) |
| return 0; |
| remaining -= dx_buffer_size(buf); |
| } |
| |
| // |
| // Allocate an iovec object big enough to hold the number of buffers |
| // |
| dx_iovec_t *iov = dx_iovec(bufcnt); |
| if (!iov) |
| return 0; |
| |
| // |
| // Build out the io vectors with pointers to the segments of the field in buffers |
| // |
| bufcnt = 0; |
| buf = loc->buffer; |
| bufsize = dx_buffer_size(buf) - loc->offset; |
| void *base = dx_buffer_base(buf) + loc->offset; |
| remaining = loc->length; |
| |
| while (remaining > 0) { |
| dx_iovec_array(iov)[bufcnt].iov_base = base; |
| dx_iovec_array(iov)[bufcnt].iov_len = bufsize; |
| bufcnt++; |
| remaining -= bufsize; |
| if (remaining > 0) { |
| buf = buf->next; |
| base = dx_buffer_base(buf); |
| bufsize = dx_buffer_size(buf); |
| if (bufsize > remaining) |
| bufsize = remaining; |
| } |
| } |
| |
| return iov; |
| } |
| |
| |
| void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers) |
| { |
| dx_message_begin_header(msg); |
| dx_message_insert_boolean(msg, 0); // durable |
| //dx_message_insert_null(msg); // priority |
| //dx_message_insert_null(msg); // ttl |
| //dx_message_insert_boolean(msg, 0); // first-acquirer |
| //dx_message_insert_uint(msg, 0); // delivery-count |
| dx_message_end_header(msg); |
| |
| dx_message_begin_message_properties(msg); |
| dx_message_insert_null(msg); // message-id |
| dx_message_insert_null(msg); // user-id |
| dx_message_insert_string(msg, to); // to |
| //dx_message_insert_null(msg); // subject |
| //dx_message_insert_null(msg); // reply-to |
| //dx_message_insert_null(msg); // correlation-id |
| //dx_message_insert_null(msg); // content-type |
| //dx_message_insert_null(msg); // content-encoding |
| //dx_message_insert_timestamp(msg, 0); // absolute-expiry-time |
| //dx_message_insert_timestamp(msg, 0); // creation-time |
| //dx_message_insert_null(msg); // group-id |
| //dx_message_insert_uint(msg, 0); // group-sequence |
| //dx_message_insert_null(msg); // reply-to-group-id |
| dx_message_end_message_properties(msg); |
| |
| if (buffers) |
| dx_message_append_body_data(msg, buffers); |
| } |
| |
| |
| void dx_message_begin_header(dx_message_t *msg) |
| { |
| dx_start_list_performative(MSG_CONTENT(msg), 0x70); |
| } |
| |
| |
| void dx_message_end_header(dx_message_t *msg) |
| { |
| dx_end_list(MSG_CONTENT(msg)); |
| } |
| |
| |
| void dx_message_begin_delivery_annotations(dx_message_t *msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_end_delivery_annotations(dx_message_t *msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_begin_message_annotations(dx_message_t *msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_end_message_annotations(dx_message_t *msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_begin_message_properties(dx_message_t *msg) |
| { |
| dx_start_list_performative(MSG_CONTENT(msg), 0x73); |
| } |
| |
| |
| void dx_message_end_message_properties(dx_message_t *msg) |
| { |
| dx_end_list(MSG_CONTENT(msg)); |
| } |
| |
| |
| void dx_message_begin_application_properties(dx_message_t *msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_end_application_properties(dx_message_t *msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| dx_buffer_t *buf = DEQ_HEAD(*buffers); |
| uint32_t len = 0; |
| |
| // |
| // Calculate the size of the body to be appended. |
| // |
| while (buf) { |
| len += dx_buffer_size(buf); |
| buf = DEQ_NEXT(buf); |
| } |
| |
| // |
| // Insert a DATA section performative header. |
| // |
| dx_insert(content, (const uint8_t*) "\x00\x53\x75", 3); |
| if (len < 256) { |
| dx_insert_8(content, 0xa0); // vbin8 |
| dx_insert_8(content, (uint8_t) len); |
| } else { |
| dx_insert_8(content, 0xb0); // vbin32 |
| dx_insert_32(content, len); |
| } |
| |
| // |
| // Move the supplied buffers to the tail of the message's buffer list. |
| // |
| buf = DEQ_HEAD(*buffers); |
| while (buf) { |
| DEQ_REMOVE_HEAD(*buffers); |
| DEQ_INSERT_TAIL(content->buffers, buf); |
| buf = DEQ_HEAD(*buffers); |
| } |
| } |
| |
| |
| void dx_message_begin_body_sequence(dx_message_t *msg) |
| { |
| } |
| |
| |
| void dx_message_end_body_sequence(dx_message_t *msg) |
| { |
| } |
| |
| |
| void dx_message_begin_footer(dx_message_t *msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_end_footer(dx_message_t *msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_insert_null(dx_message_t *msg) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| dx_insert_8(content, 0x40); |
| content->count++; |
| } |
| |
| |
| void dx_message_insert_boolean(dx_message_t *msg, int value) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| if (value) |
| dx_insert(content, (const uint8_t*) "\x56\x01", 2); |
| else |
| dx_insert(content, (const uint8_t*) "\x56\x00", 2); |
| content->count++; |
| } |
| |
| |
| void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| dx_insert_8(content, 0x50); |
| dx_insert_8(content, value); |
| content->count++; |
| } |
| |
| |
| void dx_message_insert_uint(dx_message_t *msg, uint32_t value) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| if (value == 0) { |
| dx_insert_8(content, 0x43); // uint0 |
| } else if (value < 256) { |
| dx_insert_8(content, 0x52); // smalluint |
| dx_insert_8(content, (uint8_t) value); |
| } else { |
| dx_insert_8(content, 0x70); // uint |
| dx_insert_32(content, value); |
| } |
| content->count++; |
| } |
| |
| |
| void dx_message_insert_ulong(dx_message_t *msg, uint64_t value) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| if (value == 0) { |
| dx_insert_8(content, 0x44); // ulong0 |
| } else if (value < 256) { |
| dx_insert_8(content, 0x53); // smallulong |
| dx_insert_8(content, (uint8_t) value); |
| } else { |
| dx_insert_8(content, 0x80); // ulong |
| dx_insert_64(content, value); |
| } |
| content->count++; |
| } |
| |
| |
| void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| if (len < 256) { |
| dx_insert_8(content, 0xa0); // vbin8 |
| dx_insert_8(content, (uint8_t) len); |
| } else { |
| dx_insert_8(content, 0xb0); // vbin32 |
| dx_insert_32(content, len); |
| } |
| dx_insert(content, start, len); |
| content->count++; |
| } |
| |
| |
| void dx_message_insert_string(dx_message_t *msg, const char *start) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| uint32_t len = strlen(start); |
| |
| if (len < 256) { |
| dx_insert_8(content, 0xa1); // str8-utf8 |
| dx_insert_8(content, (uint8_t) len); |
| dx_insert(content, (const uint8_t*) start, len); |
| } else { |
| dx_insert_8(content, 0xb1); // str32-utf8 |
| dx_insert_32(content, len); |
| dx_insert(content, (const uint8_t*) start, len); |
| } |
| content->count++; |
| } |
| |
| |
| void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| dx_insert_8(content, 0x98); // uuid |
| dx_insert(content, value, 16); |
| content->count++; |
| } |
| |
| |
| void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| if (len < 256) { |
| dx_insert_8(content, 0xa3); // sym8 |
| dx_insert_8(content, (uint8_t) len); |
| dx_insert(content, (const uint8_t*) start, len); |
| } else { |
| dx_insert_8(content, 0xb3); // sym32 |
| dx_insert_32(content, len); |
| dx_insert(content, (const uint8_t*) start, len); |
| } |
| content->count++; |
| } |
| |
| |
| void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value) |
| { |
| dx_message_content_t *content = MSG_CONTENT(msg); |
| dx_insert_8(content, 0x83); // timestamp |
| dx_insert_64(content, value); |
| content->count++; |
| } |
| |
| |
| void dx_message_begin_list(dx_message_t* msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_end_list(dx_message_t* msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_begin_map(dx_message_t* msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |
| |
| void dx_message_end_map(dx_message_t* msg) |
| { |
| assert(0); // Not Implemented |
| } |
| |