blob: 07e144ad67b56452b8802e7f7ce06237299694c6 [file] [log] [blame]
#ifndef __dispatch_message_h__
#define __dispatch_message_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/compose.h>
#include <qpid/dispatch/parse.h>
#include <qpid/dispatch/container.h>
#include <qpid/dispatch/log.h>
#include <proton/raw_connection.h>
/**@file
* Message representation.
*
* @defgroup message message
*
* Message representation.
* @{
*/
// DISPATCH-807 Queue depth limits
// upper and lower limits for bang bang hysteresis control
//
// Q2 defines the maximum number of buffers allowed in a message's buffer
// chain. This limits the number of bytes that will be read from an incoming
// link (pn_link_recv) for the current message. Once Q2 is enabled no further
// pn_link_recv calls will be done on the link. Q2 remains in effect until enough
// bytes have been consumed by the outgoing link(s) to drop the number of
// buffered bytes below the lower threshold.
#define QD_QLIMIT_Q2_UPPER 256 // disable pn_link_recv (qd_buffer_t's)
#define QD_QLIMIT_Q2_LOWER 128 // re-enable pn_link_recv
//
// Q3 limits the number of bytes allowed to be buffered in a session's outgoing
// buffer. Once the Q3 upper limit is hit (read via pn_session_outgoing_bytes),
// pn_link_send will no longer be called for ALL outgoing links sharing the
// session. When enough outgoing bytes have been drained below the lower limit
// pn_link_sends will resume.
#define QD_QLIMIT_Q3_UPPER (QD_QLIMIT_Q3_LOWER * 2) // in pn_buffer_t's
#define QD_QLIMIT_Q3_LOWER (QD_QLIMIT_Q2_UPPER * 2) // 2 == a guess
// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
typedef struct qd_message_t qd_message_t;
typedef struct qd_message_stream_data_t qd_message_stream_data_t;
/** Amount of message to be parsed. */
typedef enum {
QD_DEPTH_NONE,
QD_DEPTH_HEADER,
QD_DEPTH_DELIVERY_ANNOTATIONS,
QD_DEPTH_MESSAGE_ANNOTATIONS,
QD_DEPTH_PROPERTIES,
QD_DEPTH_APPLICATION_PROPERTIES,
QD_DEPTH_BODY,
QD_DEPTH_ALL
} qd_message_depth_t;
/** Message fields */
typedef enum {
QD_FIELD_NONE, // reserved
//
// Message Sections
//
QD_FIELD_HEADER,
QD_FIELD_DELIVERY_ANNOTATION,
QD_FIELD_MESSAGE_ANNOTATION,
QD_FIELD_PROPERTIES,
QD_FIELD_APPLICATION_PROPERTIES,
QD_FIELD_BODY,
QD_FIELD_FOOTER,
//
// Fields of the Header Section
// Ordered by list position
//
QD_FIELD_DURABLE,
QD_FIELD_PRIORITY,
QD_FIELD_TTL,
QD_FIELD_FIRST_ACQUIRER,
QD_FIELD_DELIVERY_COUNT,
//
// Fields of the Properties Section
// Ordered by list position
//
QD_FIELD_MESSAGE_ID,
QD_FIELD_USER_ID,
QD_FIELD_TO,
QD_FIELD_SUBJECT,
QD_FIELD_REPLY_TO,
QD_FIELD_CORRELATION_ID,
QD_FIELD_CONTENT_TYPE,
QD_FIELD_CONTENT_ENCODING,
QD_FIELD_ABSOLUTE_EXPIRY_TIME,
QD_FIELD_CREATION_TIME,
QD_FIELD_GROUP_ID,
QD_FIELD_GROUP_SEQUENCE,
QD_FIELD_REPLY_TO_GROUP_ID
} qd_message_field_t;
/**
* Allocate a new message.
*
* @return A pointer to a qd_message_t that is the sole reference to a newly allocated
* message.
*/
qd_message_t *qd_message(void);
/**
* Free a message reference. If this is the last reference to the message, free the
* message as well.
*
* @param msg A pointer to a qd_message_t that is no longer needed.
*/
void qd_message_free(qd_message_t *msg);
/**
* Make a new reference to an existing message.
*
* @param msg A pointer to a qd_message_t referencing a message.
* @return A new pointer to the same referenced message.
*/
qd_message_t *qd_message_copy(qd_message_t *msg);
/**
* Retrieve the message annotations from a message and place them in message storage.
*
* IMPORTANT: The pointer returned by this function remains owned by the message.
* The caller MUST NOT free the parsed field.
*
* @param msg Pointer to a received message.
*/
void qd_message_message_annotations(qd_message_t *msg);
/**
* Set the value for the QD_MA_TRACE field in the outgoing message annotations
* for the message.
*
* IMPORTANT: This method takes ownership of the trace_field - the calling
* method must not reference it after this call.
*
* @param msg Pointer to an outgoing message.
* @param trace_field Pointer to a composed field representing the list that
* will be used as the value for the QD_MA_TRACE map entry. If null, the
* message will not have a QA_MA_TRACE message annotation field. Ownership of
* this field is transferred to the message.
*
*/
void qd_message_set_trace_annotation(qd_message_t *msg, qd_composed_field_t *trace_field);
/**
* Set the value for the QD_MA_TO field in the outgoing message annotations for
* the message.
*
* IMPORTANT: This method takes ownership of the to_field - the calling
* method must not reference it after this call.
*
* @param msg Pointer to an outgoing message.
* @param to_field Pointer to a composed field representing the to override
* address that will be used as the value for the QD_MA_TO map entry. If null,
* the message will not have a QA_MA_TO message annotation field. Ownership of
* this field is transferred to the message.
*
*/
void qd_message_set_to_override_annotation(qd_message_t *msg, qd_composed_field_t *to_field);
/**
* Set a phase for the phase annotation in the message.
*
* @param msg Pointer to an outgoing message.
* @param phase The phase of the address for the outgoing message.
*
*/
void qd_message_set_phase_annotation(qd_message_t *msg, int phase);
int qd_message_get_phase_annotation(const qd_message_t *msg);
/**
* Indicate whether message should be considered to be streaming.
*
* @param msg Pointer to an outgoing message.
* @param stream true if the message is streaming
*
*/
void qd_message_set_stream_annotation(qd_message_t *msg, bool stream);
/**
* Test whether received message should be considered to be streaming.
*
* @param msg Pointer to an outgoing message.
* @return true if the received message has the streaming annotation set, else false.
*
*/
int qd_message_is_streaming(qd_message_t *msg);
/**
* Set the value for the QD_MA_INGRESS field in the outgoing message
* annotations for the message.
*
* IMPORTANT: This method takes ownership of the ingress_field - the calling
* method must not reference it after this call.
*
* @param msg Pointer to an outgoing message.
* @param ingress_field Pointer to a composed field representing ingress router
* that will be used as the value for the QD_MA_INGRESS map entry. If null,
* the message will not have a QA_MA_INGRESS message annotation field.
* Ownership of this field is transferred to the message.
*
*/
void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *ingress_field);
/**
* Receive message data frame by frame via a delivery. This function may be called more than once on the same
* delivery if the message spans multiple frames. Always returns a message. The message buffers are filled up to the point with the data that was been received so far.
* The buffer keeps filling up on successive calls to this function.
*
* @param delivery An incoming delivery from a link
* @return A pointer to the complete message or 0 if the message is not yet complete.
*/
qd_message_t *qd_message_receive(pn_delivery_t *delivery);
/**
* Returns the PN_DELIVERY_CTX record from the attachments
*
* @param delivery An incoming delivery from a link
* @return - pointer to qd_message_t object
*/
qd_message_t * qd_get_message_context(pn_delivery_t *delivery);
/**
* Send the message outbound on an outgoing link.
*
* @param msg A pointer to a message to be sent.
* @param link The outgoing link on which to send the message.
* @param strip_outbound_annotations [in] annotation control flag
* @param q3_stalled [out] indicates that the link is stalled due to proton-buffer-full
*/
void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *q3_stalled);
/**
* Check that the message is well-formed up to a certain depth. Any part of the message that is
* beyond the specified depth is not checked for validity.
*
* Note: some message sections are optional - QD_MESSAGE_OK is returned if the
* optional section is not present, as that is valid.
*/
typedef enum {
QD_MESSAGE_DEPTH_INVALID, // corrupt or malformed message detected
QD_MESSAGE_DEPTH_OK, // valid up to depth, including 'depth' if not optional
QD_MESSAGE_DEPTH_INCOMPLETE // have not received up to 'depth', or partial depth
} qd_message_depth_status_t;
qd_message_depth_status_t qd_message_check_depth(const qd_message_t *msg, qd_message_depth_t depth);
/**
* Return an iterator for the requested message field. If the field is not in the message,
* return NULL.
*
* @param msg A pointer to a message.
* @param field The field to be returned via iterator.
* @return A field iterator that spans the requested field.
*/
qd_iterator_t *qd_message_field_iterator_typed(qd_message_t *msg, qd_message_field_t field);
qd_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t field);
ssize_t qd_message_field_length(qd_message_t *msg, qd_message_field_t field);
ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char *buffer, size_t *hdr_length);
//
// Functions for composed messages
//
// Convenience Functions
void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *buffers);
void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content, bool receive_complete);
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, bool receive_complete);
void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3, bool receive_complete);
/**
* qd_message_extend
*
* Extend the content of a streaming message with more buffers.
*
* @param msg Pointer to a message
* @param field A composed field to be appended to the end of the message's stream
* @param q2_blocked Set to true if this call caused Q2 to block
* @return The number of buffers stored in the message's content
*/
int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked);
/**
* qd_message_stream_data_iterator
*
* Return an iterator that references the content (not the performative headers)
* of the entire body-data section.
*
* The returned iterator must eventually be freed by the caller.
*
* @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @return Pointer to an iterator referencing the stream_data content
*/
qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data);
/**
* qd_message_stream_data_buffer_count
*
* Return the number of buffers that are needed to hold this body-data's content.
*
* @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @return Number of pn_raw_buffers needed to contain the entire content of this stream_data.
*/
int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data);
/**
* qd_message_stream_data_buffers
*
* Populate an array of pn_raw_buffer_t objects with references to the stream_data's content.
*
* @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @param buffers Pointer to an array of pn_raw_buffer_t objects
* @param offset The offset (in the stream_data's buffer set) from which copying should begin
* @param count The number of pn_raw_buffer_t objects in the buffers array
* @return The number of pn_raw_buffer_t objects that were overwritten
*/
int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count);
/**
* qd_message_stream_data_payload_length
*
* Given a stream_data object, return the length of the payload.
* This will equal the sum of the length of all qd_buffer_t objects contained in payload portion of the stream_data object
*
* @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @return The length of the payload of the passed in body data object.
*/
size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data);
/**
* qd_message_stream_data_release
*
* Release buffers that were associated with a body-data section. It is not required that body-data
* objects be released in the same order in which they were offered.
*
* Once this function is called, the caller must drop its reference to the stream_data object
* and not use it again.
*
* @param stream_data Pointer to a body data object returned by qd_message_next_stream_data
*/
void qd_message_stream_data_release(qd_message_stream_data_t *stream_data);
typedef enum {
QD_MESSAGE_STREAM_DATA_BODY_OK, // A valid body data object has been returned
QD_MESSAGE_STREAM_DATA_FOOTER_OK, // A valid footer has been returned
QD_MESSAGE_STREAM_DATA_INCOMPLETE, // The next body data is incomplete, try again later
QD_MESSAGE_STREAM_DATA_NO_MORE, // There are no more body data objects in this stream
QD_MESSAGE_STREAM_DATA_INVALID // The next body data is invalid, the stream is corrupted
} qd_message_stream_data_result_t;
/**
* qd_message_next_stream_data
*
* Get the next body-data section from this streaming message return the result and
* possibly the valid, completed stream_data object.
*
* @param msg Pointer to a message
* @param stream_data Output pointer to a stream_data object (or 0 if not OK)
* @return The stream_data_result describing the result of this operation
*/
qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *msg, qd_message_stream_data_t **stream_data);
/**
* qd_message_stream_data_append
*
* Append the buffers in data as a sequence of one or more BODY_DATA sections
* to the given message. The buffers in data are moved into the message
* content by this function.
*
* @param msg Pointer to message under construction
* @param data List of buffers containing body data.
* @param qd_blocked Set to true if this call caused Q2 to block
* @return The number of buffers stored in the message's content
*/
int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data, bool *q2_blocked);
/** Put string representation of a message suitable for logging in buffer.
* @return buffer
*/
char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits log_message);
/** Recommended buffer length for qd_message_repr */
int qd_message_repr_len();
qd_log_source_t* qd_message_log_source();
/**
* Accessor for message field ingress
*
* @param msg A pointer to the message
* @return the parsed field
*/
qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg);
/**
* Accessor for message field phase
*
* @param msg A pointer to the message
* @return the parsed field
*/
qd_parsed_field_t *qd_message_get_phase(qd_message_t *msg);
/**
* Accessor for message field to_override
*
* @param msg A pointer to the message
* @return the parsed field
*/
qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg);
/**
* Accessor for message field trace
*
* @param msg A pointer to the message
* @return the parsed field
*/
qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg);
/**
* Accessor for message field phase
*
* @param msg A pointer to the message
* @return the phase as an integer
*/
int qd_message_get_phase_val (qd_message_t *msg);
/**
* Should the message be discarded.
* A message can be discarded if the disposition is released or rejected.
*
* @param msg A pointer to the message.
**/
bool qd_message_is_discard(qd_message_t *msg);
/**
*Set the discard field on the message to to the passed in boolean value.
*
* @param msg A pointer to the message.
* @param discard - the boolean value of discard.
*/
void qd_message_set_discard(qd_message_t *msg, bool discard);
/**
* Has the message been completely received?
* Return true if the message is fully received
* Returns false if only the partial message has been received, if there is more of the message to be received.
*
* @param msg A pointer to the message.
*/
bool qd_message_receive_complete(qd_message_t *msg);
/**
* Returns true if the message has been completely received AND the message has been completely sent.
*/
bool qd_message_send_complete(qd_message_t *msg);
/**
* Flag the message as being send-complete.
*/
void qd_message_set_send_complete(qd_message_t *msg);
/**
* Flag the message as being receive-complete.
*/
void qd_message_set_receive_complete(qd_message_t *msg);
/**
* Returns true if the delivery tag has already been sent.
*/
bool qd_message_tag_sent(qd_message_t *msg);
/**
* Sets if the delivery tag has already been sent out or not.
*/
void qd_message_set_tag_sent(qd_message_t *msg, bool tag_sent);
/**
* Increase the fanout of the message by 1.
*
* @param in_msg A pointer to the inbound message.
* @param out_msg A pointer to the outbound message or 0 if forwarding to a
* local subscriber.
*/
void qd_message_add_fanout(qd_message_t *in_msg,
qd_message_t *out_msg);
/**
* Disable the Q2-holdoff for this message.
*
* @param msg A pointer to the message
*/
void qd_message_Q2_holdoff_disable(qd_message_t *msg);
/**
* Test if attempt to retrieve message data through qd_message_recv should block
* due to Q2 input holdoff limit being exceeded. This message has enough
* buffers in the internal buffer chain and any calls to to qd_message_receive
* will not result in a call to pn_link_receive to retrieve more data.
*
* @param msg A pointer to the message
*/
bool qd_message_Q2_holdoff_should_block(qd_message_t *msg);
/**
* Test if a message that is blocked by Q2 input holdoff has enough room
* to begin receiving again. This message has transmitted and disposed of
* enough buffers to begin receiving more data from the underlying proton link.
*
* @param msg A pointer to the message
*/
bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg);
/**
* Check if a message has hit its Q2 limit and is currently blocked.
* When blocked no further message data will be read from the link.
*
* @param msg A pointer to the message
*/
bool qd_message_is_Q2_blocked(const qd_message_t *msg);
/**
* Register a callback that will be invoked when the message has exited the Q2
* blocking state. Note that the callback can be invoked on any I/O thread.
* The callback must be thread safe.
*
* @param msg The message to monitor.
* @param callback The method to invoke
* @param context safe pointer holding the context
*/
typedef void (*qd_message_q2_unblocked_handler_t)(qd_alloc_safe_ptr_t context);
void qd_message_set_q2_unblocked_handler(qd_message_t *msg,
qd_message_q2_unblocked_handler_t callback,
qd_alloc_safe_ptr_t context);
void qd_message_clear_q2_unblocked_handler(qd_message_t *msg);
/**
* Return message aborted state
* @param msg A pointer to the message
* @return true if the message has been aborted
*/
bool qd_message_aborted(const qd_message_t *msg);
/**
* Set the aborted flag on the message.
* @param msg A pointer to the message
* @param aborted
*/
void qd_message_set_aborted(const qd_message_t *msg, bool aborted);
/**
* Return message priority
* @param msg A pointer to the message
* @return The message priority value. Default if not present.
*/
uint8_t qd_message_get_priority(qd_message_t *msg);
/**
* True if message is larger that maxMessageSize
* @param msg A pointer to the message
* @return
*/
bool qd_message_oversize(const qd_message_t *msg);
///@}
#endif