blob: c5dc0283e0176dfe65a83dddf2c9686abd10f44f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/error.h>
#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/log.h>
#include "message_private.h"
#include "compose_private.h"
#include "aprintf.h"
#include <string.h>
#include <ctype.h>
#include <stdio.h>
#include <limits.h>
static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70";
static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71";
static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72";
static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73";
static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74";
static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75";
static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76";
static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77";
static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78";
static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0";
static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1";
static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0";
static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0"
"\xa1\xb1\xa3\xb3\xe0\xf0"
"\x40\x56\x41\x42\x50\x60\x70\x52\x43\x80\x53\x44\x51\x61\x71\x54\x81\x55\x72\x82\x74\x84\x94\x73\x83\x98";
ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0);
ALLOC_DEFINE(qd_message_content_t);
typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
static qd_log_source_t* log_source = 0;
void qd_message_initialize() {
log_source = qd_log_source("MESSAGE");
}
int qd_message_repr_len() { return qd_log_max_len(); }
// Quote non-printable characters suitable for log messages. Output in buffer.
static void quote(char* bytes, int n, char **begin, char *end) {
for (char* p = bytes; p < bytes+n; ++p) {
if (isprint(*p) || isspace(*p))
aprintf(begin, end, "%c", (int)*p);
else
aprintf(begin, end, "\\%02hhx", *p);
}
}
/** Copy a message field for use in log messages. Output in buffer. */
static void copy_field(qd_message_t *msg, int field, int max, char *pre, char *post,
char **begin, char *end)
{
qd_field_iterator_t* iter = qd_message_field_iterator(msg, field);
if (iter) {
aprintf(begin, end, "%s", pre);
qd_field_iterator_reset(iter);
for (int j = 0; !qd_field_iterator_end(iter) && j < max; ++j) {
char byte = qd_field_iterator_octet(iter);
quote(&byte, 1, begin, end);
}
aprintf(begin, end, "%s", post);
qd_field_iterator_free(iter);
}
}
static const char REPR_END[] = "}\0";
/* TODO aconway 2014-05-13: more detailed message representation. */
char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len) {
qd_message_check(msg, QD_DEPTH_BODY);
char *begin = buffer;
char *end = buffer + len - sizeof(REPR_END); /* Save space for ending */
aprintf(&begin, end, "Message{", msg);
copy_field(msg, QD_FIELD_TO, INT_MAX, "to='", "'", &begin, end);
copy_field(msg, QD_FIELD_REPLY_TO, INT_MAX, " reply-to='", "'", &begin, end);
copy_field(msg, QD_FIELD_BODY, 16, " body='", "'", &begin, end);
aprintf(&begin, end, "%s", REPR_END); /* We saved space at the beginning. */
return buffer;
}
static void advance(unsigned char **cursor, qd_buffer_t **buffer, int consume, buffer_process_t handler, void *context)
{
unsigned char *local_cursor = *cursor;
qd_buffer_t *local_buffer = *buffer;
int remaining = qd_buffer_size(local_buffer) - (local_cursor - qd_buffer_base(local_buffer));
while (consume > 0) {
if (consume < remaining) {
if (handler)
handler(context, local_cursor, consume);
local_cursor += consume;
consume = 0;
} else {
if (handler)
handler(context, local_cursor, remaining);
consume -= remaining;
local_buffer = local_buffer->next;
if (local_buffer == 0){
local_cursor = 0;
break;
}
local_cursor = qd_buffer_base(local_buffer);
remaining = qd_buffer_size(local_buffer) - (local_cursor - qd_buffer_base(local_buffer));
}
}
*cursor = local_cursor;
*buffer = local_buffer;
}
static unsigned char next_octet(unsigned char **cursor, qd_buffer_t **buffer)
{
unsigned char result = **cursor;
advance(cursor, buffer, 1, 0, 0);
return result;
}
static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field)
{
qd_buffer_t *start_buffer = *buffer;
unsigned char *start_cursor = *cursor;
unsigned char tag = next_octet(cursor, buffer);
if (!(*cursor)) return 0;
int consume = 0;
size_t hdr_length = 1;
switch (tag & 0xF0) {
case 0x40 : consume = 0; break;
case 0x50 : consume = 1; break;
case 0x60 : consume = 2; break;
case 0x70 : consume = 4; break;
case 0x80 : consume = 8; break;
case 0x90 : consume = 16; break;
case 0xB0 :
case 0xD0 :
case 0xF0 :
hdr_length += 3;
consume |= ((int) next_octet(cursor, buffer)) << 24;
if (!(*cursor)) return 0;
consume |= ((int) next_octet(cursor, buffer)) << 16;
if (!(*cursor)) return 0;
consume |= ((int) next_octet(cursor, buffer)) << 8;
if (!(*cursor)) return 0;
// Fall through to the next case...
case 0xA0 :
case 0xC0 :
case 0xE0 :
hdr_length++;
consume |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
break;
}
if (field && !field->parsed) {
field->buffer = start_buffer;
field->offset = start_cursor - qd_buffer_base(start_buffer);
field->length = consume;
field->hdr_length = hdr_length;
field->parsed = true;
field->tag = tag;
}
advance(cursor, buffer, consume, 0, 0);
return 1;
}
static int start_list(unsigned char **cursor, qd_buffer_t **buffer)
{
unsigned char tag = next_octet(cursor, buffer);
if (!(*cursor)) return 0;
int length = 0;
int count = 0;
switch (tag) {
case 0x45 : // list0
break;
case 0xd0 : // list32
length |= ((int) next_octet(cursor, buffer)) << 24;
if (!(*cursor)) return 0;
length |= ((int) next_octet(cursor, buffer)) << 16;
if (!(*cursor)) return 0;
length |= ((int) next_octet(cursor, buffer)) << 8;
if (!(*cursor)) return 0;
length |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
count |= ((int) next_octet(cursor, buffer)) << 24;
if (!(*cursor)) return 0;
count |= ((int) next_octet(cursor, buffer)) << 16;
if (!(*cursor)) return 0;
count |= ((int) next_octet(cursor, buffer)) << 8;
if (!(*cursor)) return 0;
count |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
break;
case 0xc0 : // list8
length |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
count |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
break;
}
return count;
}
//
// Check the buffer chain, starting at cursor to see if it matches the pattern.
// If the pattern matches, check the next tag to see if it's in the set of expected
// tags. If not, return zero. If so, set the location descriptor to the good
// tag and advance the cursor (and buffer, if needed) to the end of the matched section.
//
// If there is no match, don't advance the cursor.
//
// Return 0 if the pattern matches but the following tag is unexpected
// Return 0 if the pattern matches and the location already has a pointer (duplicate section)
// Return 1 if the pattern matches and we've advanced the cursor/buffer
// Return 1 if the pattern does not match
//
static int qd_check_and_advance(qd_buffer_t **buffer,
unsigned char **cursor,
const unsigned char *pattern,
int pattern_length,
const unsigned char *expected_tags,
qd_field_location_t *location)
{
qd_buffer_t *test_buffer = *buffer;
unsigned char *test_cursor = *cursor;
if (!test_cursor)
return 1; // no match
unsigned char *end_of_buffer = qd_buffer_base(test_buffer) + qd_buffer_size(test_buffer);
int idx = 0;
while (idx < pattern_length && *test_cursor == pattern[idx]) {
idx++;
test_cursor++;
if (test_cursor == end_of_buffer) {
test_buffer = test_buffer->next;
if (test_buffer == 0)
return 1; // Pattern didn't match
test_cursor = qd_buffer_base(test_buffer);
end_of_buffer = test_cursor + qd_buffer_size(test_buffer);
}
}
if (idx < pattern_length)
return 1; // Pattern didn't match
//
// Pattern matched, check the tag
//
while (*expected_tags && *test_cursor != *expected_tags)
expected_tags++;
if (*expected_tags == 0)
return 0; // Unexpected tag
if (location->parsed)
return 0; // Duplicate section
//
// Pattern matched and tag is expected. Mark the beginning of the section.
//
location->parsed = 1;
location->buffer = *buffer;
location->offset = *cursor - qd_buffer_base(*buffer);
location->length = 0;
location->hdr_length = pattern_length;
//
// Advance the pointers to consume the whole section.
//
int pre_consume = 1; // Count the already extracted tag
int consume = 0;
unsigned char tag = next_octet(&test_cursor, &test_buffer);
unsigned char tag_subcat = tag & 0xF0;
if (!test_cursor && tag_subcat != 0x40)
return 0;
switch (tag_subcat) {
case 0x40: break;
case 0x50: consume = 1; break;
case 0x60: consume = 2; break;
case 0x70: consume = 4; break;
case 0x80: consume = 8; break;
case 0x90: consume = 16; break;
case 0xB0:
case 0xD0:
case 0xF0:
pre_consume += 3;
consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24;
if (!test_cursor) return 0;
consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16;
if (!test_cursor) return 0;
consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8;
if (!test_cursor) return 0;
// Fall through to the next case...
case 0xA0:
case 0xC0:
case 0xE0:
pre_consume += 1;
consume |= (int) next_octet(&test_cursor, &test_buffer);
if (!test_cursor) return 0;
break;
}
location->length = pre_consume + consume;
if (consume)
advance(&test_cursor, &test_buffer, consume, 0, 0);
*cursor = test_cursor;
*buffer = test_buffer;
return 1;
}
// translate a field into its proper section of the message
static qd_message_field_t qd_field_section(qd_message_field_t field)
{
switch (field) {
case QD_FIELD_HEADER:
case QD_FIELD_DELIVERY_ANNOTATION:
case QD_FIELD_MESSAGE_ANNOTATION:
case QD_FIELD_PROPERTIES:
case QD_FIELD_APPLICATION_PROPERTIES:
case QD_FIELD_BODY:
case QD_FIELD_FOOTER:
return field;
case QD_FIELD_DURABLE:
case QD_FIELD_PRIORITY:
case QD_FIELD_TTL:
case QD_FIELD_FIRST_ACQUIRER:
case QD_FIELD_DELIVERY_COUNT:
return QD_FIELD_HEADER;
case QD_FIELD_MESSAGE_ID:
case QD_FIELD_USER_ID:
case QD_FIELD_TO:
case QD_FIELD_SUBJECT:
case QD_FIELD_REPLY_TO:
case QD_FIELD_CORRELATION_ID:
case QD_FIELD_CONTENT_TYPE:
case QD_FIELD_CONTENT_ENCODING:
case QD_FIELD_ABSOLUTE_EXPIRY_TIME:
case QD_FIELD_CREATION_TIME:
case QD_FIELD_GROUP_ID:
case QD_FIELD_GROUP_SEQUENCE:
case QD_FIELD_REPLY_TO_GROUP_ID:
return QD_FIELD_PROPERTIES;
default:
assert(false); // TBD: add new fields here
return QD_FIELD_NONE;
}
}
// get the field location of a field in the message properties (if it exists,
// else 0).
static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_message_field_t field)
{
static const intptr_t offsets[] = {
// position of the field's qd_field_location_t in the message content
// object
(intptr_t) &((qd_message_content_t *)0)->field_message_id,
(intptr_t) &((qd_message_content_t *)0)->field_user_id,
(intptr_t) &((qd_message_content_t *)0)->field_to,
(intptr_t) &((qd_message_content_t *)0)->field_subject,
(intptr_t) &((qd_message_content_t *)0)->field_reply_to,
(intptr_t) &((qd_message_content_t *)0)->field_correlation_id
};
// update table above if new fields need to be accessed:
assert(QD_FIELD_MESSAGE_ID <= field && field <= QD_FIELD_CORRELATION_ID);
qd_message_content_t *content = MSG_CONTENT(msg);
if (!content->section_message_properties.parsed) {
if (!qd_message_check(msg, QD_DEPTH_PROPERTIES) || !content->section_message_properties.parsed)
return 0;
}
if (field == QD_FIELD_PROPERTIES) return &content->section_message_properties;
const int index = field - QD_FIELD_MESSAGE_ID;
qd_field_location_t *const location = (qd_field_location_t *)((char *)content + offsets[index]);
if (location->parsed)
return location;
// requested field not parsed out. Need to parse out up to the requested field:
qd_buffer_t *buffer = content->section_message_properties.buffer;
unsigned char *cursor = qd_buffer_base(buffer) + content->section_message_properties.offset;
advance(&cursor, &buffer, content->section_message_properties.hdr_length, 0, 0);
if (index >= start_list(&cursor, &buffer)) return 0; // properties list too short
int position = 0;
while (position < index) {
qd_field_location_t *f = (qd_field_location_t *)((char *)content + offsets[position]);
if (f->parsed)
advance(&cursor, &buffer, f->hdr_length + f->length, 0, 0);
else // parse it out
if (!traverse_field(&cursor, &buffer, f)) return 0;
position++;
}
// all fields previous to the target have now been parsed and cursor/buffer
// are in the correct position, parse out the field:
if (traverse_field(&cursor, &buffer, location))
return location;
return 0;
}
// get the field location of a field in the message header (if it exists,
// else 0)
static qd_field_location_t *qd_message_header_field(qd_message_t *msg, qd_message_field_t field)
{
qd_message_content_t *content = MSG_CONTENT(msg);
if (!content->section_message_header.parsed) {
if (!qd_message_check(msg, QD_DEPTH_HEADER) || !content->section_message_header.parsed)
return 0;
}
switch (field) {
case QD_FIELD_HEADER:
return &content->section_message_properties;
default:
// TBD: add header fields as needed (see qd_message_properties_field()
// as an example)
assert(false);
return 0;
}
}
// Get the field's location in the buffer. Return 0 if the field does not exist.
// Note that even if the field location is returned, it may contain a
// QD_AMQP_NULL value (qd_field_location->tag == QD_AMQP_NULL).
//
static qd_field_location_t *qd_message_field_location(qd_message_t *msg, qd_message_field_t field)
{
qd_message_content_t *content = MSG_CONTENT(msg);
qd_message_field_t section = qd_field_section(field);
switch (section) {
case QD_FIELD_HEADER:
return qd_message_header_field(msg, field);
case QD_FIELD_PROPERTIES:
return qd_message_properties_field(msg, field);
case QD_FIELD_DELIVERY_ANNOTATION:
if (content->section_delivery_annotation.parsed ||
(qd_message_check(msg, QD_DEPTH_DELIVERY_ANNOTATIONS) && content->section_delivery_annotation.parsed))
return &content->section_delivery_annotation;
break;
case QD_FIELD_MESSAGE_ANNOTATION:
if (content->section_message_annotation.parsed ||
(qd_message_check(msg, QD_DEPTH_MESSAGE_ANNOTATIONS) && content->section_message_annotation.parsed))
return &content->section_message_annotation;
break;
case QD_FIELD_APPLICATION_PROPERTIES:
if (content->section_application_properties.parsed ||
(qd_message_check(msg, QD_DEPTH_APPLICATION_PROPERTIES) && content->section_application_properties.parsed))
return &content->section_application_properties;
break;
case QD_FIELD_BODY:
if (content->section_body.parsed ||
(qd_message_check(msg, QD_DEPTH_BODY) && content->section_body.parsed))
return &content->section_body;
break;
case QD_FIELD_FOOTER:
if (content->section_footer.parsed ||
(qd_message_check(msg, QD_DEPTH_ALL) && content->section_footer.parsed))
return &content->section_footer;
break;
default:
assert(false); // TBD: add support as needed
return 0;
}
return 0;
}
qd_message_t *qd_message()
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) new_qd_message_t();
if (!msg)
return 0;
DEQ_ITEM_INIT(msg);
DEQ_INIT(msg->ma_to_override);
DEQ_INIT(msg->ma_trace);
DEQ_INIT(msg->ma_ingress);
msg->ma_phase = 0;
msg->content = new_qd_message_content_t();
if (msg->content == 0) {
free_qd_message_t((qd_message_t*) msg);
return 0;
}
memset(msg->content, 0, sizeof(qd_message_content_t));
msg->content->lock = sys_mutex();
msg->content->ref_count = 1;
msg->content->parse_depth = QD_DEPTH_NONE;
msg->content->parsed_message_annotations = 0;
return (qd_message_t*) msg;
}
void qd_message_free(qd_message_t *in_msg)
{
if (!in_msg) return;
uint32_t rc;
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_buffer_list_free_buffers(&msg->ma_to_override);
qd_buffer_list_free_buffers(&msg->ma_trace);
qd_buffer_list_free_buffers(&msg->ma_ingress);
qd_message_content_t *content = msg->content;
sys_mutex_lock(content->lock);
rc = --content->ref_count;
sys_mutex_unlock(content->lock);
if (rc == 0) {
if (content->parsed_message_annotations)
qd_parse_free(content->parsed_message_annotations);
qd_buffer_t *buf = DEQ_HEAD(content->buffers);
while (buf) {
DEQ_REMOVE_HEAD(content->buffers);
qd_buffer_free(buf);
buf = DEQ_HEAD(content->buffers);
}
sys_mutex_free(content->lock);
free_qd_message_content_t(content);
}
free_qd_message_t((qd_message_t*) msg);
}
qd_message_t *qd_message_copy(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
qd_message_pvt_t *copy = (qd_message_pvt_t*) new_qd_message_t();
if (!copy)
return 0;
DEQ_ITEM_INIT(copy);
qd_buffer_list_clone(&copy->ma_to_override, &msg->ma_to_override);
qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
copy->ma_phase = msg->ma_phase;
copy->content = content;
sys_mutex_lock(content->lock);
content->ref_count++;
sys_mutex_unlock(content->lock);
return (qd_message_t*) copy;
}
qd_parsed_field_t *qd_message_message_annotations(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
if (content->parsed_message_annotations)
return content->parsed_message_annotations;
qd_field_iterator_t *ma = qd_message_field_iterator(in_msg, QD_FIELD_MESSAGE_ANNOTATION);
if (ma == 0)
return 0;
content->parsed_message_annotations = qd_parse(ma);
if (content->parsed_message_annotations == 0 ||
!qd_parse_ok(content->parsed_message_annotations) ||
!qd_parse_is_map(content->parsed_message_annotations)) {
qd_field_iterator_free(ma);
qd_parse_free(content->parsed_message_annotations);
content->parsed_message_annotations = 0;
return 0;
}
qd_field_iterator_free(ma);
return content->parsed_message_annotations;
}
void qd_message_set_trace_annotation(qd_message_t *in_msg, qd_composed_field_t *trace_field)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_buffer_list_free_buffers(&msg->ma_trace);
qd_compose_take_buffers(trace_field, &msg->ma_trace);
qd_compose_free(trace_field);
}
void qd_message_set_to_override_annotation(qd_message_t *in_msg, qd_composed_field_t *to_field)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_buffer_list_free_buffers(&msg->ma_to_override);
qd_compose_take_buffers(to_field, &msg->ma_to_override);
qd_compose_free(to_field);
}
void qd_message_set_phase_annotation(qd_message_t *in_msg, int phase)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
msg->ma_phase = phase;
}
int qd_message_get_phase_annotation(const qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
return msg->ma_phase;
}
void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_buffer_list_free_buffers(&msg->ma_ingress);
qd_compose_take_buffers(ingress_field, &msg->ma_ingress);
qd_compose_free(ingress_field);
}
qd_message_t *qd_message_receive(pn_delivery_t *delivery)
{
pn_link_t *link = pn_delivery_link(delivery);
ssize_t rc;
qd_buffer_t *buf;
qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_delivery_get_context(delivery);
//
// If there is no message associated with the delivery, this is the first time
// we've received anything on this delivery. Allocate a message descriptor and
// link it and the delivery together.
//
if (!msg) {
msg = (qd_message_pvt_t*) qd_message();
pn_delivery_set_context(delivery, (void*) msg);
}
//
// Get a reference to the tail buffer on the message. This is the buffer into which
// we will store incoming message data. If there is no buffer in the message, allocate
// an empty one and add it to the message.
//
buf = DEQ_TAIL(msg->content->buffers);
if (!buf) {
buf = qd_buffer();
DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
while (1) {
//
// Try to receive enough data to fill the remaining space in the tail buffer.
//
rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
//
// If we receive PN_EOS, we have come to the end of the message.
//
if (rc == PN_EOS) {
//
// If the last buffer in the list is empty, remove it and free it. This
// will only happen if the size of the message content is an exact multiple
// of the buffer size.
//
if (qd_buffer_size(buf) == 0) {
DEQ_REMOVE_TAIL(msg->content->buffers);
qd_buffer_free(buf);
}
pn_delivery_set_context(delivery, 0);
char repr[qd_message_repr_len()];
qd_log(log_source, QD_LOG_TRACE, "Received %s on link %s",
qd_message_repr((qd_message_t*)msg, repr, sizeof(repr)),
pn_link_name(link));
return (qd_message_t*) msg;
}
if (rc > 0) {
//
// We have received a positive number of bytes for the message. Advance
// the cursor in the buffer.
//
qd_buffer_insert(buf, rc);
//
// If the buffer is full, allocate a new empty buffer and append it to the
// tail of the message's list.
//
if (qd_buffer_capacity(buf) == 0) {
buf = qd_buffer();
DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
} else
//
// We received zero bytes, and no PN_EOS. This means that we've received
// all of the data available up to this point, but it does not constitute
// the entire message. We'll be back later to finish it up.
//
break;
}
return 0;
}
static void send_handler(void *context, const unsigned char *start, int length)
{
pn_link_t *pnl = (pn_link_t*) context;
pn_link_send(pnl, (const char*) start, length);
}
// create a buffer chain holding the outgoing message annotations section
static bool compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out)
{
if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
!DEQ_IS_EMPTY(msg->ma_trace) ||
!DEQ_IS_EMPTY(msg->ma_ingress)) {
qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
qd_compose_start_map(out_ma);
if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
qd_compose_insert_symbol(out_ma, QD_MA_TO);
qd_compose_insert_buffers(out_ma, &msg->ma_to_override);
}
if (!DEQ_IS_EMPTY(msg->ma_trace)) {
qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
qd_compose_insert_buffers(out_ma, &msg->ma_trace);
}
if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
}
if (msg->ma_phase != 0) {
qd_compose_insert_symbol(out_ma, QD_MA_PHASE);
qd_compose_insert_int(out_ma, msg->ma_phase);
}
qd_compose_end_map(out_ma);
qd_compose_take_buffers(out_ma, out);
qd_compose_free(out_ma);
return true;
}
return false;
}
void qd_message_send(qd_message_t *in_msg,
qd_link_t *link,
bool strip_annotations)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
qd_buffer_t *buf = DEQ_HEAD(content->buffers);
unsigned char *cursor;
pn_link_t *pnl = qd_link_pn(link);
char repr[qd_message_repr_len()];
qd_log(log_source, QD_LOG_TRACE, "Sending %s on link %s",
qd_message_repr(in_msg, repr, sizeof(repr)),
pn_link_name(pnl));
qd_buffer_list_t new_ma;
DEQ_INIT(new_ma);
if (strip_annotations || compose_message_annotations(msg, &new_ma)) {
//
// This is the case where the message annotations have been modified.
// The message send must be divided into sections: The existing header;
// the new message annotations; the rest of the existing message.
// Note that the original message annotations that are still in the
// buffer chain must not be sent.
//
// Start by making sure that we've parsed the message sections through
// the message annotations
//
// ??? NO LONGER NECESSARY???
if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
return;
}
//
// Send header if present
//
cursor = qd_buffer_base(buf);
if (content->section_message_header.length > 0) {
buf = content->section_message_header.buffer;
cursor = content->section_message_header.offset + qd_buffer_base(buf);
advance(&cursor, &buf,
content->section_message_header.length + content->section_message_header.hdr_length,
send_handler, (void*) pnl);
}
//
// Send new message annotations
//
qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
while (da_buf) {
pn_link_send(pnl, (char*) qd_buffer_base(da_buf), qd_buffer_size(da_buf));
da_buf = DEQ_NEXT(da_buf);
}
qd_buffer_list_free_buffers(&new_ma);
//
// Skip over replaced message annotations
//
if (content->section_message_annotation.length > 0)
advance(&cursor, &buf,
content->section_message_annotation.hdr_length + content->section_message_annotation.length,
0, 0);
//
// Send remaining partial buffer
//
if (buf) {
size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
advance(&cursor, &buf, len, send_handler, (void*) pnl);
}
// Fall through to process the remaining buffers normally
// Note that 'advance' will have moved us to the next buffer in the chain.
}
while (buf) {
pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf));
buf = DEQ_NEXT(buf);
}
}
static int qd_check_field_LH(qd_message_content_t *content,
qd_message_depth_t depth,
const unsigned char *long_pattern,
const unsigned char *short_pattern,
const unsigned char *expected_tags,
qd_field_location_t *location,
int more)
{
#define LONG 10
#define SHORT 3
if (depth > content->parse_depth) {
if (0 == qd_check_and_advance(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location))
return 0;
if (0 == qd_check_and_advance(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location))
return 0;
if (!more)
content->parse_depth = depth;
}
return 1;
}
static bool qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth)
{
qd_error_clear();
qd_buffer_t *buffer = DEQ_HEAD(content->buffers);
if (!buffer) {
qd_error(QD_ERROR_MESSAGE, "No data");
return false;
}
if (depth <= content->parse_depth)
return true; // We've already parsed at least this deep
if (content->parse_buffer == 0) {
content->parse_buffer = buffer;
content->parse_cursor = qd_buffer_base(content->parse_buffer);
}
if (depth == QD_DEPTH_NONE)
return true;
//
// MESSAGE HEADER
//
if (0 == qd_check_field_LH(content, QD_DEPTH_HEADER,
MSG_HDR_LONG, MSG_HDR_SHORT, TAGS_LIST, &content->section_message_header, 0)) {
qd_error(QD_ERROR_MESSAGE, "Invalid header");
return false;
}
if (depth == QD_DEPTH_HEADER)
return true;
//
// DELIVERY ANNOTATION
//
if (0 == qd_check_field_LH(content, QD_DEPTH_DELIVERY_ANNOTATIONS,
DELIVERY_ANNOTATION_LONG, DELIVERY_ANNOTATION_SHORT, TAGS_MAP, &content->section_delivery_annotation, 0)) {
qd_error(QD_ERROR_MESSAGE, "Invalid delivery-annotations");
return false;
}
if (depth == QD_DEPTH_DELIVERY_ANNOTATIONS)
return true;
//
// MESSAGE ANNOTATION
//
if (0 == qd_check_field_LH(content, QD_DEPTH_MESSAGE_ANNOTATIONS,
MESSAGE_ANNOTATION_LONG, MESSAGE_ANNOTATION_SHORT, TAGS_MAP, &content->section_message_annotation, 0)) {
qd_error(QD_ERROR_MESSAGE, "Invalid annotations");
return false;
}
if (depth == QD_DEPTH_MESSAGE_ANNOTATIONS)
return true;
//
// PROPERTIES
//
if (0 == qd_check_field_LH(content, QD_DEPTH_PROPERTIES,
PROPERTIES_LONG, PROPERTIES_SHORT, TAGS_LIST, &content->section_message_properties, 0)) {
qd_error(QD_ERROR_MESSAGE, "Invalid message properties");
return false;
}
if (depth == QD_DEPTH_PROPERTIES)
return true;
//
// APPLICATION PROPERTIES
//
if (0 == qd_check_field_LH(content, QD_DEPTH_APPLICATION_PROPERTIES,
APPLICATION_PROPERTIES_LONG, APPLICATION_PROPERTIES_SHORT, TAGS_MAP, &content->section_application_properties, 0)) {
qd_error(QD_ERROR_MESSAGE, "Invalid application-properties");
return false;
}
if (depth == QD_DEPTH_APPLICATION_PROPERTIES)
return true;
//
// BODY
// Note that this function expects a limited set of types in a VALUE section. This is
// not a problem for messages passing through Dispatch because through-only messages won't
// be parsed to BODY-depth.
//
if (0 == qd_check_field_LH(content, QD_DEPTH_BODY,
BODY_DATA_LONG, BODY_DATA_SHORT, TAGS_BINARY, &content->section_body, 1)) {
qd_error(QD_ERROR_MESSAGE, "Invalid body data");
return false;
}
if (0 == qd_check_field_LH(content, QD_DEPTH_BODY,
BODY_SEQUENCE_LONG, BODY_SEQUENCE_SHORT, TAGS_LIST, &content->section_body, 1)) {
qd_error(QD_ERROR_MESSAGE, "Invalid body sequence");
return false;
}
if (0 == qd_check_field_LH(content, QD_DEPTH_BODY,
BODY_VALUE_LONG, BODY_VALUE_SHORT, TAGS_ANY, &content->section_body, 0)) {
qd_error(QD_ERROR_MESSAGE, "Invalid body value");
return false;
}
if (depth == QD_DEPTH_BODY)
return true;
//
// FOOTER
//
if (0 == qd_check_field_LH(content, QD_DEPTH_ALL,
FOOTER_LONG, FOOTER_SHORT, TAGS_MAP, &content->section_footer, 0)) {
qd_error(QD_ERROR_MESSAGE, "Invalid footer");
return false;
}
return true;
}
int qd_message_check(qd_message_t *in_msg, qd_message_depth_t depth)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
int result;
sys_mutex_lock(content->lock);
result = qd_message_check_LH(content, depth);
sys_mutex_unlock(content->lock);
return result;
}
qd_field_iterator_t *qd_message_field_iterator_typed(qd_message_t *msg, qd_message_field_t field)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return 0;
return qd_field_iterator_buffer(loc->buffer, loc->offset, loc->length + loc->hdr_length);
}
qd_field_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t field)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return 0;
if (loc->tag == QD_AMQP_NULL)
return 0;
qd_buffer_t *buffer = loc->buffer;
unsigned char *cursor = qd_buffer_base(loc->buffer) + loc->offset;
advance(&cursor, &buffer, loc->hdr_length, 0, 0);
return qd_field_iterator_buffer(buffer, cursor - qd_buffer_base(buffer), loc->length);
}
ssize_t qd_message_field_length(qd_message_t *msg, qd_message_field_t field)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return -1;
return loc->length;
}
ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, void *buffer, size_t *hdr_length)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return -1;
qd_buffer_t *buf = loc->buffer;
size_t bufsize = qd_buffer_size(buf) - loc->offset;
void *base = qd_buffer_base(buf) + loc->offset;
size_t remaining = loc->length + loc->hdr_length;
*hdr_length = loc->hdr_length;
while (remaining > 0) {
if (bufsize > remaining)
bufsize = remaining;
memcpy(buffer, base, bufsize);
buffer += bufsize;
remaining -= bufsize;
if (remaining > 0) {
buf = buf->next;
base = qd_buffer_base(buf);
bufsize = qd_buffer_size(buf);
}
}
return loc->length + loc->hdr_length;
}
void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *buffers)
{
qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
qd_message_content_t *content = MSG_CONTENT(msg);
qd_compose_start_list(field);
qd_compose_insert_bool(field, 0); // durable
//qd_compose_insert_null(field); // priority
//qd_compose_insert_null(field); // ttl
//qd_compose_insert_boolean(field, 0); // first-acquirer
//qd_compose_insert_uint(field, 0); // delivery-count
qd_compose_end_list(field);
qd_buffer_list_t out_ma;
if (compose_message_annotations((qd_message_pvt_t*)msg, &out_ma)) {
qd_compose_insert_buffers(field, &out_ma);
}
field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
qd_compose_start_list(field);
qd_compose_insert_null(field); // message-id
qd_compose_insert_null(field); // user-id
qd_compose_insert_string(field, to); // to
//qd_compose_insert_null(field); // subject
//qd_compose_insert_null(field); // reply-to
//qd_compose_insert_null(field); // correlation-id
//qd_compose_insert_null(field); // content-type
//qd_compose_insert_null(field); // content-encoding
//qd_compose_insert_timestamp(field, 0); // absolute-expiry-time
//qd_compose_insert_timestamp(field, 0); // creation-time
//qd_compose_insert_null(field); // group-id
//qd_compose_insert_uint(field, 0); // group-sequence
//qd_compose_insert_null(field); // reply-to-group-id
qd_compose_end_list(field);
if (buffers) {
field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
qd_compose_insert_binary_buffers(field, buffers);
}
qd_compose_take_buffers(field, &content->buffers);
qd_compose_free(field);
}
void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field)
{
qd_message_content_t *content = MSG_CONTENT(msg);
qd_buffer_list_t *field_buffers = qd_compose_buffers(field);
content->buffers = *field_buffers;
DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
}
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2)
{
qd_message_content_t *content = MSG_CONTENT(msg);
qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1);
qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2);
content->buffers = *field1_buffers;
DEQ_INIT(*field1_buffers);
qd_buffer_t *buf = DEQ_HEAD(*field2_buffers);
while (buf) {
DEQ_REMOVE_HEAD(*field2_buffers);
DEQ_INSERT_TAIL(content->buffers, buf);
buf = DEQ_HEAD(*field2_buffers);
}
}