blob: e24561f058a149899b33c5fd697bba9117181dd0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "core_client_api.h"
#include "core_link_endpoint.h"
#include "delivery.h"
#include <inttypes.h>
#include <stdio.h>
#include <time.h>
// Size in bytes of the per-request correlation_id buffer, including the
// terminating NUL.
#define CORRELATION_ID_LEN 32
// printf-style template for generated correlation ids: the literal "client-"
// followed by a 64-bit value printed as 16 zero-padded hex digits and a 32-bit
// value printed as 8 zero-padded hex digits.  That is 7 + 16 + 8 = 31
// characters, which together with the NUL exactly fills CORRELATION_ID_LEN.
// Changing either definition requires re-checking the other.
const char *CORRELATION_ID_FMT = "client-%016"PRIx64"%08"PRIx32;
// A single outstanding request issued through a core client.
//
// A request starts on the client's send_queue.  Once sent it may sit on the
// unsettled_list (an ack callback was supplied), on the reply_list (a reply
// callback was supplied), on both, or on neither - in which case it is freed
// immediately after sending.
typedef struct qdrc_client_request_t qdrc_client_request_t;
struct qdrc_client_request_t {
// list linkage, one set per list the request can be a member of
DEQ_LINKS_N(SEND_Q, qdrc_client_request_t);
DEQ_LINKS_N(UNSETTLED, qdrc_client_request_t);
DEQ_LINKS_N(REPLY, qdrc_client_request_t);
// client that issued the request
qdrc_client_t *client;
// caller-supplied value handed back in every per-request callback
void *req_context;
// generated when the message is built and a reply is expected; stays
// empty otherwise
char correlation_id[CORRELATION_ID_LEN];
// iterator over correlation_id, used as the key into client->correlations
qd_iterator_t *correlation_key;
// handle of this request's entry in client->correlations
qd_hash_handle_t *hash_handle;
// outgoing delivery; a reference is taken when the request is sent and
// dropped on the disposition update or when the request is freed
qdr_delivery_t *delivery;
// optional timeout timer, only created when a non-zero timeout is given
qdr_core_timer_t *timer;
// message content; both pointers are cleared once the message is composed
qd_composed_field_t *app_properties;
qd_composed_field_t *body;
bool on_send_queue; // to be sent
bool on_unsettled_list; // awaiting disposition
bool on_reply_list; // awaiting reply message
// invoked when the matching reply message arrives (may be NULL)
qdrc_client_on_reply_CT_t on_reply_cb;
// invoked on the disposition update for the sent message (may be NULL)
qdrc_client_on_ack_CT_t on_ack_cb;
// invoked when the request is released, with an error string or NULL
// (may be NULL)
qdrc_client_request_done_CT_t done_cb;
};
DEQ_DECLARE(qdrc_client_request_t, qdrc_client_request_list_t);
ALLOC_DECLARE(qdrc_client_request_t);
ALLOC_DEFINE(qdrc_client_request_t);
// State of one core client: a sender link for requests, a receiver link for
// replies, and the bookkeeping for all requests in flight.
struct qdrc_client_t {
qdr_core_t *core;
// maps correlation-id -> request awaiting a reply
qd_hash_t *correlations;
qdrc_endpoint_t *sender; // for outgoing management request messages
qdrc_endpoint_t *receiver; // for incoming management reply messages
// set on the link's second attach, cleared when the link detaches
bool sender_up;
bool receiver_up;
// last state reported through on_state_cb (sender_up && receiver_up);
// only maintained while an on_state_cb is registered
bool active;
// copy of the receiver's remote source address, taken on its second
// attach and released when the receiver detaches
char *reply_to;
qdrc_client_request_list_t send_queue;
qdrc_client_request_list_t unsettled_list;
qdrc_client_request_list_t reply_list; // for expected reply
uint32_t next_cid; // correlation id generation
uint32_t rx_credit_window; // initial credit grant
// credit currently available on the sender link; one unit is consumed per
// request sent
int tx_credit;
// caller-supplied value handed back in every callback
void *user_context;
qdrc_client_on_state_CT_t on_state_cb;
qdrc_client_on_flow_CT_t on_flow_cb;
};
ALLOC_DECLARE(qdrc_client_t);
ALLOC_DEFINE(qdrc_client_t);
// Forward declarations of the file-local helpers and of the link endpoint
// callbacks referenced by the sender/receiver endpoint descriptors.
static void _send_request_CT(qdrc_client_t *client,
qdrc_client_request_t *req);
static void _flush_send_queue_CT(qdrc_client_t *client);
static void _state_updated_CT(qdrc_client_t *client);
static void _sender_second_attach_CT(void *client_context,
qdr_terminus_t *remote_source,
qdr_terminus_t *remote_target);
static void _receiver_second_attach_CT(void *client_context,
qdr_terminus_t *remote_source,
qdr_terminus_t *remote_target);
static void _sender_flow_CT(void *client_context,
int available_credit,
bool drain);
static void _sender_update_CT(void *client_context,
qdr_delivery_t *delivery,
bool settled,
uint64_t disposition);
static void _receiver_transfer_CT(void *client_context,
qdr_delivery_t *delivery,
qd_message_t *message);
static void _sender_detached_CT(void *client_context,
qdr_error_t *error);
static void _receiver_detached_CT(void *client_context,
qdr_error_t *error);
static void _sender_cleanup_CT(void *client_context);
static void _receiver_cleanup_CT(void *client_context);
static void _free_request_CT(qdrc_client_t *client,
qdrc_client_request_t *req,
const char *error);
static qd_message_t *_create_message_CT(qdrc_client_t *client,
qdrc_client_request_t *req);
static void _timer_expired(qdr_core_t *core, void *context);
// Callback table for the outgoing (request) link.  It is handed to
// qdrc_endpoint_create_link_CT() together with the client as context, so each
// handler receives the owning qdrc_client_t.  No transfer handler is set.
static qdrc_endpoint_desc_t sender_endpoint = {
.label = "core client - sender",
.on_second_attach = _sender_second_attach_CT,
.on_flow = _sender_flow_CT,
.on_update = _sender_update_CT,
.on_first_detach = _sender_detached_CT,
.on_cleanup = _sender_cleanup_CT
};
// Callback table for the incoming (reply) link.  It is handed to
// qdrc_endpoint_create_link_CT() together with the client as context, so each
// handler receives the owning qdrc_client_t.  No flow or update handlers are
// set.
static qdrc_endpoint_desc_t receiver_endpoint = {
.label = "core client - receiver",
.on_second_attach = _receiver_second_attach_CT,
.on_transfer = _receiver_transfer_CT,
.on_first_detach = _receiver_detached_CT,
.on_cleanup = _receiver_cleanup_CT
};
// Create a core client on the given connection.
//
// Two links are requested: an outgoing link to 'target' for requests and an
// incoming link with a dynamic source for replies.  The client is not usable
// until both links have completed their attach handshake; that transition is
// reported through on_state_cb.
//
// core          - router core
// conn          - connection that carries both links
// target        - target terminus of the request link
// credit_window - credit granted to the reply link once it is up
// user_context  - opaque value passed back in every callback
// on_state_cb   - notified when the client becomes active / inactive
// on_flow_cb    - notified when send credit is available
//
// Returns the new client, or NULL if it could not be allocated.
qdrc_client_t *qdrc_client_CT(qdr_core_t *core,
                              qdr_connection_t *conn,
                              qdr_terminus_t *target,
                              uint32_t credit_window,
                              void *user_context,
                              qdrc_client_on_state_CT_t on_state_cb,
                              qdrc_client_on_flow_CT_t on_flow_cb)
{
    qdrc_client_t *client = new_qdrc_client_t();
    if (client == NULL) {
        return NULL;
    }

    ZERO(client);
    client->core             = core;
    client->user_context     = user_context;
    client->on_state_cb      = on_state_cb;
    client->on_flow_cb       = on_flow_cb;
    client->rx_credit_window = credit_window;
    client->correlations     = qd_hash(6, 4, 0);
    client->next_cid         = rand();  // arbitrary starting point for ids

    // outgoing link: requests flow to 'target', no source terminus
    client->sender = qdrc_endpoint_create_link_CT(core, conn, QD_OUTGOING,
                                                  NULL, target,
                                                  &sender_endpoint, client);

    // incoming link: replies arrive from a dynamic source, no target terminus
    qdr_terminus_t *reply_source = qdr_terminus(0);
    reply_source->dynamic = true;
    client->receiver = qdrc_endpoint_create_link_CT(core, conn, QD_INCOMING,
                                                    reply_source, NULL,
                                                    &receiver_endpoint, client);

    qd_log(core->log, QD_LOG_TRACE,
           "New core client created c=%p", (void *) client);
    return client;
}
// Release a core client and every request it still tracks.
//
// Each remaining request is released through _free_request_CT() with a NULL
// error, so its done callback (if any) still runs.  A NULL client is ignored.
void qdrc_client_free_CT(qdrc_client_t *client)
{
    if (client == NULL) {
        return;
    }

    // NOTE(review): the endpoints are only forgotten here, not detached.
    // Confirm that the caller guarantees no further endpoint callbacks can
    // arrive with this client as their context.
    client->sender   = NULL;
    client->receiver = NULL;

    // _free_request_CT() unlinks the request from every list it is on, so
    // repeatedly taking the head drains each list.
    qdrc_client_request_t *pending;
    while ((pending = DEQ_HEAD(client->send_queue)) != NULL) {
        _free_request_CT(client, pending, NULL);
    }
    while ((pending = DEQ_HEAD(client->unsettled_list)) != NULL) {
        _free_request_CT(client, pending, NULL);
    }
    while ((pending = DEQ_HEAD(client->reply_list)) != NULL) {
        _free_request_CT(client, pending, NULL);
    }

    qd_hash_free(client->correlations);
    free(client->reply_to);

    qd_log(client->core->log, QD_LOG_TRACE,
           "Core client freed c=%p", (void *) client);
    free_qdrc_client_t(client);
}
// send a message
// Queue a request message for sending.
//
// client          - client issuing the request
// request_context - opaque value passed back in the per-request callbacks
// app_properties  - composed application-properties section (may be NULL)
// body            - composed message body
// timeout         - if non-zero, a timer is scheduled with this value; when
//                   it fires the request is released with error "Timed out"
// on_reply_cb     - if set, a correlation-id/reply-to is added to the message
//                   and the request is held until the reply arrives
// on_ack_cb       - if set, the message is sent unsettled and the request is
//                   held until its disposition update arrives
// done_cb         - if set, invoked when the request is released
//
// If neither on_reply_cb nor on_ack_cb is given, the request is released as
// soon as it has been sent.
//
// Returns 0 once the request has been queued (it may already have been sent,
// or even released, by then).  Returns -1 if the request object could not be
// allocated; in that case nothing has been queued, none of the arguments have
// been consumed and no callback will be invoked.
int qdrc_client_request_CT(qdrc_client_t *client,
                           void *request_context,
                           qd_composed_field_t *app_properties,
                           qd_composed_field_t *body,
                           uint32_t timeout,
                           qdrc_client_on_reply_CT_t on_reply_cb,
                           qdrc_client_on_ack_CT_t on_ack_cb,
                           qdrc_client_request_done_CT_t done_cb)
{
    qd_log(client->core->log, QD_LOG_TRACE,
           "New core client request created c=%p, rc=%p", (void *) client,
           request_context);

    qdrc_client_request_t *req = new_qdrc_client_request_t();
    if (!req) {
        // same allocation-failure handling as qdrc_client_CT(): report it
        // instead of dereferencing a NULL request below
        return -1;
    }

    ZERO(req);
    req->client         = client;
    req->req_context    = request_context;
    req->app_properties = app_properties;
    req->body           = body;
    req->on_reply_cb    = on_reply_cb;
    req->on_ack_cb      = on_ack_cb;
    req->done_cb        = done_cb;

    if (timeout) {
        // the timer must exist before the send attempt: the request may be
        // released (and the timer with it) inside _send_request_CT()
        req->timer = qdr_core_timer_CT(client->core, _timer_expired, req);
        qdr_core_timer_schedule_CT(client->core, req->timer, timeout);
    }

    _send_request_CT(client, req);
    return 0;
}
// attempt to send a new request message
// Append a new request to the client's send queue, then try to send whatever
// the queue holds.
static void _send_request_CT(qdrc_client_t *client,
                             qdrc_client_request_t *req)
{
    req->on_send_queue = true;
    DEQ_INSERT_TAIL_N(SEND_Q, client->send_queue, req);

    // sends immediately if credit (and, where needed, a reply-to address)
    // is available; otherwise the request simply stays queued
    _flush_send_queue_CT(client);
}
// send any pending messages on the send_queue
// Send queued requests, in order, for as long as send credit remains.
//
// Sending stops early at the first request that expects a reply while no
// reply-to address is known yet; later requests stay queued behind it.
//
// After a request is sent it is moved to the unsettled list (ack callback
// present) and/or the reply list (reply callback present).  A request that is
// on neither list is released immediately.
static void _flush_send_queue_CT(qdrc_client_t *client)
{
    qdrc_client_request_t *req = DEQ_HEAD(client->send_queue);

    while (req && client->tx_credit > 0) {
        const bool wants_reply = (req->on_reply_cb != NULL);
        const bool presettled  = (req->on_ack_cb == NULL);

        if (wants_reply && !client->reply_to) {
            // cannot send until receiver comes up
            break;
        }

        qd_message_t *msg = _create_message_CT(client, req);
        req->delivery = qdrc_endpoint_delivery_CT(client->core,
                                                  client->sender,
                                                  msg);
        // reference is released on the disposition update or when the
        // request is freed
        qdr_delivery_incref(req->delivery, "core client send request");
        qdrc_endpoint_send_CT(client->core,
                              client->sender,
                              req->delivery,
                              presettled);

        DEQ_REMOVE_HEAD_N(SEND_Q, client->send_queue);
        req->on_send_queue = false;

        // Consume the credit before anything below can invoke a user
        // callback: releasing a fire-and-forget request runs its done_cb,
        // and a request issued from inside that callback re-enters this
        // function.  It must see the already-reduced credit, otherwise more
        // messages than granted are sent and the counter goes negative.
        client->tx_credit -= 1;

        qd_log(client->core->log, QD_LOG_TRACE,
               "Core client request sent c=%p, rc=%p dlv=%p cid=%s",
               (void *) client, req->req_context, (void *) req->delivery,
               *req->correlation_id ? req->correlation_id : "<none>");

        if (!presettled) {
            DEQ_INSERT_TAIL_N(UNSETTLED, client->unsettled_list, req);
            req->on_unsettled_list = true;
        }

        if (wants_reply) {
            DEQ_INSERT_TAIL_N(REPLY, client->reply_list, req);
            req->on_reply_list = true;
        }

        if (!req->on_reply_list && !req->on_unsettled_list) {
            // "Fire and forget" no need to keep the request any longer
            _free_request_CT(client, req, NULL);
        }

        // req may have been freed above; always restart from the queue head
        req = DEQ_HEAD(client->send_queue);
    }
}
// Release a request and everything it owns.
//
// The request is unlinked from whichever client lists it is on, its timer,
// correlation entry, unsent message content and delivery reference are
// released, the done callback (if any) is invoked, and finally the request
// itself is freed.
//
// error - passed to the done callback; NULL means the request completed
//         normally.
static void _free_request_CT(qdrc_client_t *client,
                             qdrc_client_request_t *req,
                             const char *error)
{
    qdr_core_t *core = client->core;

    if (req->timer) {
        qdr_core_timer_free_CT(core, req->timer);
    }

    // unlink from every list the request is still a member of
    if (req->on_send_queue) {
        DEQ_REMOVE_N(SEND_Q, client->send_queue, req);
    }
    if (req->on_unsettled_list) {
        DEQ_REMOVE_N(UNSETTLED, client->unsettled_list, req);
    }
    if (req->on_reply_list) {
        DEQ_REMOVE_N(REPLY, client->reply_list, req);
    }

    // drop the correlation-id lookup entry and its key
    if (req->hash_handle) {
        qd_hash_remove_by_handle(client->correlations, req->hash_handle);
        qd_hash_handle_free(req->hash_handle);
    }
    if (req->correlation_key) {
        qd_iterator_free(req->correlation_key);
    }

    // message content is only still present if the request was never sent
    if (req->body) {
        qd_compose_free(req->body);
    }
    if (req->app_properties) {
        qd_compose_free(req->app_properties);
    }

    if (req->delivery) {
        qdr_delivery_decref_CT(core, req->delivery, "core client send request");
    }

    // notify user that the request has completed
    if (req->done_cb) {
        req->done_cb(core, client, client->user_context, req->req_context, error);
    }

    qd_log(core->log, QD_LOG_TRACE,
           "Freeing core client request c=%p, rc=%p (%s)",
           (void *) client, req->req_context,
           error ? error : "request complete");

    free_qdrc_client_request_t(req);
}
// issue state change callbacks if necessary
// Re-evaluate the client's active state (both links up) and notify the user
// when it has changed.
//
// When the client becomes active while send credit is already available, the
// flow callback is invoked as well so the user learns it may start sending.
//
// NOTE(review): the active flag is only maintained while an on_state_cb is
// registered, so a client created without one never reports flow through
// _sender_flow_CT() either - confirm this is intended.
static void _state_updated_CT(qdrc_client_t *client)
{
    if (!client->on_state_cb) {
        return;
    }

    const bool new_state = (client->sender_up && client->receiver_up);
    if (new_state == client->active) {
        return;  // nothing changed, nothing to report
    }

    client->active = new_state;
    client->on_state_cb(client->core, client, client->user_context, new_state);

    // on_flow_cb is optional: guard it just like _sender_flow_CT() does
    // rather than calling through a NULL function pointer
    if (client->active && client->tx_credit > 0 && client->on_flow_cb) {
        client->on_flow_cb(client->core,
                           client,
                           client->user_context,
                           client->tx_credit,
                           false);
    }
}
// Endpoint callback: the request link has completed its attach handshake.
//
// Marks the sender as up (once) and re-evaluates the client state.  Both
// termini passed in are freed here.
static void _sender_second_attach_CT(void *context,
                                     qdr_terminus_t *remote_source,
                                     qdr_terminus_t *remote_target)
{
    qdrc_client_t *client = (qdrc_client_t *) context;
    const bool already_up = client->sender_up;

    qd_log(client->core->log, QD_LOG_TRACE,
           "Core client sender 2nd attach c=%p", (void *) client);

    if (!already_up) {
        client->sender_up = true;
        _state_updated_CT(client);
    }

    qdr_terminus_free(remote_source);
    qdr_terminus_free(remote_target);
}
// Endpoint callback: the reply link has completed its attach handshake.
//
// On the first such event the remote source address is copied as the client's
// reply-to address, the initial credit window is granted to the link and the
// client state is re-evaluated.  Both termini passed in are freed here.
static void _receiver_second_attach_CT(void *context,
                                       qdr_terminus_t *remote_source,
                                       qdr_terminus_t *remote_target)
{
    qdrc_client_t *client = (qdrc_client_t *) context;
    const bool already_up = client->receiver_up;

    qd_log(client->core->log, QD_LOG_TRACE,
           "Core client receiver 2nd attach c=%p", (void *) client);

    if (!already_up) {
        client->receiver_up = true;
        // requests that expect a reply cannot be sent until this is known
        client->reply_to = qdr_field_copy(remote_source->address);
        qdrc_endpoint_flow_CT(client->core,
                              client->receiver,
                              client->rx_credit_window,
                              false);
        _state_updated_CT(client);
    }

    qdr_terminus_free(remote_source);
    qdr_terminus_free(remote_target);
}
// Endpoint callback: credit has been granted on the request link.
//
// The new credit is added to the client's balance and used right away to send
// queued requests.  If the client is active and a flow callback is set, the
// remaining balance is then reported to the user.  On a drain request any
// credit still left after that is discarded.
static void _sender_flow_CT(void *context,
                            int available_credit,
                            bool drain)
{
    qdrc_client_t *client = (qdrc_client_t *) context;
    qdr_core_t *core = client->core;
    const char *drain_flag = drain ? "T" : "F";

    client->tx_credit += available_credit;

    qd_log(core->log, QD_LOG_TRACE,
           "Core client sender flow granted c=%p credit=%d d=%s",
           (void *) client, client->tx_credit, drain_flag);

    if (client->tx_credit > 0) {
        _flush_send_queue_CT(client);
    }

    if (client->active && client->on_flow_cb) {
        client->on_flow_cb(core, client, client->user_context,
                           client->tx_credit, drain);
    }

    if (drain) {
        client->tx_credit = 0;
    }
}
// disposition update on sent request
// Endpoint callback: disposition update on a previously sent request.
//
// Updates without a disposition are only logged.  Otherwise the request that
// owns the delivery is looked up on the unsettled list, its ack callback is
// invoked, and it is taken off that list.  The request is released here
// unless it is still waiting for a reply and the disposition was PN_ACCEPTED.
static void _sender_update_CT(void *context,
                              qdr_delivery_t *delivery,
                              bool settled,
                              uint64_t disposition)
{
    qdrc_client_t *client = (qdrc_client_t *) context;

    qd_log(client->core->log, QD_LOG_TRACE,
           "Core client sender update c=%p dlv=%p d=%" PRIx64 " %s",
           (void *) client, (void *) delivery, disposition,
           settled ? "settled" : "unsettled");

    if (!disposition) {
        return;
    }

    // should be on unsettled list
    qdrc_client_request_t *req = DEQ_HEAD(client->unsettled_list);
    DEQ_FIND_N(UNSETTLED, req, (req->delivery == delivery));

    if (!req) {
        // may have received reply so this is not an error
        qd_log(client->core->log, QD_LOG_DEBUG,
               "Core client could not find request for disposition update"
               " client=%p delivery=%p",
               (void *) client, (void *) delivery);
        return;
    }

    assert(req->on_ack_cb);
    req->on_ack_cb(client->core,
                   client,
                   client->user_context,
                   req->req_context,
                   disposition);

    DEQ_REMOVE_N(UNSETTLED, client->unsettled_list, req);
    req->on_unsettled_list = false;

    // Drop the delivery reference now and clear the pointer so that
    // _free_request_CT() does not release it a second time.
    qdr_delivery_decref_CT(client->core, req->delivery, "core client send request");
    req->delivery = 0;

    const bool reply_pending = req->on_reply_list && disposition == PN_ACCEPTED;
    if (!reply_pending) {
        // no reply is coming, release the request
        _free_request_CT(client, req, NULL);
    }
}
// Endpoint callback: a message has arrived on the reply link.
//
// Nothing is done until the message is completely received.  Then the
// request is looked up by the message's correlation-id:
//   - found: the request is taken off the reply list, its reply callback
//     supplies the disposition, and the request is released;
//   - not found: the reply is dropped but still accepted;
//   - no correlation-id in the message: the delivery is rejected.
// In every case the delivery is settled and one credit is returned to the
// link.
static void _receiver_transfer_CT(void *client_context,
                                  qdr_delivery_t *delivery,
                                  qd_message_t *message)
{
    qdrc_client_t *client = (qdrc_client_t *) client_context;
    qdr_core_t *core = client->core;
    const bool complete = qd_message_receive_complete(message);

    qd_log(core->log, QD_LOG_TRACE,
           "Core client received msg c=%p complete=%s",
           (void *) client, complete ? "T" : "F");

    if (!complete) {
        return;
    }

    uint64_t disposition = PN_ACCEPTED;

    // lookup the corresponding request using the correlation-id
    qd_iterator_t *cid_iter = qd_message_field_iterator(message,
                                                        QD_FIELD_CORRELATION_ID);
    if (!cid_iter) {
        qd_log(core->log, QD_LOG_ERROR, "Invalid core client reply message: no correlation-id");
        disposition = PN_REJECTED;
    } else {
        qdrc_client_request_t *req = NULL;
        qd_hash_retrieve(client->correlations, cid_iter, (void **)&req);
        qd_iterator_free(cid_iter);

        if (!req) {
            // request may be old...
            qd_log(core->log, QD_LOG_WARNING,
                   "Core client reply message dropped: no matching correlation-id");
        } else {
            qd_log(core->log, QD_LOG_TRACE,
                   "Core client received msg c=%p rc=%p cid=%s",
                   (void *) client, req->req_context, req->correlation_id);

            // Remove the lookup entry and clear the handle so that
            // _free_request_CT() does not remove it again.
            qd_hash_remove_by_handle(client->correlations, req->hash_handle);
            qd_hash_handle_free(req->hash_handle);
            req->hash_handle = 0;

            assert(req->on_reply_list);
            DEQ_REMOVE_N(REPLY, client->reply_list, req);
            req->on_reply_list = false;

            qd_iterator_t *app_props = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES);
            qd_iterator_t *body = qd_message_field_iterator(message, QD_FIELD_BODY);

            // NOTE(review): neither iterator is freed here - presumably the
            // callback takes ownership; verify against the callback contract.
            assert(req->on_reply_cb);
            disposition = req->on_reply_cb(core,
                                           client,
                                           client->user_context,
                                           req->req_context,
                                           app_props,
                                           body);

            // The request is released even if the disposition update for the
            // sent message has not arrived yet.
            _free_request_CT(client, req, NULL);
        }
    }

    qdrc_endpoint_settle_CT(core, delivery, disposition);
    // replace the credit this message consumed
    qdrc_endpoint_flow_CT(core, client->receiver, 1, false);
}
// Endpoint callback: the request link has detached.
//
// If the link was up, send credit is zeroed, every queued and every unsettled
// request is released with error "link detached", and the client state is
// re-evaluated.  The error is freed and the sender endpoint pointer is
// cleared in all cases.
static void _sender_detached_CT(void *client_context,
                                qdr_error_t *error)
{
    qdrc_client_t *client = (qdrc_client_t *) client_context;

    qd_log(client->core->log, QD_LOG_TRACE,
           "Core client sender detached c=%p", (void *) client);

    if (client->sender_up) {
        qdrc_client_request_t *victim;

        client->sender_up = false;
        client->tx_credit = 0;

        // abort all pending and unsettled requests; each free unlinks the
        // request, so the list head advances on its own
        while ((victim = DEQ_HEAD(client->send_queue)) != NULL) {
            _free_request_CT(client, victim, "link detached");
        }
        while ((victim = DEQ_HEAD(client->unsettled_list)) != NULL) {
            _free_request_CT(client, victim, "link detached");
        }

        _state_updated_CT(client);
    }

    qdr_error_free(error);
    client->sender = NULL;
}
// Endpoint callback: the reply link has detached.
//
// If the link was up, the reply-to address is discarded, every request still
// waiting for a reply is released with error "link detached", and the client
// state is re-evaluated.  The error is freed and the receiver endpoint
// pointer is cleared in all cases.
static void _receiver_detached_CT(void *client_context,
                                  qdr_error_t *error)
{
    qdrc_client_t *client = (qdrc_client_t *) client_context;

    qd_log(client->core->log, QD_LOG_TRACE,
           "Core client receiver detached c=%p", (void *) client);

    if (client->receiver_up) {
        qdrc_client_request_t *victim;

        client->receiver_up = false;

        // without a reply link the address is no longer valid
        free(client->reply_to);
        client->reply_to = 0;

        // abort all waiting requests; each free unlinks the request, so the
        // list head advances on its own
        while ((victim = DEQ_HEAD(client->reply_list)) != NULL) {
            _free_request_CT(client, victim, "link detached");
        }

        _state_updated_CT(client);
    }

    qdr_error_free(error);
    client->receiver = NULL;
}
// Endpoint cleanup callback for the request link: handled exactly like a
// detach that carries no error.
static void _sender_cleanup_CT(void *client_context)
{
_sender_detached_CT(client_context, NULL);
}
// Endpoint cleanup callback for the reply link: handled exactly like a
// detach that carries no error.
static void _receiver_cleanup_CT(void *client_context)
{
_receiver_detached_CT(client_context, NULL);
}
// Build the outgoing message for a request.
//
// The message always carries a header section (durable = false).  When the
// request expects a reply, a fresh correlation-id is generated, registered in
// the client's correlation table, and written to a properties section
// together with the client's reply-to address.  The request's application
// properties (if any) and body follow.
//
// On return the request no longer references its app_properties or body.
// NOTE(review): presumably qd_message_compose() takes ownership of the fields
// it is given - confirm, otherwise they leak here.
static qd_message_t *_create_message_CT(qdrc_client_t *client,
                                        qdrc_client_request_t *req)
{
    const bool expects_reply = (req->on_reply_cb != NULL);

    // header section
    qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
    qd_compose_start_list(fld);
    qd_compose_insert_bool(fld, 0);     // durable
    qd_compose_end_list(fld);

    if (expects_reply) {
        // generate unique correlation-id: current time plus a per-client
        // counter
        snprintf(req->correlation_id, sizeof(req->correlation_id),
                 CORRELATION_ID_FMT,
                 (uint64_t) time(NULL), client->next_cid++);

        // register it so the reply can be matched back to this request
        req->correlation_key = qd_iterator_string(req->correlation_id,
                                                  ITER_VIEW_ALL);
        qd_hash_insert(client->correlations, req->correlation_key,
                       req, &req->hash_handle);

        // properties section
        fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld);
        qd_compose_start_list(fld);
        qd_compose_insert_null(fld);    // message-id
        qd_compose_insert_null(fld);    // user-id
        qd_compose_insert_null(fld);    // to
        qd_compose_insert_null(fld);    // subject
        assert(client->reply_to);
        qd_compose_insert_string(fld, client->reply_to);        // reply-to
        qd_compose_insert_string(fld, req->correlation_id);     // correlation-id
        qd_compose_end_list(fld);
    }

    qd_message_t *message = req->app_properties
        ? qd_message_compose(fld, req->app_properties, req->body, true)
        : qd_message_compose(fld, req->body, 0, true);

    // the content now belongs to the message; make sure
    // _free_request_CT() does not release it again
    req->app_properties = 0;
    req->body = 0;

    return message;
}
// a request has timed out
// Timer callback: the request's timeout has elapsed.
//
// core    - unused; the core is reached through the request's client
// context - the qdrc_client_request_t that was registered with the timer
//
// The request is released with error "Timed out".
static void _timer_expired(qdr_core_t *core, void *context)
{
    qdrc_client_request_t *expired = (qdrc_client_request_t *) context;

    _free_request_CT(expired->client, expired, "Timed out");
}