blob: 3c5e9fb6f7da2556f479e003e979e2a8f5d18743 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/threading.h>
#include "message_private.h"
#include "compose_private.h"
#include <string.h>
#include <stdio.h>
static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70";
static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71";
static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72";
static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73";
static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74";
static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75";
static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76";
static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77";
static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78";
static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0";
static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1";
static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0";
static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0);
ALLOC_DEFINE(qd_message_content_t);
typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
static void advance(unsigned char **cursor, qd_buffer_t **buffer, int consume, buffer_process_t handler, void *context)
{
unsigned char *local_cursor = *cursor;
qd_buffer_t *local_buffer = *buffer;
int remaining = qd_buffer_size(local_buffer) - (local_cursor - qd_buffer_base(local_buffer));
while (consume > 0) {
if (consume < remaining) {
if (handler)
handler(context, local_cursor, consume);
local_cursor += consume;
consume = 0;
} else {
if (handler)
handler(context, local_cursor, remaining);
consume -= remaining;
local_buffer = local_buffer->next;
if (local_buffer == 0){
local_cursor = 0;
break;
}
local_cursor = qd_buffer_base(local_buffer);
remaining = qd_buffer_size(local_buffer) - (local_cursor - qd_buffer_base(local_buffer));
}
}
*cursor = local_cursor;
*buffer = local_buffer;
}
static unsigned char next_octet(unsigned char **cursor, qd_buffer_t **buffer)
{
unsigned char result = **cursor;
advance(cursor, buffer, 1, 0, 0);
return result;
}
static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field)
{
qd_buffer_t *start_buffer = *buffer;
unsigned char *start_cursor = *cursor;
unsigned char tag = next_octet(cursor, buffer);
if (!(*cursor)) return 0;
int consume = 0;
size_t hdr_length = 1;
switch (tag & 0xF0) {
case 0x40 : consume = 0; break;
case 0x50 : consume = 1; break;
case 0x60 : consume = 2; break;
case 0x70 : consume = 4; break;
case 0x80 : consume = 8; break;
case 0x90 : consume = 16; break;
case 0xB0 :
case 0xD0 :
case 0xF0 :
hdr_length += 3;
consume |= ((int) next_octet(cursor, buffer)) << 24;
if (!(*cursor)) return 0;
consume |= ((int) next_octet(cursor, buffer)) << 16;
if (!(*cursor)) return 0;
consume |= ((int) next_octet(cursor, buffer)) << 8;
if (!(*cursor)) return 0;
// Fall through to the next case...
case 0xA0 :
case 0xC0 :
case 0xE0 :
hdr_length++;
consume |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
break;
}
if (field && !field->parsed) {
field->buffer = start_buffer;
field->offset = start_cursor - qd_buffer_base(start_buffer);
field->length = consume;
field->hdr_length = hdr_length;
field->parsed = 1;
}
advance(cursor, buffer, consume, 0, 0);
return 1;
}
static int start_list(unsigned char **cursor, qd_buffer_t **buffer)
{
unsigned char tag = next_octet(cursor, buffer);
if (!(*cursor)) return 0;
int length = 0;
int count = 0;
switch (tag) {
case 0x45 : // list0
break;
case 0xd0 : // list32
length |= ((int) next_octet(cursor, buffer)) << 24;
if (!(*cursor)) return 0;
length |= ((int) next_octet(cursor, buffer)) << 16;
if (!(*cursor)) return 0;
length |= ((int) next_octet(cursor, buffer)) << 8;
if (!(*cursor)) return 0;
length |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
count |= ((int) next_octet(cursor, buffer)) << 24;
if (!(*cursor)) return 0;
count |= ((int) next_octet(cursor, buffer)) << 16;
if (!(*cursor)) return 0;
count |= ((int) next_octet(cursor, buffer)) << 8;
if (!(*cursor)) return 0;
count |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
break;
case 0xc0 : // list8
length |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
count |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
break;
}
return count;
}
//
// Check the buffer chain, starting at cursor to see if it matches the pattern.
// If the pattern matches, check the next tag to see if it's in the set of expected
// tags. If not, return zero. If so, set the location descriptor to the good
// tag and advance the cursor (and buffer, if needed) to the end of the matched section.
//
// If there is no match, don't advance the cursor.
//
// Return 0 if the pattern matches but the following tag is unexpected
// Return 0 if the pattern matches and the location already has a pointer (duplicate section)
// Return 1 if the pattern matches and we've advanced the cursor/buffer
// Return 1 if the pattern does not match
//
static int qd_check_and_advance(qd_buffer_t **buffer,
unsigned char **cursor,
const unsigned char *pattern,
int pattern_length,
const unsigned char *expected_tags,
qd_field_location_t *location)
{
qd_buffer_t *test_buffer = *buffer;
unsigned char *test_cursor = *cursor;
if (!test_cursor)
return 1; // no match
unsigned char *end_of_buffer = qd_buffer_base(test_buffer) + qd_buffer_size(test_buffer);
int idx = 0;
while (idx < pattern_length && *test_cursor == pattern[idx]) {
idx++;
test_cursor++;
if (test_cursor == end_of_buffer) {
test_buffer = test_buffer->next;
if (test_buffer == 0)
return 1; // Pattern didn't match
test_cursor = qd_buffer_base(test_buffer);
end_of_buffer = test_cursor + qd_buffer_size(test_buffer);
}
}
if (idx < pattern_length)
return 1; // Pattern didn't match
//
// Pattern matched, check the tag
//
while (*expected_tags && *test_cursor != *expected_tags)
expected_tags++;
if (*expected_tags == 0)
return 0; // Unexpected tag
if (location->parsed)
return 0; // Duplicate section
//
// Pattern matched and tag is expected. Mark the beginning of the section.
//
location->parsed = 1;
location->buffer = *buffer;
location->offset = *cursor - qd_buffer_base(*buffer);
location->length = 0;
location->hdr_length = pattern_length;
//
// Advance the pointers to consume the whole section.
//
int pre_consume = 1; // Count the already extracted tag
int consume = 0;
unsigned char tag = next_octet(&test_cursor, &test_buffer);
if (!test_cursor) return 0;
switch (tag) {
case 0x45 : // list0
break;
case 0xd0 : // list32
case 0xd1 : // map32
case 0xb0 : // vbin32
pre_consume += 3;
consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24;
if (!test_cursor) return 0;
consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16;
if (!test_cursor) return 0;
consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8;
if (!test_cursor) return 0;
// Fall through to the next case...
case 0xc0 : // list8
case 0xc1 : // map8
case 0xa0 : // vbin8
pre_consume += 1;
consume |= (int) next_octet(&test_cursor, &test_buffer);
if (!test_cursor) return 0;
break;
}
location->length = pre_consume + consume;
if (consume)
advance(&test_cursor, &test_buffer, consume, 0, 0);
*cursor = test_cursor;
*buffer = test_buffer;
return 1;
}
static qd_field_location_t *qd_message_field_location(qd_message_t *msg, qd_message_field_t field)
{
qd_message_content_t *content = MSG_CONTENT(msg);
switch (field) {
case QD_FIELD_TO:
while (1) {
if (content->field_to.parsed)
return &content->field_to;
if (content->section_message_properties.parsed == 0)
break;
qd_buffer_t *buffer = content->section_message_properties.buffer;
unsigned char *cursor = qd_buffer_base(buffer) + content->section_message_properties.offset;
advance(&cursor, &buffer, content->section_message_properties.hdr_length, 0, 0);
int count = start_list(&cursor, &buffer);
int result;
if (count < 3)
break;
result = traverse_field(&cursor, &buffer, 0); // message_id
if (!result) return 0;
result = traverse_field(&cursor, &buffer, &content->field_user_id); // user_id
if (!result) return 0;
result = traverse_field(&cursor, &buffer, &content->field_to); // to
if (!result) return 0;
}
break;
case QD_FIELD_REPLY_TO:
while (1) {
if (content->field_reply_to.parsed)
return &content->field_reply_to;
if (content->section_message_properties.parsed == 0)
break;
qd_buffer_t *buffer = content->section_message_properties.buffer;
unsigned char *cursor = qd_buffer_base(buffer) + content->section_message_properties.offset;
advance(&cursor, &buffer, content->section_message_properties.hdr_length, 0, 0);
int count = start_list(&cursor, &buffer);
int result;
if (count < 5)
break;
result = traverse_field(&cursor, &buffer, 0); // message_id
if (!result) return 0;
result = traverse_field(&cursor, &buffer, &content->field_user_id); // user_id
if (!result) return 0;
result = traverse_field(&cursor, &buffer, &content->field_to); // to
if (!result) return 0;
result = traverse_field(&cursor, &buffer, 0); // subject
if (!result) return 0;
result = traverse_field(&cursor, &buffer, &content->field_reply_to); // reply_to
if (!result) return 0;
}
break;
case QD_FIELD_CORRELATION_ID:
while (1) {
if (content->field_correlation_id.parsed)
return &content->field_correlation_id;
if (content->section_message_properties.parsed == 0)
break;
qd_buffer_t *buffer = content->section_message_properties.buffer;
unsigned char *cursor = qd_buffer_base(buffer) + content->section_message_properties.offset;
advance(&cursor, &buffer, content->section_message_properties.hdr_length, 0, 0);
int count = start_list(&cursor, &buffer);
int result;
if (count < 6)
break;
result = traverse_field(&cursor, &buffer, 0); // message_id
if (!result) return 0;
result = traverse_field(&cursor, &buffer, &content->field_user_id); // user_id
if (!result) return 0;
result = traverse_field(&cursor, &buffer, &content->field_to); // to
if (!result) return 0;
result = traverse_field(&cursor, &buffer, 0); // subject
if (!result) return 0;
result = traverse_field(&cursor, &buffer, &content->field_reply_to); // reply_to
if (!result) return 0;
result = traverse_field(&cursor, &buffer, &content->field_correlation_id); // correlation_id
if (!result) return 0;
}
break;
case QD_FIELD_DELIVERY_ANNOTATION:
if (content->section_delivery_annotation.parsed)
return &content->section_delivery_annotation;
break;
case QD_FIELD_APPLICATION_PROPERTIES:
if (content->section_application_properties.parsed)
return &content->section_application_properties;
break;
case QD_FIELD_BODY:
if (content->section_body.parsed)
return &content->section_body;
break;
default:
break;
}
return 0;
}
qd_message_t *qd_message()
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) new_qd_message_t();
if (!msg)
return 0;
DEQ_ITEM_INIT(msg);
msg->content = new_qd_message_content_t();
if (msg->content == 0) {
free_qd_message_t((qd_message_t*) msg);
return 0;
}
memset(msg->content, 0, sizeof(qd_message_content_t));
msg->content->lock = sys_mutex();
msg->content->ref_count = 1;
msg->content->parse_depth = QD_DEPTH_NONE;
msg->content->parsed_delivery_annotations = 0;
return (qd_message_t*) msg;
}
void qd_message_free(qd_message_t *in_msg)
{
uint32_t rc;
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
sys_mutex_lock(content->lock);
rc = --content->ref_count;
sys_mutex_unlock(content->lock);
if (rc == 0) {
if (content->parsed_delivery_annotations)
qd_parse_free(content->parsed_delivery_annotations);
qd_buffer_t *buf = DEQ_HEAD(content->buffers);
while (buf) {
DEQ_REMOVE_HEAD(content->buffers);
qd_buffer_free(buf);
buf = DEQ_HEAD(content->buffers);
}
buf = DEQ_HEAD(content->new_delivery_annotations);
while (buf) {
DEQ_REMOVE_HEAD(content->new_delivery_annotations);
qd_buffer_free(buf);
buf = DEQ_HEAD(content->new_delivery_annotations);
}
sys_mutex_free(content->lock);
free_qd_message_content_t(content);
}
free_qd_message_t((qd_message_t*) msg);
}
qd_message_t *qd_message_copy(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
qd_message_pvt_t *copy = (qd_message_pvt_t*) new_qd_message_t();
if (!copy)
return 0;
DEQ_ITEM_INIT(copy);
copy->content = content;
sys_mutex_lock(content->lock);
content->ref_count++;
sys_mutex_unlock(content->lock);
return (qd_message_t*) copy;
}
qd_parsed_field_t *qd_message_delivery_annotations(qd_message_t *in_msg)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
if (content->parsed_delivery_annotations)
return content->parsed_delivery_annotations;
qd_field_iterator_t *da = qd_message_field_iterator(in_msg, QD_FIELD_DELIVERY_ANNOTATION);
if (da == 0)
return 0;
content->parsed_delivery_annotations = qd_parse(da);
if (content->parsed_delivery_annotations == 0 ||
!qd_parse_ok(content->parsed_delivery_annotations) ||
!qd_parse_is_map(content->parsed_delivery_annotations)) {
qd_field_iterator_free(da);
qd_parse_free(content->parsed_delivery_annotations);
content->parsed_delivery_annotations = 0;
return 0;
}
qd_field_iterator_free(da);
return content->parsed_delivery_annotations;
}
void qd_message_set_delivery_annotations(qd_message_t *msg, qd_composed_field_t *da)
{
qd_message_content_t *content = MSG_CONTENT(msg);
qd_buffer_list_t *field_buffers = qd_compose_buffers(da);
assert(DEQ_SIZE(content->new_delivery_annotations) == 0);
content->new_delivery_annotations = *field_buffers;
DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
}
qd_message_t *qd_message_receive(qd_delivery_t *delivery)
{
pn_delivery_t *pnd = qd_delivery_pn(delivery);
qd_message_pvt_t *msg = (qd_message_pvt_t*) qd_delivery_context(delivery);
pn_link_t *link = pn_delivery_link(pnd);
ssize_t rc;
qd_buffer_t *buf;
//
// If there is no message associated with the delivery, this is the first time
// we've received anything on this delivery. Allocate a message descriptor and
// link it and the delivery together.
//
if (!msg) {
msg = (qd_message_pvt_t*) qd_message();
qd_delivery_set_context(delivery, (void*) msg);
}
//
// Get a reference to the tail buffer on the message. This is the buffer into which
// we will store incoming message data. If there is no buffer in the message, allocate
// an empty one and add it to the message.
//
buf = DEQ_TAIL(msg->content->buffers);
if (!buf) {
buf = qd_buffer();
DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
while (1) {
//
// Try to receive enough data to fill the remaining space in the tail buffer.
//
rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
//
// If we receive PN_EOS, we have come to the end of the message.
//
if (rc == PN_EOS) {
//
// If the last buffer in the list is empty, remove it and free it. This
// will only happen if the size of the message content is an exact multiple
// of the buffer size.
//
if (qd_buffer_size(buf) == 0) {
DEQ_REMOVE_TAIL(msg->content->buffers);
qd_buffer_free(buf);
}
qd_delivery_set_context(delivery, 0);
return (qd_message_t*) msg;
}
if (rc > 0) {
//
// We have received a positive number of bytes for the message. Advance
// the cursor in the buffer.
//
qd_buffer_insert(buf, rc);
//
// If the buffer is full, allocate a new empty buffer and append it to the
// tail of the message's list.
//
if (qd_buffer_capacity(buf) == 0) {
buf = qd_buffer();
DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
} else
//
// We received zero bytes, and no PN_EOS. This means that we've received
// all of the data available up to this point, but it does not constitute
// the entire message. We'll be back later to finish it up.
//
break;
}
return 0;
}
static void send_handler(void *context, const unsigned char *start, int length)
{
pn_link_t *pnl = (pn_link_t*) context;
pn_link_send(pnl, (const char*) start, length);
}
void qd_message_send(qd_message_t *in_msg, qd_link_t *link)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
qd_buffer_t *buf = DEQ_HEAD(content->buffers);
unsigned char *cursor;
pn_link_t *pnl = qd_link_pn(link);
if (DEQ_SIZE(content->new_delivery_annotations) > 0) {
//
// This is the case where the delivery annotations have been modified.
// The message send must be divided into sections: The existing header;
// the new delivery annotations; the rest of the existing message.
// Note that the original delivery annotations that are still in the
// buffer chain must not be sent.
//
// Start by making sure that we've parsed the message sections through
// the delivery annotations
//
if (!qd_message_check(in_msg, QD_DEPTH_DELIVERY_ANNOTATIONS))
return;
//
// Send header if present
//
cursor = qd_buffer_base(buf);
if (content->section_message_header.length > 0) {
buf = content->section_message_header.buffer;
cursor = content->section_message_header.offset + qd_buffer_base(buf);
advance(&cursor, &buf,
content->section_message_header.length + content->section_message_header.hdr_length,
send_handler, (void*) pnl);
}
//
// Send new delivery annotations
//
qd_buffer_t *da_buf = DEQ_HEAD(content->new_delivery_annotations);
while (da_buf) {
pn_link_send(pnl, (char*) qd_buffer_base(da_buf), qd_buffer_size(da_buf));
da_buf = DEQ_NEXT(da_buf);
}
//
// Skip over replaced delivery annotations
//
if (content->section_delivery_annotation.length > 0)
advance(&cursor, &buf,
content->section_delivery_annotation.hdr_length + content->section_delivery_annotation.length,
0, 0);
//
// Send remaining partial buffer
//
if (buf) {
size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
advance(&cursor, &buf, len, send_handler, (void*) pnl);
}
// Fall through to process the remaining buffers normally
}
while (buf) {
pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf));
buf = DEQ_NEXT(buf);
}
}
static int qd_check_field_LH(qd_message_content_t *content,
qd_message_depth_t depth,
const unsigned char *long_pattern,
const unsigned char *short_pattern,
const unsigned char *expected_tags,
qd_field_location_t *location,
int more)
{
#define LONG 10
#define SHORT 3
if (depth > content->parse_depth) {
if (0 == qd_check_and_advance(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location))
return 0;
if (0 == qd_check_and_advance(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location))
return 0;
if (!more)
content->parse_depth = depth;
}
return 1;
}
static int qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth)
{
qd_buffer_t *buffer = DEQ_HEAD(content->buffers);
if (!buffer)
return 0; // Invalid - No data in the message
if (depth <= content->parse_depth)
return 1; // We've already parsed at least this deep
if (content->parse_buffer == 0) {
content->parse_buffer = buffer;
content->parse_cursor = qd_buffer_base(content->parse_buffer);
}
if (depth == QD_DEPTH_NONE)
return 1;
//
// MESSAGE HEADER
//
if (0 == qd_check_field_LH(content, QD_DEPTH_HEADER,
MSG_HDR_LONG, MSG_HDR_SHORT, TAGS_LIST, &content->section_message_header, 0))
return 0;
if (depth == QD_DEPTH_HEADER)
return 1;
//
// DELIVERY ANNOTATION
//
if (0 == qd_check_field_LH(content, QD_DEPTH_DELIVERY_ANNOTATIONS,
DELIVERY_ANNOTATION_LONG, DELIVERY_ANNOTATION_SHORT, TAGS_MAP, &content->section_delivery_annotation, 0))
return 0;
if (depth == QD_DEPTH_DELIVERY_ANNOTATIONS)
return 1;
//
// MESSAGE ANNOTATION
//
if (0 == qd_check_field_LH(content, QD_DEPTH_MESSAGE_ANNOTATIONS,
MESSAGE_ANNOTATION_LONG, MESSAGE_ANNOTATION_SHORT, TAGS_MAP, &content->section_message_annotation, 0))
return 0;
if (depth == QD_DEPTH_MESSAGE_ANNOTATIONS)
return 1;
//
// PROPERTIES
//
if (0 == qd_check_field_LH(content, QD_DEPTH_PROPERTIES,
PROPERTIES_LONG, PROPERTIES_SHORT, TAGS_LIST, &content->section_message_properties, 0))
return 0;
if (depth == QD_DEPTH_PROPERTIES)
return 1;
//
// APPLICATION PROPERTIES
//
if (0 == qd_check_field_LH(content, QD_DEPTH_APPLICATION_PROPERTIES,
APPLICATION_PROPERTIES_LONG, APPLICATION_PROPERTIES_SHORT, TAGS_MAP, &content->section_application_properties, 0))
return 0;
if (depth == QD_DEPTH_APPLICATION_PROPERTIES)
return 1;
//
// BODY
// Note that this function expects a limited set of types in a VALUE section. This is
// not a problem for messages passing through Dispatch because through-only messages won't
// be parsed to BODY-depth.
//
if (0 == qd_check_field_LH(content, QD_DEPTH_BODY,
BODY_DATA_LONG, BODY_DATA_SHORT, TAGS_BINARY, &content->section_body, 1))
return 0;
if (0 == qd_check_field_LH(content, QD_DEPTH_BODY,
BODY_SEQUENCE_LONG, BODY_SEQUENCE_SHORT, TAGS_LIST, &content->section_body, 1))
return 0;
if (0 == qd_check_field_LH(content, QD_DEPTH_BODY,
BODY_VALUE_LONG, BODY_VALUE_SHORT, TAGS_ANY, &content->section_body, 0))
return 0;
if (depth == QD_DEPTH_BODY)
return 1;
//
// FOOTER
//
if (0 == qd_check_field_LH(content, QD_DEPTH_ALL,
FOOTER_LONG, FOOTER_SHORT, TAGS_MAP, &content->section_footer, 0))
return 0;
return 1;
}
int qd_message_check(qd_message_t *in_msg, qd_message_depth_t depth)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
int result;
sys_mutex_lock(content->lock);
result = qd_message_check_LH(content, depth);
sys_mutex_unlock(content->lock);
return result;
}
qd_field_iterator_t *qd_message_field_iterator_typed(qd_message_t *msg, qd_message_field_t field)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return 0;
return qd_field_iterator_buffer(loc->buffer, loc->offset, loc->length + loc->hdr_length, ITER_VIEW_ALL);
}
qd_field_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t field)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return 0;
qd_buffer_t *buffer = loc->buffer;
unsigned char *cursor = qd_buffer_base(loc->buffer) + loc->offset;
advance(&cursor, &buffer, loc->hdr_length, 0, 0);
return qd_field_iterator_buffer(buffer, cursor - qd_buffer_base(buffer), loc->length, ITER_VIEW_ALL);
}
ssize_t qd_message_field_length(qd_message_t *msg, qd_message_field_t field)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return -1;
return loc->length;
}
ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, void *buffer, size_t *hdr_length)
{
qd_field_location_t *loc = qd_message_field_location(msg, field);
if (!loc)
return -1;
qd_buffer_t *buf = loc->buffer;
size_t bufsize = qd_buffer_size(buf) - loc->offset;
void *base = qd_buffer_base(buf) + loc->offset;
size_t remaining = loc->length + loc->hdr_length;
*hdr_length = loc->hdr_length;
while (remaining > 0) {
if (bufsize > remaining)
bufsize = remaining;
memcpy(buffer, base, bufsize);
buffer += bufsize;
remaining -= bufsize;
if (remaining > 0) {
buf = buf->next;
base = qd_buffer_base(buf);
bufsize = qd_buffer_size(buf);
}
}
return loc->length + loc->hdr_length;
}
void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *buffers)
{
qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
qd_message_content_t *content = MSG_CONTENT(msg);
qd_compose_start_list(field);
qd_compose_insert_bool(field, 0); // durable
//qd_compose_insert_null(field); // priority
//qd_compose_insert_null(field); // ttl
//qd_compose_insert_boolean(field, 0); // first-acquirer
//qd_compose_insert_uint(field, 0); // delivery-count
qd_compose_end_list(field);
field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
qd_compose_start_list(field);
qd_compose_insert_null(field); // message-id
qd_compose_insert_null(field); // user-id
qd_compose_insert_string(field, to); // to
//qd_compose_insert_null(field); // subject
//qd_compose_insert_null(field); // reply-to
//qd_compose_insert_null(field); // correlation-id
//qd_compose_insert_null(field); // content-type
//qd_compose_insert_null(field); // content-encoding
//qd_compose_insert_timestamp(field, 0); // absolute-expiry-time
//qd_compose_insert_timestamp(field, 0); // creation-time
//qd_compose_insert_null(field); // group-id
//qd_compose_insert_uint(field, 0); // group-sequence
//qd_compose_insert_null(field); // reply-to-group-id
qd_compose_end_list(field);
if (buffers) {
field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
qd_compose_insert_binary_buffers(field, buffers);
}
qd_buffer_list_t *field_buffers = qd_compose_buffers(field);
content->buffers = *field_buffers;
DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
qd_compose_free(field);
}
void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field)
{
qd_message_content_t *content = MSG_CONTENT(msg);
qd_buffer_list_t *field_buffers = qd_compose_buffers(field);
content->buffers = *field_buffers;
DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
}