blob: ee52fd22cb0caf10c6184d6038cc79a74c6b5f39 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <proton/message.h>
#include <proton/buffer.h>
#include <proton/codec.h>
#include <proton/error.h>
#include <proton/parser.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include "protocol.h"
#include "../util.h"
ssize_t pn_message_data(char *dst, size_t available, const char *src, size_t size)
{
pn_bytes_t bytes = pn_bytes(available, dst);
pn_atom_t buf[16];
pn_atoms_t atoms = {16, buf};
int err = pn_fill_atoms(&atoms, "DLz", 0x75, size, src);
if (err) return err;
err = pn_encode_atoms(&bytes, &atoms);
if (err) return err;
return bytes.size;
}
// message
struct pn_message_t {
bool durable;
uint8_t priority;
pn_millis_t ttl;
bool first_acquirer;
uint32_t delivery_count;
pn_atom_t id;
pn_buffer_t *user_id;
pn_buffer_t *address;
pn_buffer_t *subject;
pn_buffer_t *reply_to;
pn_atom_t correlation_id;
pn_buffer_t *content_type;
pn_buffer_t *content_encoding;
pn_timestamp_t expiry_time;
pn_timestamp_t creation_time;
pn_buffer_t *group_id;
pn_sequence_t group_sequence;
pn_buffer_t *reply_to_group_id;
pn_data_t *data;
pn_data_t *body;
pn_format_t format;
pn_parser_t *parser;
};
pn_message_t *pn_message()
{
pn_message_t *msg = malloc(sizeof(pn_message_t));
msg->durable = false;
msg->priority = PN_DEFAULT_PRIORITY;
msg->ttl = 0;
msg->first_acquirer = false;
msg->delivery_count = 0;
msg->id.type = PN_NULL;
msg->user_id = NULL;
msg->address = NULL;
msg->subject = NULL;
msg->reply_to = NULL;
msg->correlation_id.type = PN_NULL;
msg->content_type = NULL;
msg->content_encoding = NULL;
msg->expiry_time = 0;
msg->creation_time = 0;
msg->group_id = NULL;
msg->group_sequence = 0;
msg->reply_to_group_id = NULL;
msg->data = NULL;
msg->body = NULL;
msg->format = PN_DATA;
msg->parser = NULL;
return msg;
}
void pn_message_free(pn_message_t *msg)
{
if (msg) {
pn_buffer_free(msg->user_id);
pn_buffer_free(msg->address);
pn_buffer_free(msg->subject);
pn_buffer_free(msg->reply_to);
pn_buffer_free(msg->content_type);
pn_buffer_free(msg->content_encoding);
pn_buffer_free(msg->group_id);
pn_buffer_free(msg->reply_to_group_id);
pn_data_free(msg->data);
pn_data_free(msg->body);
pn_parser_free(msg->parser);
free(msg);
}
}
void pn_message_clear(pn_message_t *msg)
{
msg->durable = false;
msg->priority = PN_DEFAULT_PRIORITY;
msg->ttl = 0;
msg->first_acquirer = false;
msg->delivery_count = 0;
msg->id.type = PN_NULL;
if (msg->user_id) pn_buffer_clear(msg->user_id);
if (msg->address) pn_buffer_clear(msg->address);
if (msg->subject) pn_buffer_clear(msg->subject);
if (msg->reply_to) pn_buffer_clear(msg->reply_to);
msg->correlation_id.type = PN_NULL;
if (msg->content_type) pn_buffer_clear(msg->content_type);
if (msg->content_encoding) pn_buffer_clear(msg->content_encoding);
msg->expiry_time = 0;
msg->creation_time = 0;
if (msg->group_id) pn_buffer_clear(msg->group_id);
msg->group_sequence = 0;
if (msg->reply_to_group_id) pn_buffer_clear(msg->reply_to_group_id);
if (msg->data) pn_data_clear(msg->data);
if (msg->body) pn_data_clear(msg->body);
}
int pn_message_errno(pn_message_t *msg)
{
if (msg && msg->parser) {
return pn_parser_errno(msg->parser);
} else {
return 0;
}
}
const char *pn_message_error(pn_message_t *msg)
{
if (msg && msg->parser) {
return pn_parser_error(msg->parser);
} else {
return NULL;
}
}
pn_parser_t *pn_message_parser(pn_message_t *msg)
{
if (!msg->parser) {
msg->parser = pn_parser();
}
return msg->parser;
}
bool pn_message_is_durable(pn_message_t *msg)
{
return msg ? msg->durable : false;
}
int pn_message_set_durable(pn_message_t *msg, bool durable)
{
if (!msg) return PN_ARG_ERR;
msg->durable = durable;
return 0;
}
uint8_t pn_message_get_priority(pn_message_t *msg)
{
return msg ? msg->priority : PN_DEFAULT_PRIORITY;
}
int pn_message_set_priority(pn_message_t *msg, uint8_t priority)
{
if (!msg) return PN_ARG_ERR;
msg->priority = priority;
return 0;
}
pn_millis_t pn_message_get_ttl(pn_message_t *msg)
{
return msg ? msg->ttl : 0;
}
int pn_message_set_ttl(pn_message_t *msg, pn_millis_t ttl)
{
if (!msg) return PN_ARG_ERR;
msg->ttl = ttl;
return 0;
}
bool pn_message_is_first_acquirer(pn_message_t *msg)
{
return msg ? msg->first_acquirer : false;
}
int pn_message_set_first_acquirer(pn_message_t *msg, bool first)
{
if (!msg) return PN_ARG_ERR;
msg->first_acquirer = first;
return 0;
}
uint32_t pn_message_get_delivery_count(pn_message_t *msg)
{
return msg ? msg->delivery_count : 0;
}
int pn_message_set_delivery_count(pn_message_t *msg, uint32_t count)
{
if (!msg) return PN_ARG_ERR;
msg->delivery_count = count;
return 0;
}
pn_atom_t pn_message_get_id(pn_message_t *msg)
{
return msg ? msg->id : (pn_atom_t) {.type=PN_NULL};
}
int pn_message_set_id(pn_message_t *msg, pn_atom_t id)
{
if (!msg) return PN_ARG_ERR;
msg->id = id;
return 0;
}
static int pn_buffer_set_bytes(pn_buffer_t **buf, pn_bytes_t bytes)
{
if (!*buf) {
*buf = pn_buffer(64);
}
int err = pn_buffer_clear(*buf);
if (err) return err;
return pn_buffer_append(*buf, bytes.start, bytes.size);
}
static const char *pn_buffer_str(pn_buffer_t *buf)
{
if (buf) {
pn_bytes_t bytes = pn_buffer_bytes(buf);
if (bytes.size) {
return bytes.start;
}
}
return NULL;
}
static int pn_buffer_set_strn(pn_buffer_t **buf, const char *str, size_t size)
{
if (!*buf) {
*buf = pn_buffer(64);
}
int err = pn_buffer_clear(*buf);
if (err) return err;
err = pn_buffer_append(*buf, str, size);
if (err) return err;
if (str && str[size-1]) {
return pn_buffer_append(*buf, "\0", 1);
} else {
return 0;
}
}
static int pn_buffer_set_str(pn_buffer_t **buf, const char *str)
{
size_t size = str ? strlen(str) + 1 : 0;
return pn_buffer_set_strn(buf, str, size);
}
pn_bytes_t pn_message_get_user_id(pn_message_t *msg)
{
return msg && msg->user_id ? pn_buffer_bytes(msg->user_id) : pn_bytes(0, NULL);
}
int pn_message_set_user_id(pn_message_t *msg, pn_bytes_t user_id)
{
if (!msg) return PN_ARG_ERR;
return pn_buffer_set_bytes(&msg->user_id, user_id);
}
const char *pn_message_get_address(pn_message_t *msg)
{
return msg ? pn_buffer_str(msg->address) : NULL;
}
int pn_message_set_address(pn_message_t *msg, const char *address)
{
if (!msg) return PN_ARG_ERR;
return pn_buffer_set_str(&msg->address, address);
}
const char *pn_message_get_subject(pn_message_t *msg)
{
return msg ? pn_buffer_str(msg->subject) : NULL;
}
int pn_message_set_subject(pn_message_t *msg, const char *subject)
{
if (!msg) return PN_ARG_ERR;
return pn_buffer_set_str(&msg->subject, subject);
}
const char *pn_message_get_reply_to(pn_message_t *msg)
{
return msg ? pn_buffer_str(msg->reply_to) : NULL;
}
int pn_message_set_reply_to(pn_message_t *msg, const char *reply_to)
{
if (!msg) return PN_ARG_ERR;
return pn_buffer_set_str(&msg->reply_to, reply_to);
}
pn_atom_t pn_message_get_correlation_id(pn_message_t *msg)
{
return msg ? msg->correlation_id : (pn_atom_t) {.type=PN_NULL};
}
int pn_message_set_correlation_id(pn_message_t *msg, pn_atom_t atom)
{
if (!msg) return PN_ARG_ERR;
msg->correlation_id = atom;
return 0;
}
const char *pn_message_get_content_type(pn_message_t *msg)
{
return msg ? pn_buffer_str(msg->content_type) : NULL;
}
int pn_message_set_content_type(pn_message_t *msg, const char *type)
{
if (!msg) return PN_ARG_ERR;
return pn_buffer_set_str(&msg->content_type, type);
}
const char *pn_message_get_content_encoding(pn_message_t *msg)
{
return msg ? pn_buffer_str(msg->content_encoding) : NULL;
}
int pn_message_set_content_encoding(pn_message_t *msg, const char *encoding)
{
if (!msg) return PN_ARG_ERR;
return pn_buffer_set_str(&msg->content_encoding, encoding);
}
pn_timestamp_t pn_message_get_expiry_time(pn_message_t *msg)
{
return msg ? msg->expiry_time : 0;
}
int pn_message_set_expiry_time(pn_message_t *msg, pn_timestamp_t time)
{
if (!msg) return PN_ARG_ERR;
msg->expiry_time = time;
return 0;
}
pn_timestamp_t pn_message_get_creation_time(pn_message_t *msg)
{
return msg ? msg->creation_time : 0;
}
int pn_message_set_creation_time(pn_message_t *msg, pn_timestamp_t time)
{
if (!msg) return PN_ARG_ERR;
msg->creation_time = time;
return 0;
}
const char *pn_message_get_group_id(pn_message_t *msg)
{
return msg ? pn_buffer_str(msg->group_id) : NULL;
}
int pn_message_set_group_id(pn_message_t *msg, const char *group_id)
{
if (!msg) return PN_ARG_ERR;
return pn_buffer_set_str(&msg->group_id, group_id);
}
pn_sequence_t pn_message_get_group_sequence(pn_message_t *msg)
{
return msg ? msg->group_sequence : 0;
}
int pn_message_set_group_sequence(pn_message_t *msg, pn_sequence_t n)
{
if (!msg) return PN_ARG_ERR;
msg->group_sequence = n;
return 0;
}
const char *pn_message_get_reply_to_group_id(pn_message_t *msg)
{
return msg ? pn_buffer_str(msg->reply_to_group_id) : NULL;
}
int pn_message_set_reply_to_group_id(pn_message_t *msg, const char *reply_to_group_id)
{
if (!msg) return PN_ARG_ERR;
return pn_buffer_set_str(&msg->reply_to_group_id, reply_to_group_id);
}
int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
{
if (!msg || !bytes || !size) return PN_ARG_ERR;
if (!msg->data) {
msg->data = pn_data(64);
}
if (!msg->body) {
msg->body = pn_data(64);
}
pn_data_clear(msg->body);
while (size) {
size_t copy = size;
pn_data_clear(msg->data);
int err = pn_data_decode(msg->data, (char *) bytes, &copy);
if (err) return err;
size -= copy;
bytes += copy;
bool scanned;
uint64_t desc;
err = pn_data_scan(msg->data, "D?L.", &scanned, &desc);
if (err) return err;
if (!scanned){
desc = 0;
}
switch (desc) {
case HEADER:
pn_data_scan(msg->data, "D.[oBIoI]", &msg->durable, &msg->priority,
&msg->ttl, &msg->first_acquirer, &msg->delivery_count);
break;
case PROPERTIES:
{
pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
group_id, reply_to_group_id;
err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSIS]", &user_id, &address,
&subject, &reply_to, &ctype, &cencoding,
&msg->expiry_time, &msg->creation_time, &group_id,
&msg->group_sequence, &reply_to_group_id);
if (err) return err;
err = pn_buffer_set_bytes(&msg->user_id, user_id);
if (err) return err;
err = pn_buffer_set_strn(&msg->address, address.start, address.size);
if (err) return err;
err = pn_buffer_set_strn(&msg->subject, subject.start, subject.size);
if (err) return err;
err = pn_buffer_set_strn(&msg->reply_to, reply_to.start, reply_to.size);
if (err) return err;
err = pn_buffer_set_strn(&msg->content_type, ctype.start, ctype.size);
if (err) return err;
err = pn_buffer_set_strn(&msg->content_encoding, cencoding.start,
cencoding.size);
if (err) return err;
err = pn_buffer_set_strn(&msg->group_id, group_id.start, group_id.size);
if (err) return err;
err = pn_buffer_set_strn(&msg->reply_to_group_id, reply_to_group_id.start,
reply_to_group_id.size);
if (err) return err;
}
break;
case DELIVERY_ANNOTATIONS:
case MESSAGE_ANNOTATIONS:
break;
default:
{
pn_data_t *data = msg->body;
msg->body = msg->data;
msg->data = data;
err = pn_data_intern(msg->body);
if (err) return err;
}
break;
}
}
return pn_data_clear(msg->data);
}
int pn_message_encode(pn_message_t *msg, char *bytes, size_t *size)
{
if (!msg || !bytes || !size || !*size) return PN_ARG_ERR;
if (!msg->data) {
msg->data = pn_data(64);
}
if (!msg->body) {
msg->body = pn_data(64);
}
int err = pn_data_clear(msg->data);
if (err) return err;
err = pn_data_fill(msg->data, "DL[oBIoI]", HEADER, msg->durable,
msg->priority, msg->ttl, msg->first_acquirer,
msg->delivery_count);
if (err) return err;
err = pn_data_fill(msg->data, "DL[nzSSSnssLLSiS]", PROPERTIES,
pn_buffer_bytes(msg->user_id),
pn_buffer_str(msg->address),
pn_buffer_str(msg->subject),
pn_buffer_str(msg->reply_to),
pn_buffer_str(msg->content_type),
pn_buffer_str(msg->content_encoding),
msg->expiry_time,
msg->creation_time,
pn_buffer_str(msg->group_id),
msg->group_sequence,
pn_buffer_str(msg->reply_to_group_id));
if (err) return err;
size_t remaining = *size;
size_t encoded = remaining;
err = pn_data_encode(msg->data, bytes, &encoded);
if (err) return err;
bytes += encoded;
remaining -= encoded;
encoded = remaining;
err = pn_data_encode(msg->body, bytes, &encoded);
if (err) return err;
bytes += encoded;
remaining -= encoded;
*size -= remaining;
return 0;
}
pn_format_t pn_message_get_format(pn_message_t *msg)
{
return msg ? msg->format : PN_AMQP;
}
int pn_message_set_format(pn_message_t *msg, pn_format_t format)
{
if (!msg) return PN_ARG_ERR;
msg->format = format;
return 0;
}
int pn_message_load(pn_message_t *msg, const char *data, size_t size)
{
if (!msg) return PN_ARG_ERR;
switch (msg->format) {
case PN_DATA: return pn_message_load_data(msg, data, size);
case PN_TEXT: return pn_message_load_text(msg, data, size);
case PN_AMQP: return pn_message_load_amqp(msg, data, size);
case PN_JSON: return pn_message_load_json(msg, data, size);
}
return PN_STATE_ERR;
}
int pn_message_load_data(pn_message_t *msg, const char *data, size_t size)
{
if (!msg) return PN_ARG_ERR;
if (!msg->body) {
msg->body = pn_data(64);
}
int err = pn_data_fill(msg->body, "DLz", DATA, size, data);
if (err) return err;
return pn_data_intern(msg->body);
}
int pn_message_load_text(pn_message_t *msg, const char *data, size_t size)
{
if (!msg) return PN_ARG_ERR;
if (!msg->body) {
msg->body = pn_data(64);
}
int err = pn_data_fill(msg->body, "DLS", AMQP_VALUE, data);
if (err) return err;
return pn_data_intern(msg->body);
}
int pn_message_load_amqp(pn_message_t *msg, const char *data, size_t size)
{
if (!msg) return PN_ARG_ERR;
if (!msg->body) {
msg->body = pn_data(64);
}
pn_parser_t *parser = pn_message_parser(msg);
while (true) {
pn_data_clear(msg->body);
pn_atoms_t atoms = pn_data_available(msg->body);
int err = pn_parser_parse(parser, data, &atoms);
if (err == PN_OVERFLOW) {
err = pn_data_grow(msg->body);
if (err) return err;
continue;
} else if (err) {
return err;
} else {
return pn_data_resize(msg->body, atoms.size);
}
}
}
int pn_message_load_json(pn_message_t *msg, const char *data, size_t size)
{
if (!msg) return PN_ARG_ERR;
// XXX: unsupported format
return PN_ERR;
}
int pn_message_save(pn_message_t *msg, char *data, size_t *size)
{
if (!msg) return PN_ARG_ERR;
switch (msg->format) {
case PN_DATA: return pn_message_save_data(msg, data, size);
case PN_TEXT: return pn_message_save_text(msg, data, size);
case PN_AMQP: return pn_message_save_amqp(msg, data, size);
case PN_JSON: return pn_message_save_json(msg, data, size);
}
return PN_STATE_ERR;
}
int pn_message_save_data(pn_message_t *msg, char *data, size_t *size)
{
if (!msg) return PN_ARG_ERR;
if (!msg->body || pn_data_size(msg->body) == 0) {
*size = 0;
return 0;
}
uint64_t desc;
pn_bytes_t bytes;
bool scanned;
int err = pn_data_scan(msg->body, "DL?z", &desc, &scanned, &bytes);
if (err) return err;
if (desc == DATA && scanned) {
if (bytes.size > *size) {
return PN_OVERFLOW;
} else {
memcpy(data, bytes.start, bytes.size);
*size = bytes.size;
return 0;
}
} else {
return PN_STATE_ERR;
}
}
int pn_message_save_text(pn_message_t *msg, char *data, size_t *size)
{
if (!msg) return PN_ARG_ERR;
if (!msg->body) {
*size = 0;
return 0;
}
uint64_t desc;
pn_bytes_t str = {0,0};
bool scanned, dscanned;
int err = pn_data_scan(msg->body, "?DL?S", &dscanned, &desc, &scanned, &str);
if (err) return err;
if (dscanned && desc == AMQP_VALUE) {
if (scanned && str.size >= *size) {
return PN_OVERFLOW;
} else {
memcpy(data, str.start, str.size);
data[str.size] = '\0';
*size = str.size;
return 0;
}
} else {
return PN_STATE_ERR;
}
}
int pn_message_save_amqp(pn_message_t *msg, char *data, size_t *size)
{
if (!msg) return PN_ARG_ERR;
if (!msg->body) {
*size = 0;
return 0;
}
return pn_data_format(msg->body, data, size);
}
int pn_message_save_json(pn_message_t *msg, char *data, size_t *size)
{
if (!msg) return PN_ARG_ERR;
// XXX: unsupported format
return PN_ERR;
}