| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "core_client_api.h" |
| |
| #include "core_link_endpoint.h" |
| #include "delivery.h" |
| |
| #include <inttypes.h> |
| #include <stdio.h> |
| #include <time.h> |
| |
// Size in bytes of a request's correlation-id buffer.  CORRELATION_ID_FMT
// expands to at most 31 characters ("client-" + 16 hex digits + 8 hex
// digits), so 32 leaves room for the terminating NUL.
#define CORRELATION_ID_LEN 32

// printf-style template for correlation ids.  It consumes a uint64_t followed
// by a uint32_t, each printed as zero-padded lower-case hexadecimal.
const char *CORRELATION_ID_FMT = "client-%016" PRIx64 "%08" PRIx32;
| |
| typedef struct qdrc_client_request_t qdrc_client_request_t; |
| struct qdrc_client_request_t { |
| DEQ_LINKS_N(SEND_Q, qdrc_client_request_t); |
| DEQ_LINKS_N(UNSETTLED, qdrc_client_request_t); |
| DEQ_LINKS_N(REPLY, qdrc_client_request_t); |
| |
| qdrc_client_t *client; |
| void *req_context; |
| |
| char correlation_id[CORRELATION_ID_LEN]; |
| qd_iterator_t *correlation_key; |
| qd_hash_handle_t *hash_handle; |
| qdr_delivery_t *delivery; |
| qdr_core_timer_t *timer; |
| |
| qd_composed_field_t *app_properties; |
| qd_composed_field_t *body; |
| |
| bool on_send_queue; // to be sent |
| bool on_unsettled_list; // awaiting disposition |
| bool on_reply_list; // awaiting reply message |
| |
| qdrc_client_on_reply_CT_t on_reply_cb; |
| qdrc_client_on_ack_CT_t on_ack_cb; |
| qdrc_client_request_done_CT_t done_cb; |
| }; |
| DEQ_DECLARE(qdrc_client_request_t, qdrc_client_request_list_t); |
| ALLOC_DECLARE(qdrc_client_request_t); |
| ALLOC_DEFINE(qdrc_client_request_t); |
| |
| |
| struct qdrc_client_t { |
| qdr_core_t *core; |
| qd_hash_t *correlations; |
| |
| qdrc_endpoint_t *sender; // for outgoing management request messages |
| qdrc_endpoint_t *receiver; // for incoming management reply messages |
| bool sender_up; |
| bool receiver_up; |
| bool active; |
| char *reply_to; |
| |
| qdrc_client_request_list_t send_queue; |
| qdrc_client_request_list_t unsettled_list; |
| qdrc_client_request_list_t reply_list; // for expected reply |
| |
| uint32_t next_cid; // correlation id generation |
| uint32_t rx_credit_window; // initial credit grant |
| int tx_credit; |
| |
| void *user_context; |
| qdrc_client_on_state_CT_t on_state_cb; |
| qdrc_client_on_flow_CT_t on_flow_cb; |
| |
| }; |
| ALLOC_DECLARE(qdrc_client_t); |
| ALLOC_DEFINE(qdrc_client_t); |
| |
| |
| static void _send_request_CT(qdrc_client_t *client, |
| qdrc_client_request_t *req); |
| static void _flush_send_queue_CT(qdrc_client_t *client); |
| |
| static void _state_updated_CT(qdrc_client_t *client); |
| |
| static void _sender_second_attach_CT(void *client_context, |
| qdr_terminus_t *remote_source, |
| qdr_terminus_t *remote_target); |
| static void _receiver_second_attach_CT(void *client_context, |
| qdr_terminus_t *remote_source, |
| qdr_terminus_t *remote_target); |
| static void _sender_flow_CT(void *client_context, |
| int available_credit, |
| bool drain); |
| static void _sender_update_CT(void *client_context, |
| qdr_delivery_t *delivery, |
| bool settled, |
| uint64_t disposition); |
| static void _receiver_transfer_CT(void *client_context, |
| qdr_delivery_t *delivery, |
| qd_message_t *message); |
| static void _sender_detached_CT(void *client_context, |
| qdr_error_t *error); |
| static void _receiver_detached_CT(void *client_context, |
| qdr_error_t *error); |
| static void _sender_cleanup_CT(void *client_context); |
| static void _receiver_cleanup_CT(void *client_context); |
| |
| static void _free_request_CT(qdrc_client_t *client, |
| qdrc_client_request_t *req, |
| const char *error); |
| static qd_message_t *_create_message_CT(qdrc_client_t *client, |
| qdrc_client_request_t *req); |
| static void _timer_expired(qdr_core_t *core, void *context); |
| |
| |
| static qdrc_endpoint_desc_t sender_endpoint = { |
| .label = "core client - sender", |
| .on_second_attach = _sender_second_attach_CT, |
| .on_flow = _sender_flow_CT, |
| .on_update = _sender_update_CT, |
| .on_first_detach = _sender_detached_CT, |
| .on_cleanup = _sender_cleanup_CT |
| }; |
| |
| static qdrc_endpoint_desc_t receiver_endpoint = { |
| .label = "core client - receiver", |
| .on_second_attach = _receiver_second_attach_CT, |
| .on_transfer = _receiver_transfer_CT, |
| .on_first_detach = _receiver_detached_CT, |
| .on_cleanup = _receiver_cleanup_CT |
| }; |
| |
| qdrc_client_t *qdrc_client_CT(qdr_core_t *core, |
| qdr_connection_t *conn, |
| qdr_terminus_t *target, |
| uint32_t credit_window, |
| void *user_context, |
| qdrc_client_on_state_CT_t on_state_cb, |
| qdrc_client_on_flow_CT_t on_flow_cb) |
| { |
| qdrc_client_t *client = new_qdrc_client_t(); |
| if (!client) |
| return NULL; |
| |
| ZERO(client); |
| client->core = core; |
| client->correlations = qd_hash(6, 4, 0); |
| client->next_cid = rand(); |
| client->rx_credit_window = credit_window; |
| client->user_context = user_context; |
| client->on_state_cb = on_state_cb; |
| client->on_flow_cb = on_flow_cb; |
| |
| // create links |
| client->sender = qdrc_endpoint_create_link_CT(core, |
| conn, |
| QD_OUTGOING, |
| NULL, // source terminus |
| target, |
| &sender_endpoint, |
| client); |
| // create receiver link for replies from interior management |
| qdr_terminus_t *source = qdr_terminus(0); |
| source->dynamic = true; |
| client->receiver = qdrc_endpoint_create_link_CT(core, |
| conn, |
| QD_INCOMING, |
| source, |
| NULL, // target terminus |
| &receiver_endpoint, |
| client); |
| qd_log(core->log, QD_LOG_TRACE, // |
| "New core client created c=%p", (void *) client); |
| return client; |
| } |
| |
| |
| void qdrc_client_free_CT(qdrc_client_t *client) |
| { |
| if (!client) |
| return; |
| |
| if (client->sender) { |
| client->sender = NULL; |
| } |
| |
| if (client->receiver) { |
| client->receiver = NULL; |
| } |
| |
| qdrc_client_request_t *req = DEQ_HEAD(client->send_queue); |
| while (req) { |
| _free_request_CT(client, req, NULL); // removes from send_queue |
| req = DEQ_HEAD(client->send_queue); |
| } |
| |
| req = DEQ_HEAD(client->unsettled_list); |
| while (req) { |
| _free_request_CT(client, req, NULL); // removes from unsettled_list |
| req = DEQ_HEAD(client->unsettled_list); |
| } |
| |
| req = DEQ_HEAD(client->reply_list); |
| while (req) { |
| _free_request_CT(client, req, NULL); // removes from reply_list |
| req = DEQ_HEAD(client->reply_list); |
| } |
| |
| qd_hash_free(client->correlations); |
| free(client->reply_to); |
| |
| qd_log(client->core->log, QD_LOG_TRACE, // |
| "Core client freed c=%p", (void *) client); |
| |
| free_qdrc_client_t(client); |
| } |
| |
| |
| // send a message |
| int qdrc_client_request_CT(qdrc_client_t *client, |
| void *request_context, |
| qd_composed_field_t *app_properties, |
| qd_composed_field_t *body, |
| uint32_t timeout, |
| qdrc_client_on_reply_CT_t on_reply_cb, |
| qdrc_client_on_ack_CT_t on_ack_cb, |
| qdrc_client_request_done_CT_t done_cb) |
| { |
| qd_log(client->core->log, QD_LOG_TRACE, |
| "New core client request created c=%p, rc=%p", (void *) client, |
| request_context); |
| |
| qdrc_client_request_t *req = new_qdrc_client_request_t(); |
| ZERO(req); |
| req->client = client; |
| req->req_context = request_context; |
| req->app_properties = app_properties; |
| req->body = body; |
| req->on_reply_cb = on_reply_cb; |
| req->on_ack_cb = on_ack_cb; |
| req->done_cb = done_cb; |
| if (timeout) { |
| req->timer = qdr_core_timer_CT(client->core, _timer_expired, req); |
| qdr_core_timer_schedule_CT(client->core, req->timer, timeout); |
| } |
| |
| _send_request_CT(client, req); |
| return 0; |
| } |
| |
| |
| // attempt to send a new request message |
| static void _send_request_CT(qdrc_client_t *client, |
| qdrc_client_request_t *req) |
| { |
| DEQ_INSERT_TAIL_N(SEND_Q, client->send_queue, req); |
| req->on_send_queue = true; |
| _flush_send_queue_CT(client); |
| } |
| |
| |
| // send any pending messages on the send_queue |
| static void _flush_send_queue_CT(qdrc_client_t *client) |
| { |
| qdrc_client_request_t *req = DEQ_HEAD(client->send_queue); |
| |
| while (req && client->tx_credit > 0) { |
| bool presettled = (req->on_ack_cb == NULL); |
| |
| if (req->on_reply_cb && !client->reply_to) { |
| // cannot send until receiver comes up |
| break; |
| } |
| |
| qd_message_t *msg = _create_message_CT(client, req); |
| req->delivery = qdrc_endpoint_delivery_CT(client->core, |
| client->sender, |
| msg); |
| qdr_delivery_incref(req->delivery, "core client send request"); |
| qdrc_endpoint_send_CT(client->core, |
| client->sender, |
| req->delivery, |
| presettled); |
| DEQ_REMOVE_HEAD_N(SEND_Q, client->send_queue); |
| req->on_send_queue = false; |
| |
| qd_log(client->core->log, QD_LOG_TRACE, // |
| "Core client request sent c=%p, rc=%p dlv=%p cid=%s", // |
| (void *) client, req->req_context, (void *) req->delivery, // |
| *req->correlation_id ? req->correlation_id : "<none>"); |
| |
| if (!presettled && req->on_ack_cb) { |
| DEQ_INSERT_TAIL_N(UNSETTLED, client->unsettled_list, req); |
| req->on_unsettled_list = true; |
| } |
| if (req->on_reply_cb) { |
| DEQ_INSERT_TAIL_N(REPLY, client->reply_list, req); |
| req->on_reply_list = true; |
| } |
| if (!req->on_reply_list && !req->on_unsettled_list) { |
| // "Fire and forget" no need to keep the request any longer |
| _free_request_CT(client, req, NULL); |
| } |
| client->tx_credit -= 1; |
| req = DEQ_HEAD(client->send_queue); |
| } |
| } |
| |
| |
| static void _free_request_CT(qdrc_client_t *client, |
| qdrc_client_request_t *req, |
| const char *error) |
| { |
| if (req->timer) { |
| qdr_core_timer_free_CT(client->core, req->timer); |
| } |
| if (req->on_send_queue) |
| DEQ_REMOVE_N(SEND_Q, client->send_queue, req); |
| if (req->on_unsettled_list) |
| DEQ_REMOVE_N(UNSETTLED, client->unsettled_list, req); |
| if (req->on_reply_list) |
| DEQ_REMOVE_N(REPLY, client->reply_list, req); |
| |
| if (req->hash_handle) { |
| qd_hash_remove_by_handle(client->correlations, req->hash_handle); |
| qd_hash_handle_free(req->hash_handle); |
| } |
| |
| if (req->correlation_key) { |
| qd_iterator_free(req->correlation_key); |
| } |
| |
| if (req->body) { |
| qd_compose_free(req->body); |
| } |
| |
| if (req->app_properties) { |
| qd_compose_free(req->app_properties); |
| } |
| |
| if (req->delivery) { |
| qdr_delivery_decref_CT(client->core, req->delivery, "core client send request"); |
| } |
| |
| // notify user that the request has completed |
| if (req->done_cb) { |
| req->done_cb(client->core, |
| client, |
| client->user_context, |
| req->req_context, |
| error); |
| } |
| |
| qd_log(client->core->log, QD_LOG_TRACE, // |
| "Freeing core client request c=%p, rc=%p (%s)", // |
| (void *) client, req->req_context, // |
| error ? error : "request complete"); |
| |
| free_qdrc_client_request_t(req); |
| } |
| |
| |
| // issue state change callbacks if necessary |
| static void _state_updated_CT(qdrc_client_t *client) |
| { |
| if (client->on_state_cb) { |
| bool new_state = (client->sender_up && client->receiver_up); |
| if (new_state != client->active) { |
| client->active = new_state; |
| client->on_state_cb(client->core, client, client->user_context, new_state); |
| if (client->active && client->tx_credit > 0) |
| client->on_flow_cb(client->core, |
| client, |
| client->user_context, |
| client->tx_credit, |
| false); |
| } |
| } |
| } |
| |
| |
| static void _sender_second_attach_CT(void *context, |
| qdr_terminus_t *remote_source, |
| qdr_terminus_t *remote_target) |
| { |
| qdrc_client_t *client = (qdrc_client_t *)context; |
| |
| qd_log(client->core->log, QD_LOG_TRACE, // |
| "Core client sender 2nd attach c=%p", (void *) client); |
| |
| if (!client->sender_up) { |
| client->sender_up = true; |
| _state_updated_CT(client); |
| } |
| qdr_terminus_free(remote_source); |
| qdr_terminus_free(remote_target); |
| } |
| |
| |
| static void _receiver_second_attach_CT(void *context, |
| qdr_terminus_t *remote_source, |
| qdr_terminus_t *remote_target) |
| { |
| qdrc_client_t *client = (qdrc_client_t *)context; |
| |
| qd_log(client->core->log, QD_LOG_TRACE, // |
| "Core client receiver 2nd attach c=%p", (void *) client); |
| |
| if (!client->receiver_up) { |
| client->receiver_up = true; |
| client->reply_to = qdr_field_copy(remote_source->address); |
| qdrc_endpoint_flow_CT(client->core, client->receiver, client->rx_credit_window, false); |
| _state_updated_CT(client); |
| } |
| qdr_terminus_free(remote_source); |
| qdr_terminus_free(remote_target); |
| } |
| |
| |
| static void _sender_flow_CT(void *context, |
| int available_credit, |
| bool drain) |
| { |
| qdrc_client_t *client = (qdrc_client_t *)context; |
| qdr_core_t *core = client->core; |
| |
| client->tx_credit += available_credit; |
| qd_log(core->log, QD_LOG_TRACE, // |
| "Core client sender flow granted c=%p credit=%d d=%s", // |
| (void *) client, client->tx_credit, (drain) ? "T" : "F"); |
| if (client->tx_credit > 0) { |
| _flush_send_queue_CT(client); |
| } |
| |
| if (client->active && client->on_flow_cb) |
| client->on_flow_cb(core, |
| client, |
| client->user_context, |
| client->tx_credit, |
| drain); |
| if (drain) { |
| client->tx_credit = 0; |
| } |
| } |
| |
| |
| // disposition update on sent request |
| static void _sender_update_CT(void *context, |
| qdr_delivery_t *delivery, |
| bool settled, |
| uint64_t disposition) |
| { |
| qdrc_client_t *client = (qdrc_client_t *)context; |
| |
| qd_log(client->core->log, QD_LOG_TRACE, // |
| "Core client sender update c=%p dlv=%p d=%" PRIx64 " %s", // |
| (void *) client, (void *) delivery, disposition, // |
| settled ? "settled" : "unsettled"); |
| |
| if (disposition) { |
| // should be on unsettled list |
| qdrc_client_request_t *req = DEQ_HEAD(client->unsettled_list); |
| DEQ_FIND_N(UNSETTLED, req, (req->delivery == delivery)); |
| if (req) { |
| assert(req->on_ack_cb); |
| req->on_ack_cb(client->core, |
| client, |
| client->user_context, |
| req->req_context, |
| disposition); |
| // remove from unsettled list |
| DEQ_REMOVE_N(UNSETTLED, client->unsettled_list, req); |
| req->on_unsettled_list = false; |
| |
| // delivery no longer needed |
| qdr_delivery_decref_CT(client->core, req->delivery, "core client send request"); |
| req->delivery = 0; |
| |
| if (!req->on_reply_list || disposition != PN_ACCEPTED) { |
| // no reply is coming, release the request |
| _free_request_CT(client, req, NULL); |
| } |
| } else { |
| // may have received reply so this is not an error |
| qd_log(client->core->log, QD_LOG_DEBUG, |
| "Core client could not find request for disposition update" |
| " client=%p delivery=%p", |
| (void *) client, (void *) delivery); |
| } |
| } |
| } |
| |
| |
| static void _receiver_transfer_CT(void *client_context, |
| qdr_delivery_t *delivery, |
| qd_message_t *message) |
| { |
| qdrc_client_t *client = (qdrc_client_t *)client_context; |
| qdr_core_t *core = client->core; |
| bool complete = qd_message_receive_complete(message); |
| |
| qd_log(core->log, QD_LOG_TRACE, |
| "Core client received msg c=%p complete=%s", // |
| (void *) client, complete ? "T" : "F"); |
| |
| if (complete) { |
| uint64_t disposition = PN_ACCEPTED; |
| |
| // lookup the corresponding request using the correlation-id |
| |
| qd_iterator_t *cid_iter = qd_message_field_iterator(message, |
| QD_FIELD_CORRELATION_ID); |
| if (cid_iter) { |
| qdrc_client_request_t *req = NULL; |
| qd_hash_retrieve(client->correlations, cid_iter, (void **)&req); |
| qd_iterator_free(cid_iter); |
| if (req) { |
| qd_log(core->log, QD_LOG_TRACE, |
| "Core client received msg c=%p rc=%p cid=%s", // |
| (void *) client, req->req_context, req->correlation_id); |
| |
| qd_hash_remove_by_handle(client->correlations, req->hash_handle); |
| qd_hash_handle_free(req->hash_handle); |
| req->hash_handle = 0; |
| |
| assert(req->on_reply_list); |
| DEQ_REMOVE_N(REPLY, client->reply_list, req); |
| req->on_reply_list = false; |
| |
| qd_iterator_t *app_props = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES); |
| qd_iterator_t *body = qd_message_field_iterator(message, QD_FIELD_BODY); |
| |
| assert(req->on_reply_cb); |
| disposition = req->on_reply_cb(core, |
| client, |
| client->user_context, |
| req->req_context, |
| app_props, |
| body); |
| // should we keep req if still waiting for disposition update |
| // on sent message? I say "no"... |
| _free_request_CT(client, req, NULL); |
| } else { |
| // request may be old... |
| qd_log(core->log, QD_LOG_WARNING, |
| "Core client reply message dropped: no matching correlation-id"); |
| disposition = PN_ACCEPTED; |
| } |
| } else { |
| qd_log(core->log, QD_LOG_ERROR, "Invalid core client reply message: no correlation-id"); |
| disposition = PN_REJECTED; |
| } |
| |
| qdrc_endpoint_settle_CT(core, delivery, disposition); |
| qdrc_endpoint_flow_CT(core, |
| client->receiver, |
| 1, |
| false); |
| } |
| } |
| |
| |
| static void _sender_detached_CT(void *client_context, |
| qdr_error_t *error) |
| { |
| qdrc_client_t *client = (qdrc_client_t *)client_context; |
| |
| qd_log(client->core->log, QD_LOG_TRACE, // |
| "Core client sender detached c=%p", (void *) client); |
| |
| if (client->sender_up) { |
| client->sender_up = false; |
| client->tx_credit = 0; |
| |
| // abort all pending and unsettled requests |
| // |
| qdrc_client_request_t *req = DEQ_HEAD(client->send_queue); |
| while (req) { |
| _free_request_CT(client, req, "link detached"); // removes from send_queue |
| req = DEQ_HEAD(client->send_queue); |
| } |
| req = DEQ_HEAD(client->unsettled_list); |
| while (req) { |
| _free_request_CT(client, req, "link detached"); // removes from unsettled_list |
| req = DEQ_HEAD(client->unsettled_list); |
| } |
| |
| _state_updated_CT(client); |
| } |
| |
| qdr_error_free(error); |
| client->sender = NULL; |
| } |
| |
| |
| static void _receiver_detached_CT(void *client_context, |
| qdr_error_t *error) |
| { |
| qdrc_client_t *client = (qdrc_client_t *)client_context; |
| |
| qd_log(client->core->log, QD_LOG_TRACE, // |
| "Core client receiver detached c=%p", (void *) client); |
| |
| if (client->receiver_up) { |
| client->receiver_up = false; |
| free(client->reply_to); |
| client->reply_to = 0; |
| |
| // abort all waiting requests |
| // |
| qdrc_client_request_t *req = DEQ_HEAD(client->reply_list); |
| while (req) { |
| _free_request_CT(client, req, "link detached"); // removes from reply list |
| req = DEQ_HEAD(client->reply_list); |
| } |
| |
| _state_updated_CT(client); |
| } |
| qdr_error_free(error); |
| client->receiver = NULL; |
| } |
| |
| |
| static void _sender_cleanup_CT(void *client_context) |
| { |
| _sender_detached_CT(client_context, NULL); |
| } |
| |
| |
| static void _receiver_cleanup_CT(void *client_context) |
| { |
| _receiver_detached_CT(client_context, NULL); |
| } |
| |
| |
| static qd_message_t *_create_message_CT(qdrc_client_t *client, |
| qdrc_client_request_t *req) |
| { |
| // build necessary message headers, etc: |
| qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0); |
| qd_compose_start_list(fld); |
| qd_compose_insert_bool(fld, 0); // durable |
| qd_compose_end_list(fld); |
| |
| if (req->on_reply_cb) { |
| // generate unique correlation-id |
| snprintf(req->correlation_id, |
| CORRELATION_ID_LEN, CORRELATION_ID_FMT, |
| (uint64_t)time(NULL), client->next_cid++); |
| req->correlation_key = qd_iterator_string(req->correlation_id, |
| ITER_VIEW_ALL); |
| qd_hash_insert(client->correlations, |
| req->correlation_key, |
| req, |
| &req->hash_handle); |
| |
| fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld); |
| qd_compose_start_list(fld); |
| qd_compose_insert_null(fld); // message-id |
| qd_compose_insert_null(fld); // user-id |
| qd_compose_insert_null(fld); // to |
| qd_compose_insert_null(fld); // subject |
| assert(client->reply_to); |
| qd_compose_insert_string(fld, client->reply_to); |
| qd_compose_insert_string(fld, req->correlation_id); |
| qd_compose_end_list(fld); |
| } |
| |
| qd_message_t *message = 0; |
| if (req->app_properties) { |
| message = qd_message_compose(fld, req->app_properties, req->body, true); |
| } else { |
| message = qd_message_compose(fld, req->body, 0, true); |
| } |
| req->body = 0; |
| req->app_properties = 0; |
| |
| return message; |
| } |
| |
| |
| // a request has timed out |
| static void _timer_expired(qdr_core_t *core, void *context) |
| { |
| qdrc_client_request_t *req = (qdrc_client_request_t *)context; |
| qdrc_client_t *client = req->client; |
| _free_request_CT(client, req, "Timed out"); |
| } |