blob: d39342f46726229aa7fb03ad59595dc0fa175638 [file] [log] [blame]
#ifndef __dispatch_message_h__
#define __dispatch_message_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "qpid/dispatch/buffer.h"
#include "qpid/dispatch/compose.h"
#include "qpid/dispatch/container.h"
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/iterator.h"
#include "qpid/dispatch/log.h"
#include "qpid/dispatch/parse.h"
#include <proton/raw_connection.h>
/**@file
* Message representation.
*
* @defgroup message message
*
* Message representation.
* @{
*/
// DISPATCH-807 Queue depth limits
// upper and lower limits for bang bang hysteresis control
//
// Q2 defines the maximum number of buffers allowed in a message's buffer
// chain. This limits the number of bytes that will be read from an incoming
// link (pn_link_recv) for the current message. Once Q2 is enabled no further
// pn_link_recv calls will be done on the link. Q2 remains in effect until enough
// bytes have been consumed by the outgoing link(s) to drop the number of
// buffered bytes below the lower threshold.
#define QD_QLIMIT_Q2_UPPER 256 // disable pn_link_recv (qd_buffer_t's)
#define QD_QLIMIT_Q2_LOWER 128 // re-enable pn_link_recv
//
// Q3 limits the number of bytes allowed to be buffered in a session's outgoing
// buffer. Once the Q3 upper limit is hit (read via pn_session_outgoing_bytes),
// pn_link_send will no longer be called for ALL outgoing links sharing the
// session. When enough outgoing bytes have been drained below the lower limit
// pn_link_sends will resume.
#define QD_QLIMIT_Q3_UPPER (QD_QLIMIT_Q3_LOWER * 2) // in pn_buffer_t's
#define QD_QLIMIT_Q3_LOWER (QD_QLIMIT_Q2_UPPER * 2) // 2 == a guess
// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
typedef struct qd_message_t qd_message_t;
typedef struct qd_message_stream_data_t qd_message_stream_data_t;
/** Amount of message to be parsed. */
typedef enum {
QD_DEPTH_NONE,
QD_DEPTH_HEADER,
QD_DEPTH_DELIVERY_ANNOTATIONS,
QD_DEPTH_MESSAGE_ANNOTATIONS,
QD_DEPTH_PROPERTIES,
QD_DEPTH_APPLICATION_PROPERTIES,
QD_DEPTH_BODY,
QD_DEPTH_ALL
} qd_message_depth_t;
/** Message fields */
typedef enum {
QD_FIELD_NONE, // reserved
//
// Message Sections
//
QD_FIELD_HEADER,
QD_FIELD_DELIVERY_ANNOTATION,
QD_FIELD_MESSAGE_ANNOTATION,
QD_FIELD_PROPERTIES,
QD_FIELD_APPLICATION_PROPERTIES,
QD_FIELD_BODY,
QD_FIELD_FOOTER,
//
// Fields of the Header Section
// Ordered by list position
//
QD_FIELD_DURABLE,
QD_FIELD_PRIORITY,
QD_FIELD_TTL,
QD_FIELD_FIRST_ACQUIRER,
QD_FIELD_DELIVERY_COUNT,
//
// Fields of the Properties Section
// Ordered by list position
//
QD_FIELD_MESSAGE_ID,
QD_FIELD_USER_ID,
QD_FIELD_TO,
QD_FIELD_SUBJECT,
QD_FIELD_REPLY_TO,
QD_FIELD_CORRELATION_ID,
QD_FIELD_CONTENT_TYPE,
QD_FIELD_CONTENT_ENCODING,
QD_FIELD_ABSOLUTE_EXPIRY_TIME,
QD_FIELD_CREATION_TIME,
QD_FIELD_GROUP_ID,
QD_FIELD_GROUP_SEQUENCE,
QD_FIELD_REPLY_TO_GROUP_ID
} qd_message_field_t;
/**
* Allocate a new message.
*
* @return A pointer to a qd_message_t that is the sole reference to a newly allocated
* message.
*/
qd_message_t *qd_message(void);
/**
* Free a message reference. If this is the last reference to the message, free the
* message as well.
*
* @param msg A pointer to a qd_message_t that is no longer needed.
*/
void qd_message_free(qd_message_t *msg);
/**
* Make a new reference to an existing message.
*
* @param msg A pointer to a qd_message_t referencing a message.
* @return A new pointer to the same referenced message.
*/
qd_message_t *qd_message_copy(qd_message_t *msg);
/**
* Retrieve the message annotations from a message and place them in message storage.
*
* @param msg Pointer to a received message.
* @return 0 on success, else an error message
*/
const char *qd_message_parse_annotations(qd_message_t *msg);
/**
* Set the value for the QD_MA_TO field in the outgoing message annotations for
* the message.
*
* @param msg Pointer to an outgoing message.
* @param to_field Pointer to a c string holding the to override address that
* will be used as the value for the outgoing QD_MA_TO annotations map entry.
* If null, the message will not have a QA_MA_TO message annotation field.
*/
void qd_message_set_to_override_annotation(qd_message_t *msg, const char *to_field);
/**
* Set a phase for the phase annotation in the message.
*
* @param msg Pointer to an outgoing message.
* @param phase The phase of the address for the outgoing message.
*
*/
void qd_message_set_phase_annotation(qd_message_t *msg, int phase);
int qd_message_get_phase_annotation(const qd_message_t *msg);
/**
* Classify the message as streaming.
*
* Marking a message as streaming will prevent downstream routers from manually
* determining if this message should be sent on an inter-router streaming
* link. Once a message is classified as streaming it retains the
* classification until it is delivered to an endpoint
*
* @param msg Pointer to an outgoing message.
*
*/
void qd_message_set_streaming_annotation(qd_message_t *msg);
/**
* Test whether received message should be considered to be streaming.
*
* @param msg Pointer to an outgoing message.
* @return true if the received message has the streaming annotation set, else false.
*
*/
int qd_message_is_streaming(const qd_message_t *msg);
/**
* Prevent the router from doing any transformations to the message annotations
* section of the message.
*
* Used by link-routing to completely skip all MA handling, including parsing
* MA on receive and restoring/composing MA on send.
*/
void qd_message_disable_router_annotations(qd_message_t *in_msg);
/**
* Receive message data frame by frame via a delivery. This function may be called more than once on the same
* delivery if the message spans multiple frames. Always returns a message. The message buffers are filled up to the point with the data that was been received so far.
* The buffer keeps filling up on successive calls to this function.
*
* @param delivery An incoming delivery from a link
* @return A pointer to the complete message or 0 if the message is not yet complete.
*/
qd_message_t *qd_message_receive(pn_delivery_t *delivery);
/**
* Returns the PN_DELIVERY_CTX record from the attachments
*
* @param delivery An incoming delivery from a link
* @return - pointer to qd_message_t object
*/
qd_message_t * qd_get_message_context(pn_delivery_t *delivery);
/**
* Returns true if there is at least one non-empty buffer at the head of the content->buffers list
* or if the content->pending buffer is non-empty.
*
* @param msg A pointer to a message.
*/
bool qd_message_has_data_in_content_or_pending_buffers(qd_message_t *msg);
/**
* Send the message outbound on an outgoing link.
*
* @param msg A pointer to a message to be sent.
* @param link The outgoing link on which to send the message.
* @param strip_outbound_annotations [in] annotation control flag
* @param q3_stalled [out] indicates that the link is stalled due to proton-buffer-full
*/
void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *q3_stalled);
/**
* Check that the message is well-formed up to a certain depth. Any part of the message that is
* beyond the specified depth is not checked for validity.
*
* Note: some message sections are optional - QD_MESSAGE_OK is returned if the
* optional section is not present, as that is valid.
*/
typedef enum {
QD_MESSAGE_DEPTH_INVALID, // corrupt or malformed message detected
QD_MESSAGE_DEPTH_OK, // valid up to depth, including 'depth' if not optional
QD_MESSAGE_DEPTH_INCOMPLETE // have not received up to 'depth', or partial depth
} qd_message_depth_status_t;
qd_message_depth_status_t qd_message_check_depth(const qd_message_t *msg, qd_message_depth_t depth);
/**
* Return an iterator for the requested message field. If the field is not in the message,
* return NULL.
*
* @param msg A pointer to a message.
* @param field The field to be returned via iterator.
* @return A field iterator that spans the requested field.
*/
qd_iterator_t *qd_message_field_iterator_typed(qd_message_t *msg, qd_message_field_t field);
qd_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t field);
ssize_t qd_message_field_length(qd_message_t *msg, qd_message_field_t field);
ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char *buffer, size_t *hdr_length);
// Create a message using composed fields to supply content.
//
// This message constructor will create a new message using each fields buffers
// concatenated in order (f1 first, f2 second, etc). There is no need to
// provide all three fields: concatenation stops at the first null fx pointer.
//
// Note well that while this constructor can support up to three separate
// composed fields it is more efficent to chain as many message sections as
// possible into as few separate composed fields as possible. This means that
// any passed composed field can contain several message sections.
//
// This constructor takes ownership of the composed fields - the caller must
// not reference them after the call.
//
qd_message_t *qd_message_compose(qd_composed_field_t *f1,
qd_composed_field_t *f2,
qd_composed_field_t *f3,
bool receive_complete);
// deprecated: use qd_message_compose() to create locally generated messages
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, bool receive_complete);
/**
* qd_message_extend
*
* Extend the content of a streaming message with more buffers.
*
* @param msg Pointer to a message
* @param field A composed field to be appended to the end of the message's stream
* @param q2_blocked Set to true if this call caused Q2 to block
* @return The number of buffers stored in the message's content
*/
int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked);
/**
* qd_message_stream_data_iterator
*
* Return an iterator that references the content (not the performative headers)
* of the entire body-data section.
*
* The returned iterator must eventually be freed by the caller.
*
* @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @return Pointer to an iterator referencing the stream_data content
*/
qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data);
/**
* qd_message_stream_data_buffer_count
*
* Return the number of buffers that are needed to hold this body-data's content.
*
* @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @return Number of pn_raw_buffers needed to contain the entire content of this stream_data.
*/
int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data);
/**
* qd_message_stream_data_buffers
*
* Populate an array of pn_raw_buffer_t objects with references to the stream_data's content.
*
* @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @param buffers Pointer to an array of pn_raw_buffer_t objects
* @param offset The offset (in the stream_data's buffer set) from which copying should begin
* @param count The number of pn_raw_buffer_t objects in the buffers array
* @return The number of pn_raw_buffer_t objects that were overwritten
*/
int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count);
/**
* qd_message_stream_data_payload_length
*
* Given a stream_data object, return the length of the payload.
* This will equal the sum of the length of all qd_buffer_t objects contained in payload portion of the stream_data object
*
* @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @return The length of the payload of the passed in body data object.
*/
size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data);
/**
* qd_message_stream_data_release
*
* Release buffers that were associated with a body-data section. It is not required that body-data
* objects be released in the same order in which they were offered.
*
* Once this function is called, the caller must drop its reference to the stream_data object
* and not use it again.
*
* @param stream_data Pointer to a body data object returned by qd_message_next_stream_data
*/
void qd_message_stream_data_release(qd_message_stream_data_t *stream_data);
/**
* qd_message_stream_data_release_up_to
*
* Release this stream data and all the previous ones also.
*
* @param stream_data Pointer to a body data object returned by qd_message_next_stream_data
*/
void qd_message_stream_data_release_up_to(qd_message_stream_data_t *stream_data);
typedef enum {
QD_MESSAGE_STREAM_DATA_BODY_OK, // A valid body data object has been returned
QD_MESSAGE_STREAM_DATA_FOOTER_OK, // A valid footer has been returned
QD_MESSAGE_STREAM_DATA_INCOMPLETE, // The next body data is incomplete, try again later
QD_MESSAGE_STREAM_DATA_NO_MORE, // There are no more body data objects in this stream
QD_MESSAGE_STREAM_DATA_INVALID // The next body data is invalid, the stream is corrupted
} qd_message_stream_data_result_t;
/**
* qd_message_next_stream_data
*
* Get the next body-data section from this streaming message return the result and
* possibly the valid, completed stream_data object.
*
* @param msg Pointer to a message
* @param stream_data Output pointer to a stream_data object (or 0 if not OK)
* @return The stream_data_result describing the result of this operation
*/
qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *msg, qd_message_stream_data_t **stream_data);
/**
* qd_message_stream_data_footer_append
*
* Constructs a footer field by calling the qd_compose(QD_PERFORMATIVE_FOOTER, field);
* It then inserts the passed in buffer list to the composed field and proceeds to disable q2 before finally adding the footer
* field to the message.
*
* Use this function if you have the complete footer data available in the passed in buffer list
*/
int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t *footer_props);
/**
* qd_message_stream_data_append
*
* Append the buffers in data as a sequence of one or more BODY_DATA sections
* to the given message. The buffers in data are moved into the message
* content by this function.
*
* @param msg Pointer to message under construction
* @param data List of buffers containing body data.
* @param qd_blocked Set to true if this call caused Q2 to block
* @return The number of buffers stored in the message's content
*/
int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data, bool *q2_blocked);
/** Put string representation of a message suitable for logging in buffer.
* @return buffer
*/
char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits log_message);
/** Recommended buffer length for qd_message_repr */
int qd_message_repr_len();
qd_log_source_t* qd_message_log_source();
/**
* Accessor for incoming messages ingress router annotation
*
* @param msg A pointer to the message
* @return the parsed field or 0 if no ingress present in msg
*/
qd_parsed_field_t *qd_message_get_ingress_router(qd_message_t *msg);
/**
* Discard received ingress router annotation. This will cause the forwarded
* message to use the local router ID for the ingress router annotation.
*
* Used by exchange-bindings feature.
*
* @param msg A pointer to the message
*/
void qd_message_reset_ingress_router_annotation(qd_message_t *msg);
/**
* Do not include a ingress router annotation in the forwarded message.
*
* Used by the edge-router to filter out the ingress router annotation.
*
* @param msg A pointer to the message
*/
void qd_message_disable_ingress_router_annotation(qd_message_t *msg);
/**
* Accessor for message field to_override
*
* @param msg A pointer to the message
* @return the parsed field or 0 if no to_override present
*/
qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg);
/**
* Accessor for incoming messages trace annotation
*
* @param msg A pointer to the received message
* @return the parsed field
*/
qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg);
/**
* Discard received trace annotations. This will cause the forwarded message
* to re-start the trace list with the current router ID as the only entry.
*
* Used by exchange-bindings feature.
*
* @param msg A pointer to the message
*/
void qd_message_reset_trace_annotation(qd_message_t *msg);
/**
* Do not include a trace list annotation in the forwarded message.
*
* Used by the edge-router to filter out the trace list.
*
* @param msg A pointer to the message
*/
void qd_message_disable_trace_annotation(qd_message_t *msg);
/**
* Should the message be discarded.
* A message can be discarded if the disposition is released or rejected.
*
* @param msg A pointer to the message.
**/
bool qd_message_is_discard(qd_message_t *msg);
/**
*Set the discard field on the message to to the passed in boolean value.
*
* @param msg A pointer to the message.
* @param discard - the boolean value of discard.
*/
void qd_message_set_discard(qd_message_t *msg, bool discard);
/**
* Has the message been completely received?
* Return true if the message is fully received
* Returns false if only the partial message has been received, if there is more of the message to be received.
*
* @param msg A pointer to the message.
*/
bool qd_message_receive_complete(qd_message_t *msg);
/**
* Returns true if the message has been completely received AND the message has been completely sent.
*/
bool qd_message_send_complete(qd_message_t *msg);
/**
* Flag the message as being send-complete.
*/
void qd_message_set_send_complete(qd_message_t *msg);
/**
* Flag the message as being receive-complete.
*/
void qd_message_set_receive_complete(qd_message_t *msg);
/**
* Returns true if the delivery tag has already been sent.
*/
bool qd_message_tag_sent(qd_message_t *msg);
/**
* Sets if the delivery tag has already been sent out or not.
*/
void qd_message_set_tag_sent(qd_message_t *msg, bool tag_sent);
/**
* Increase the fanout of the message by 1.
*
* @param out_msg A pointer to the message to be sent outbound or to a local
* subscriber.
*/
void qd_message_add_fanout(qd_message_t *out_msg);
/**
* Disable the Q2-holdoff for this message.
*
* Note: this call may invoke the Q2 unblock hander routine associated with
* this message. See qd_message_set_q2_unblocked_handler().
*
* @param msg A pointer to the message
*/
void qd_message_Q2_holdoff_disable(qd_message_t *msg);
/**
* Check if a message has hit its Q2 limit and is currently blocked.
* When blocked no further message data will be read from the link.
*
* @param msg A pointer to the message
*/
bool qd_message_is_Q2_blocked(const qd_message_t *msg);
/**
* Register a callback that will be invoked when the message has exited the Q2
* blocking state. Note that the callback can be invoked on any I/O thread.
* The callback must be thread safe.
*
* @param msg The message to monitor.
* @param callback The method to invoke
* @param context safe pointer holding the context
*/
typedef void (*qd_message_q2_unblocked_handler_t)(qd_alloc_safe_ptr_t context);
void qd_message_set_q2_unblocked_handler(qd_message_t *msg,
qd_message_q2_unblocked_handler_t callback,
qd_alloc_safe_ptr_t context);
void qd_message_clear_q2_unblocked_handler(qd_message_t *msg);
/**
* Return message aborted state
* @param msg A pointer to the message
* @return true if the message has been aborted
*/
bool qd_message_aborted(const qd_message_t *msg);
/**
* Set the aborted flag on the message.
* @param msg A pointer to the message
*/
void qd_message_set_aborted(const qd_message_t *msg);
/**
* Return message priority
* @param msg A pointer to the message
* @return The message priority value. Default if not present.
*/
uint8_t qd_message_get_priority(qd_message_t *msg);
/**
* True if message is larger that maxMessageSize
* @param msg A pointer to the message
* @return
*/
bool qd_message_oversize(const qd_message_t *msg);
///@}
#endif