| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "delivery.h" |
| #include <inttypes.h> |
| |
| ALLOC_DEFINE(qdr_delivery_t); |
| |
| |
| static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard); |
| static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard); |
| static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard); |
| static void qdr_delivery_free_extension_state(qdr_delivery_t *delivery); |
| static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery); |
| static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv, |
| qdr_delivery_t *peer, uint64_t new_disp, bool settled, |
| qdr_error_t *error); |
| |
| |
| void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context) |
| { |
| delivery->context = context; |
| } |
| |
| |
| void *qdr_delivery_get_context(const qdr_delivery_t *delivery) |
| { |
| return delivery->context; |
| } |
| |
| qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery) |
| { |
| return delivery ? safe_deref_qdr_link_t(delivery->link_sp) : 0; |
| } |
| |
| |
| bool qdr_delivery_send_complete(const qdr_delivery_t *delivery) |
| { |
| if (!delivery) |
| return false; |
| return qd_message_send_complete(delivery->msg); |
| } |
| |
| |
| bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery) |
| { |
| if (!delivery) |
| return false; |
| return qd_message_tag_sent(delivery->msg); |
| } |
| |
| |
| void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent) |
| { |
| if (!delivery) |
| return; |
| |
| qd_message_set_tag_sent(delivery->msg, tag_sent); |
| } |
| |
| |
| bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery) |
| { |
| if (!delivery) |
| return false; |
| return qd_message_receive_complete(delivery->msg); |
| } |
| |
| |
| // set the local disposition (to be send to remote endpoint) |
| void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition) |
| { |
| if (delivery) |
| delivery->disposition = disposition; |
| } |
| |
| |
| // get the current local disposition |
| uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery) |
| { |
| if (!delivery) |
| return 0; |
| return delivery->disposition; |
| } |
| |
| |
| void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label) |
| { |
| uint32_t rc = sys_atomic_inc(&delivery->ref_count); |
| assert(rc > 0 || !delivery->ref_counted); |
| delivery->ref_counted = true; |
| qdr_link_t *link = qdr_delivery_link(delivery); |
| if (link) |
| qd_log(link->core->log, QD_LOG_DEBUG, "Delivery incref: dlv:%lx rc:%"PRIu32" link:%"PRIu64" %s", (long) delivery, rc + 1, link->identity, label); |
| } |
| |
| |
| void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted) |
| { |
| assert(delivery); |
| qd_message_set_aborted(delivery->msg, aborted); |
| } |
| |
| bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery) |
| { |
| if (!delivery) |
| return false; |
| return qd_message_aborted(delivery->msg); |
| } |
| |
| |
| void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label) |
| { |
| // Grab the link identity if the link is still active |
| qdr_link_t *link = qdr_delivery_link(delivery); |
| uint64_t link_identity = link ? link->identity : 0; |
| |
| uint32_t ref_count = sys_atomic_dec(&delivery->ref_count); |
| assert(ref_count > 0); |
| |
| qd_log(core->log, QD_LOG_DEBUG, "Delivery decref: dlv:%lx rc:%"PRIu32" link:%"PRIu64" %s", (long) delivery, ref_count - 1, link_identity, label); |
| |
| if (ref_count == 1) { |
| // The ref_count was 1 and now it is zero. We are deleting the last ref. |
| // The delivery deletion must occur inside the core thread. |
| // Queue up an action to do the work. |
| // |
| qdr_action_t *action = qdr_action(qdr_delete_delivery_CT, "delete_delivery"); |
| action->args.delivery.delivery = delivery; |
| action->label = label; |
| qdr_action_enqueue(core, action); |
| } |
| } |
| |
| |
| void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length) |
| { |
| *tag = (const char*) delivery->tag; |
| *length = delivery->tag_length; |
| } |
| |
| |
| qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery) |
| { |
| if (!delivery) |
| return 0; |
| return delivery->msg; |
| } |
| |
| |
| qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery) |
| { |
| return delivery->error; |
| } |
| |
| |
| bool qdr_delivery_presettled(const qdr_delivery_t *delivery) |
| { |
| return delivery->presettled; |
| } |
| |
| |
| // remote endpoint modified its disposition and/or settlement |
| void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition, |
| bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given) |
| { |
| qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery"); |
| action->args.delivery.delivery = delivery; |
| action->args.delivery.disposition = disposition; |
| action->args.delivery.settled = settled; |
| action->args.delivery.error = error; |
| |
| // handle delivery-state extensions e.g. declared, transactional-state |
| qdr_delivery_set_extension_state(delivery, disposition, ext_state, false); |
| |
| // |
| // The delivery's ref_count must be incremented to protect its travels into the |
| // core thread. If the caller has given its reference to us, we can simply use |
| // the given ref rather than increment a new one. |
| // |
| if (!ref_given) |
| qdr_delivery_incref(delivery, "qdr_delivery_update_disposition - add to action list"); |
| |
| qdr_action_enqueue(core, action); |
| } |
| |
| |
| qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bool settled) |
| { |
| |
| qdr_action_t *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue"); |
| action->args.connection.delivery = in_dlv; |
| |
| qd_message_t *msg = qdr_delivery_message(in_dlv); |
| action->args.connection.more = !qd_message_receive_complete(msg); |
| action->args.delivery.presettled = settled; |
| |
| // This incref is for the action reference |
| qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list"); |
| qdr_action_enqueue(core, action); |
| return in_dlv; |
| } |
| |
| |
| void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv) |
| { |
| bool push = false; |
| bool moved = false; |
| |
| if (dlv->presettled) { |
| // |
| // The delivery is presettled. We simply want to call CORE_delivery_update which in turn will |
| // restart stalled links if the q2_holdoff has been hit. |
| // For single frame presettled deliveries, calling CORE_delivery_update does not do anything. |
| // |
| push = true; |
| } |
| else { |
| push = dlv->disposition != PN_RELEASED; |
| dlv->disposition = PN_RELEASED; |
| dlv->settled = true; |
| moved = qdr_delivery_settled_CT(core, dlv); |
| |
| } |
| |
| if (push || moved) |
| qdr_delivery_push_CT(core, dlv); |
| |
| // |
| // Remove the unsettled reference |
| // |
| if (moved) |
| qdr_delivery_decref_CT(core, dlv, "qdr_delivery_release_CT - remove from unsettled list"); |
| } |
| |
| |
| void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *dlv) |
| { |
| bool push = dlv->disposition != PN_MODIFIED; |
| |
| dlv->disposition = PN_MODIFIED; |
| dlv->settled = true; |
| bool moved = qdr_delivery_settled_CT(core, dlv); |
| |
| if (push || moved) |
| qdr_delivery_push_CT(core, dlv); |
| |
| // |
| // Remove the unsettled reference |
| // |
| if (moved) |
| qdr_delivery_decref_CT(core, dlv, "qdr_delivery_failed_CT - remove from unsettled list"); |
| } |
| |
| |
| void qdr_delivery_reject_CT(qdr_core_t *core, qdr_delivery_t *dlv) |
| { |
| bool push = dlv->disposition != PN_REJECTED; |
| |
| dlv->disposition = PN_REJECTED; |
| dlv->settled = true; |
| bool moved = qdr_delivery_settled_CT(core, dlv); |
| |
| if (push || moved) |
| qdr_delivery_push_CT(core, dlv); |
| |
| // |
| // Remove the unsettled reference |
| // |
| if (moved) |
| qdr_delivery_decref_CT(core, dlv, "qdr_delivery_reject_CT - remove from unsettled list"); |
| } |
| |
| |
| bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) |
| { |
| // |
| // Remove a delivery from its unsettled list. Side effects include issuing |
| // replacement credit and visiting the link-quiescence algorithm |
| // |
| qdr_link_t *link = qdr_delivery_link(dlv); |
| qdr_connection_t *conn = link ? link->conn : 0; |
| bool moved = false; |
| |
| if (!link || !conn) |
| return false; |
| |
| // |
| // The lock needs to be acquired only for outgoing links |
| // |
| if (link->link_direction == QD_OUTGOING) |
| sys_mutex_lock(conn->work_lock); |
| |
| if (dlv->where == QDR_DELIVERY_IN_UNSETTLED) { |
| DEQ_REMOVE(link->unsettled, dlv); |
| dlv->where = QDR_DELIVERY_NOWHERE; |
| moved = true; |
| } |
| |
| if (link->link_direction == QD_OUTGOING) |
| sys_mutex_unlock(conn->work_lock); |
| |
| if (dlv->tracking_addr) { |
| dlv->tracking_addr->outstanding_deliveries[dlv->tracking_addr_bit]--; |
| dlv->tracking_addr->tracked_deliveries--; |
| |
| if (dlv->tracking_addr->tracked_deliveries == 0) |
| qdr_check_addr_CT(core, dlv->tracking_addr); |
| |
| dlv->tracking_addr = 0; |
| } |
| |
| // |
| // If this is an incoming link and it is not link-routed or inter-router, issue |
| // one replacement credit on the link. Note that credit on inter-router links is |
| // issued immediately even for unsettled deliveries. |
| // |
| if (moved && link->link_direction == QD_INCOMING && |
| link->link_type != QD_LINK_ROUTER && !link->edge && !link->connected_link) |
| qdr_link_issue_credit_CT(core, link, 1, false); |
| |
| return moved; |
| } |
| |
| void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery) |
| { |
| qdr_link_t *link = qdr_delivery_link(delivery); |
| if (link) { |
| bool do_rate = false; |
| |
| // router sets the disposition for incoming links, outgoing is set by |
| // the remote |
| const uint64_t outcome = (link->link_direction == QD_INCOMING) |
| ? delivery->disposition // local |
| : delivery->remote_disposition; |
| |
| if (delivery->presettled) { |
| do_rate = outcome != PN_RELEASED; |
| link->presettled_deliveries++; |
| if (link->link_direction == QD_INCOMING && link->link_type == QD_LINK_ENDPOINT) |
| core->presettled_deliveries++; |
| } |
| else if (outcome == PN_ACCEPTED) { |
| do_rate = true; |
| link->accepted_deliveries++; |
| if (link->link_direction == QD_INCOMING) |
| core->accepted_deliveries++; |
| } |
| else if (outcome == PN_REJECTED) { |
| do_rate = true; |
| link->rejected_deliveries++; |
| if (link->link_direction == QD_INCOMING) |
| core->rejected_deliveries++; |
| } |
| else if (outcome == PN_RELEASED && !delivery->presettled) { |
| link->released_deliveries++; |
| if (link->link_direction == QD_INCOMING) |
| core->released_deliveries++; |
| } |
| else if (outcome == PN_MODIFIED) { |
| link->modified_deliveries++; |
| if (link->link_direction == QD_INCOMING) |
| core->modified_deliveries++; |
| } |
| |
| qd_log(core->log, QD_LOG_DEBUG, "Delivery outcome for%s: dlv:%lx link:%"PRIu64" is %s", delivery->presettled?" pre-settled":"", (long) delivery, link->identity, pn_disposition_type_name(outcome)); |
| |
| uint32_t delay = core->uptime_ticks - delivery->ingress_time; |
| if (delay > 10) { |
| link->deliveries_delayed_10sec++; |
| if (link->link_direction == QD_INCOMING) |
| core->deliveries_delayed_10sec++; |
| } else if (delay > 1) { |
| link->deliveries_delayed_1sec++; |
| if (link->link_direction == QD_INCOMING) |
| core->deliveries_delayed_1sec++; |
| } |
| |
| // |
| // If this delivery was marked as stuck, decrement the currently-stuck counters in the link and router. |
| // |
| if (delivery->stuck) { |
| link->deliveries_stuck--; |
| core->deliveries_stuck--; |
| } |
| |
| if (qd_bitmask_valid_bit_value(delivery->ingress_index) && link->ingress_histogram) |
| link->ingress_histogram[delivery->ingress_index]++; |
| |
| // |
| // Compute the settlement rate |
| // |
| if (do_rate) { |
| uint32_t delta_time = core->uptime_ticks - link->core_ticks; |
| if (delta_time > 0) { |
| if (delta_time > QDR_LINK_RATE_DEPTH) |
| delta_time = QDR_LINK_RATE_DEPTH; |
| for (uint8_t delta_slots = 0; delta_slots < delta_time; delta_slots++) { |
| link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH; |
| link->settled_deliveries[link->rate_cursor] = 0; |
| } |
| link->core_ticks = core->uptime_ticks; |
| } |
| link->settled_deliveries[link->rate_cursor]++; |
| } |
| } |
| } |
| |
| |
| static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery) |
| { |
| assert(sys_atomic_get(&delivery->ref_count) == 0); |
| |
| if (delivery->msg || delivery->to_addr) { |
| qdr_delivery_cleanup_t *cleanup = new_qdr_delivery_cleanup_t(); |
| |
| DEQ_ITEM_INIT(cleanup); |
| cleanup->msg = delivery->msg; |
| cleanup->iter = delivery->to_addr; |
| |
| DEQ_INSERT_TAIL(core->delivery_cleanup_list, cleanup); |
| } |
| |
| if (delivery->tracking_addr) { |
| delivery->tracking_addr->outstanding_deliveries[delivery->tracking_addr_bit]--; |
| delivery->tracking_addr->tracked_deliveries--; |
| |
| if (delivery->tracking_addr->tracked_deliveries == 0) |
| qdr_check_addr_CT(core, delivery->tracking_addr); |
| |
| delivery->tracking_addr = 0; |
| } |
| |
| qdr_delivery_increment_counters_CT(core, delivery); |
| |
| // |
| // Remove any subscription references |
| // |
| qdr_subscription_ref_t *sub = DEQ_HEAD(delivery->subscriptions); |
| while (sub) { |
| qdr_del_subscription_ref_CT(&delivery->subscriptions, sub); |
| sub = DEQ_HEAD(delivery->subscriptions); |
| } |
| |
| // |
| // Free all the peer qdr_delivery_ref_t references |
| // |
| qdr_delivery_ref_t *ref = DEQ_HEAD(delivery->peers); |
| while (ref) { |
| qdr_del_delivery_ref(&delivery->peers, ref); |
| ref = DEQ_HEAD(delivery->peers); |
| } |
| |
| qd_bitmask_free(delivery->link_exclusion); |
| qdr_error_free(delivery->error); |
| qdr_delivery_free_extension_state(delivery); |
| |
| free_qdr_delivery_t(delivery); |
| } |
| |
| |
| // Returns the number of peers for dlv |
| int qdr_delivery_peer_count_CT(const qdr_delivery_t *dlv) |
| { |
| return (dlv->peer) ? 1 : DEQ_SIZE(dlv->peers); |
| } |
| |
| |
| void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv) |
| { |
| // If there is no delivery or a peer, we cannot link each other. |
| if (!in_dlv || !out_dlv) |
| return; |
| |
| if (qdr_delivery_peer_count_CT(in_dlv) == 0) { |
| // This is the very first peer. Link them up. |
| assert(!out_dlv->peer); |
| in_dlv->peer = out_dlv; |
| } |
| else { |
| if (in_dlv->peer) { |
| // This is the first time we know that in_dlv is going to have more than one peer. |
| // There is already a peer in the in_dlv->peer pointer, move it into a list and zero it out. |
| qdr_add_delivery_ref_CT(&in_dlv->peers, in_dlv->peer); |
| |
| // Zero out the peer pointer. Since there is more than one peer, this peer has been moved to the "peers" linked list. |
| // All peers will now reside in the peers linked list. No need to decref/incref here because you are transferring ownership. |
| in_dlv->peer = 0; |
| } |
| |
| qdr_add_delivery_ref_CT(&in_dlv->peers, out_dlv); |
| } |
| |
| out_dlv->peer = in_dlv; |
| |
| qdr_delivery_incref(out_dlv, "qdr_delivery_link_peers_CT - linked to peer (out delivery)"); |
| qdr_delivery_incref(in_dlv, "qdr_delivery_link_peers_CT - linked to peer (in delivery)"); |
| } |
| |
| |
| void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer) |
| { |
| // If there is no delivery or a peer, we cannot proceed. |
| if (!dlv || !peer) |
| return; |
| |
| // first, drop dlv's reference to its peer |
| // |
| if (dlv->peer) { |
| // |
| // This is the easy case. One delivery has only one peer. we can simply |
| // zero them out and directly decref. |
| // |
| assert(dlv->peer == peer); |
| dlv->peer = 0; |
| } else { |
| // |
| // This is the not so easy case |
| // |
| // dlv has more than one peer, so we have to search for our target peer |
| // in the list of peers |
| // |
| qdr_delivery_ref_t *peer_ref = DEQ_HEAD(dlv->peers); |
| while (peer_ref && peer_ref->dlv != peer) { |
| peer_ref = DEQ_NEXT(peer_ref); |
| } |
| assert(peer_ref != 0); |
| if (peer_ref == dlv->next_peer_ref) |
| // ok to unlink peers while iterating over them |
| dlv->next_peer_ref = DEQ_NEXT(peer_ref); |
| qdr_del_delivery_ref(&dlv->peers, peer_ref); |
| } |
| |
| // now drop the peer's reference to dlv |
| // |
| if (peer->peer) { |
| assert(peer->peer == dlv); |
| peer->peer = 0; |
| } else { |
| qdr_delivery_ref_t *peer_ref = DEQ_HEAD(peer->peers); |
| while (peer_ref && peer_ref->dlv != dlv) { |
| peer_ref = DEQ_NEXT(peer_ref); |
| } |
| assert(peer_ref != 0); |
| if (peer_ref == peer->next_peer_ref) |
| // ok to unlink peers while iterating over them |
| peer->next_peer_ref = DEQ_NEXT(peer_ref); |
| qdr_del_delivery_ref(&peer->peers, peer_ref); |
| } |
| |
| qdr_delivery_decref_CT(core, dlv, "qdr_delivery_unlink_peers_CT - unlinked from peer (delivery)"); |
| qdr_delivery_decref_CT(core, peer, "qdr_delivery_unlink_peers_CT - unlinked from delivery (peer)"); |
| } |
| |
| |
| qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv) |
| { |
| // What if there are no peers for this delivery? |
| if (qdr_delivery_peer_count_CT(dlv) == 0) |
| return 0; |
| |
| if (dlv->peer) { |
| // If there is a dlv->peer, it is the one and only peer. |
| return dlv->peer; |
| } |
| else { |
| // The delivery has more than one peer. |
| qdr_delivery_ref_t *peer_ref = DEQ_HEAD(dlv->peers); |
| |
| // Save the next peer to dlv->next_peer_ref so we can use it when somebody calls qdr_delivery_next_peer_CT |
| dlv->next_peer_ref = DEQ_NEXT(peer_ref); |
| |
| // Return the first peer. |
| return peer_ref->dlv; |
| } |
| } |
| |
| qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv) |
| { |
| if (dlv->peer) { |
| // There is no next_peer if there is only one peer. If there is a non-zero dlv->peer, it is the only peer |
| return 0; |
| } |
| else { |
| // There is more than one peer to this delivery. |
| qdr_delivery_ref_t *next_peer_ref = dlv->next_peer_ref; |
| if (next_peer_ref) { |
| // Save the next peer to dlv->next_peer_ref so we can use it when somebody calls qdr_delivery_next_peer_CT |
| dlv->next_peer_ref = DEQ_NEXT(dlv->next_peer_ref); |
| return next_peer_ref->dlv; |
| } |
| return 0; |
| } |
| } |
| |
| |
| void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv, const char *label) |
| { |
| // Grab the link identity if the link is still active |
| qdr_link_t *link = qdr_delivery_link(dlv); |
| uint64_t link_identity = link ? link->identity : 0; |
| |
| uint32_t ref_count = sys_atomic_dec(&dlv->ref_count); |
| assert(ref_count > 0); |
| |
| qd_log(core->log, QD_LOG_DEBUG, "Delivery decref_CT: dlv:%lx rc:%"PRIu32" link:%"PRIu64" %s", (long) dlv, ref_count - 1, link_identity, label); |
| |
| if (ref_count == 1) |
| qdr_delete_delivery_internal_CT(core, dlv); |
| } |
| |
| |
| // the remote endpoint has change the state (disposition) or settlement for the |
| // delivery. Update the local state/settlement accordingly. |
| // |
| static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) |
| { |
| if (discard) |
| return; |
| |
| qdr_delivery_t *dlv = action->args.delivery.delivery; |
| qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv); |
| uint64_t new_disp = action->args.delivery.disposition; |
| bool settled = action->args.delivery.settled; |
| qdr_error_t *error = action->args.delivery.error; |
| bool free_error = true; |
| |
| if (dlv->multicast) { |
| // |
| // remote state change for *inbound* multicast delivery, |
| // update downstream *outbound* peers |
| // |
| qdr_delivery_mcast_inbound_update_CT(core, dlv, new_disp, settled); |
| |
| } else if (peer && peer->multicast) { |
| // |
| // remote state change for an *outbound* delivery to a multicast address, |
| // propagate upstream to *inbound* delivery (peer) |
| // |
| // coverity[swapped_arguments] |
| qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, new_disp, settled); |
| |
| } else { |
| // |
| // Anycast forwarding - note: peer _may_ be freed by this call |
| // |
| free_error = !qdr_delivery_anycast_update_CT(core, dlv, peer, new_disp, settled, error); |
| } |
| |
| // |
| // Release the action reference, possibly freeing the delivery |
| // |
| qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action"); |
| |
| if (free_error) |
| qdr_error_free(error); |
| } |
| |
| |
| // The remote delivery state (disposition) and/or remote settlement for an |
| // anycast delivery has changed. Propagate the changes to its peer delivery. |
| // |
| // returns true if ownership of error parameter is taken (do not free it) |
| // |
| static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv, |
| qdr_delivery_t *peer, uint64_t new_disp, bool settled, |
| qdr_error_t *error) |
| { |
| bool push = false; |
| bool peer_moved = false; |
| bool dlv_moved = false; |
| bool error_assigned = false; |
| qdr_link_t *dlink = qdr_delivery_link(dlv); |
| |
| assert(!dlv->multicast); |
| assert(!peer || !peer->multicast); |
| |
| if (peer) |
| qdr_delivery_incref(peer, "qdr_delivery_anycast_update_CT - prevent peer from being freed"); |
| |
| // |
| // Non-multicast Logic: |
| // |
| // If disposition has changed and there is a peer link, set the disposition |
| // of the peer |
| // If remote settled, the delivery must be unlinked and freed. |
| // If remote settled and there is a peer, the peer shall be settled and |
| // unlinked. It shall not be freed until the connection-side thread |
| // settles the PN delivery. |
| // |
| if (new_disp != dlv->remote_disposition) { |
| // |
| // Remote disposition has changed, propagate the change to the peer |
| // delivery local disposition. |
| // |
| dlv->remote_disposition = new_disp; |
| if (peer) { |
| peer->disposition = new_disp; |
| peer->error = error; |
| push = true; |
| error_assigned = true; |
| qdr_delivery_set_extension_state(peer, |
| dlv->remote_disposition, |
| dlv->extension_state, |
| false); |
| } |
| } |
| |
| if (settled) { |
| if (peer) { |
| peer->settled = true; |
| if (qdr_delivery_link(peer)) { |
| peer_moved = qdr_delivery_settled_CT(core, peer); |
| } |
| |
| // expect: caller holds reference to dlv, so unlink will not free it |
| // expect: peer refcount held locally, so unlink will not free it |
| assert(sys_atomic_get(&dlv->ref_count) > 1); |
| assert(sys_atomic_get(&peer->ref_count) > 1); |
| qdr_delivery_unlink_peers_CT(core, dlv, peer); |
| } |
| |
| if (dlink) |
| // DISPATCH-1544: caller holds reference - dlv not freed |
| /* coverity[pass_freed_arg] */ |
| dlv_moved = qdr_delivery_settled_CT(core, dlv); |
| } |
| |
| // |
| // If the delivery's link has a core endpoint, notify the endpoint of the update |
| // |
| if (dlink && dlink->core_endpoint) |
| qdrc_endpoint_do_update_CT(core, dlink->core_endpoint, dlv, settled); |
| |
| if (push || peer_moved) |
| // DISPATCH-1544: peer incref-ed above - will not be freed |
| /* coverity[deref_arg] */ |
| qdr_delivery_push_CT(core, peer); |
| |
| // |
| // Release the unsettled references if the deliveries were moved/settled |
| // |
| if (dlv_moved) |
| qdr_delivery_decref_CT(core, dlv, "qdr_delivery_anycast_update CT - dlv removed from unsettled"); |
| if (peer_moved) |
| qdr_delivery_decref_CT(core, peer, "qdr_delivery_anycast_update_CT - peer removed from unsettled"); |
| if (peer) |
| qdr_delivery_decref_CT(core, peer, "qdr_delivery_anycast_update_CT - allow free of peer"); |
| |
| return error_assigned; |
| } |
| |
| |
| // The remote delivery state (disposition) and/or remote settlement for an |
| // incoming multicast delivery has changed. Propagate the changes "downstream" |
| // to the outbound peers. Once all peers have settled then settle the in_dlv |
| // |
| void qdr_delivery_mcast_inbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, |
| uint64_t new_disp, bool settled) |
| { |
| if (!in_dlv) |
| return; |
| |
| bool update_disp = new_disp && in_dlv->remote_disposition != new_disp; |
| |
| assert(in_dlv->multicast); // expect in_dlv to be the inbound delivery |
| |
| qd_log(core->log, QD_LOG_TRACE, |
| "Remote updated mcast delivery (%p) disp=0x%"PRIx64" settled=%s", |
| in_dlv, new_disp, (settled) ? "True" : "False"); |
| |
| if (update_disp) |
| in_dlv->remote_disposition = new_disp; |
| |
| qdr_delivery_t *out_peer = qdr_delivery_first_peer_CT(in_dlv); |
| while (out_peer) { |
| bool push = false; |
| bool moved = false; |
| bool unlink = false; |
| |
| // |
| // AMQP 1.0 allows the sender to specify a disposition |
| // so forward it along |
| // |
| if (update_disp && out_peer->disposition != new_disp) { |
| out_peer->disposition = new_disp; |
| push = true; |
| // extension state/error ignored, not sure how |
| // that can be supported for mcast... |
| } |
| |
| // |
| // the sender has settled |
| // |
| if (settled) { |
| out_peer->settled = true; |
| if (qdr_delivery_link(out_peer)) { |
| moved = qdr_delivery_settled_CT(core, out_peer); |
| } |
| unlink = true; |
| } |
| |
| if (push || moved) { |
| qdr_delivery_push_CT(core, out_peer); |
| } |
| |
| if (moved) { |
| // expect: in_dlv still references out_peer (with refcount), so |
| // out_peer will not be freed by this decref: |
| assert(!unlink || sys_atomic_get(&out_peer->ref_count) > 1); |
| qdr_delivery_decref_CT(core, out_peer, |
| "qdr_delivery_mcast_inbound_update_CT - removed out_peer from unsettled"); |
| } |
| |
| qd_log(core->log, QD_LOG_TRACE, |
| "Updating mcast delivery (%p) out peer (%p) updated disp=%s settled=%s", |
| in_dlv, out_peer, (push) ? "True" : "False", |
| (unlink) ? "True" : "False"); |
| |
| if (unlink) { |
| // expect: in_dlv should not be freed here as caller must hold reference: |
| assert(sys_atomic_get(&in_dlv->ref_count) > 1); |
| qdr_delivery_unlink_peers_CT(core, in_dlv, out_peer); // may free out_peer! |
| } |
| |
| // DISPATCH-1544: |
| /* coverity[deref_arg] */ |
| out_peer = qdr_delivery_next_peer_CT(in_dlv); |
| } |
| |
| if (settled) { |
| assert(qdr_delivery_peer_count_CT(in_dlv) == 0); |
| in_dlv->settled = true; |
| if (qdr_delivery_settled_CT(core, in_dlv)) { |
| qdr_delivery_decref_CT(core, in_dlv, |
| "qdr_delivery_mcast_inbound_update CT - in_dlv removed from unsettled"); |
| } |
| } |
| } |
| |
| |
| // An outgoing peer delivery of an incoming multicast delivery has settled. |
| // Settle the inbound delivery after all of its outbound deliveries |
| // have been settled. |
| // |
| // Note: this call may free either in_dlv or out_dlv by unlinking them. The |
| // caller must increment the reference count for these deliveries if they are |
| // to be referenced after this call. |
| // |
| // moved: set to true if in_dlv has been removed from the unsettled list |
| // return: true if in_dlv has been settled |
| // |
| static bool qdr_delivery_mcast_outbound_settled_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, |
| qdr_delivery_t *out_dlv, bool *moved) |
| { |
| bool push = false; |
| *moved = false; |
| |
| assert(in_dlv->multicast && out_dlv->peer == in_dlv); |
| assert(qdr_delivery_peer_count_CT(out_dlv) == 1); |
| |
| int peer_count = qdr_delivery_peer_count_CT(in_dlv); |
| |
| if (peer_count == 1) { |
| // |
| // This out_dlv is the last outgoing peer so |
| // we can now settle in_dlv |
| // |
| in_dlv->settled = true; |
| push = true; |
| if (qdr_delivery_link(in_dlv)) { |
| *moved = qdr_delivery_settled_CT(core, in_dlv); |
| } |
| |
| qd_log(core->log, QD_LOG_TRACE, |
| "mcast delivery (%p) has settled, disp=0x%"PRIx64, |
| in_dlv, in_dlv->disposition); |
| } else { |
| |
| qd_log(core->log, QD_LOG_TRACE, |
| "mcast delivery (%p) out peer (%p) has settled, remaining peers=%d", |
| in_dlv, out_dlv, peer_count - 1); |
| } |
| |
| // now settle the peer itself and remove it from link unsettled list |
| |
| out_dlv->settled = true; |
| if (qdr_delivery_settled_CT(core, out_dlv)) { |
| qdr_delivery_decref_CT(core, out_dlv, "qdr_delivery_mcast_outbound_settled_CT - out_dlv removed from unsettled"); |
| } |
| |
| // do this last since it may free either dlv or out_dlv: |
| qdr_delivery_unlink_peers_CT(core, in_dlv, out_dlv); |
| |
| return push; |
| } |
| |
| |
| // true if delivery state d is a terminal state as defined by AMQP 1.0 |
| // |
| #define IS_TERMINAL(d) (PN_ACCEPTED <= (d) && (d) <= PN_MODIFIED) |
| |
| |
| // an outbound mcast delivery has changed its remote state (disposition) |
| // propagate the change back "upstream" to the inbound delivery |
| // |
| // returns true if dlv disposition has been updated |
| // |
| static bool qdr_delivery_mcast_outbound_disposition_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, |
| qdr_delivery_t *out_dlv, uint64_t new_disp) |
| { |
| // The AMQP 1.0 spec does not define a way to propagate disposition |
| // back to the sender in the case of unsettled multicast. In the |
| // case of multiple different terminal states we have to reconcile |
| // them. We assign an ad hoc priority to each terminal value and |
| // set the final disposition to the highest priority returned |
| // across all receivers. |
| static const int priority[] = { |
| 2, // Accepted |
| 3, // Rejected - highest because reject is a hard error |
| 0, // Released |
| 1, // Modified |
| }; |
| bool push = false; |
| |
| if (!in_dlv || !out_dlv) |
| return push; |
| |
| assert(in_dlv->multicast && out_dlv->peer == in_dlv); |
| assert(qdr_delivery_peer_count_CT(out_dlv) == 1); |
| |
| if (new_disp == 0x33) { // 0x33 == Declared |
| // hack alert - the transaction section of the AMQP 1.0 spec |
| // defines the Declared outcome (0x33) terminal state. |
| qd_log(core->log, QD_LOG_WARNING, |
| "Transactions are not supported for multicast messages"); |
| new_disp = PN_REJECTED; |
| } |
| |
| out_dlv->remote_disposition = new_disp; |
| |
| if (IS_TERMINAL(new_disp)) { |
| // our mcast impl ignores non-terminal outcomes |
| |
| qd_log(core->log, QD_LOG_TRACE, |
| "mcast delivery (%p) out peer (%p) disp updated: 0x%"PRIx64, |
| in_dlv, out_dlv, new_disp); |
| |
| if (in_dlv->mcast_disposition == 0) { |
| in_dlv->mcast_disposition = new_disp; |
| } else { |
| int old_priority = priority[in_dlv->mcast_disposition - PN_ACCEPTED]; |
| int new_priority = priority[new_disp - PN_ACCEPTED]; |
| if (new_priority > old_priority) |
| in_dlv->mcast_disposition = new_disp; |
| } |
| |
| // wait until all peers have set terminal state before setting it on |
| // the in_dlv |
| // |
| qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv); |
| while (peer) { |
| if (!IS_TERMINAL(peer->remote_disposition)) { |
| break; |
| } |
| peer = qdr_delivery_next_peer_CT(in_dlv); |
| } |
| |
| if (!peer) { // all peers have set a terminal state |
| // TODO(kgiusti) what about error parameter? |
| in_dlv->disposition = in_dlv->mcast_disposition; |
| push = true; |
| qd_log(core->log, QD_LOG_TRACE, |
| "mcast delivery (%p) terminal state set: 0x%"PRIx64, |
| in_dlv, in_dlv->disposition); |
| } |
| } |
| |
| return push; |
| } |
| |
| |
| // |
| // The delivery state (disposition) and/or settlement state for a downstream |
| // peer delivery of a multicast inbound delivery has changed. Update the |
| // inbound delivery to reflect these changes. |
| // |
| // Note: if settled is true then in_dlv or out_dlv *may* be released. Callers |
| // should increment their reference counts if either of these deliveries are |
| // referenced after this call. |
| // |
| void qdr_delivery_mcast_outbound_update_CT(qdr_core_t *core, |
| qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv, |
| uint64_t new_disp, bool settled) |
| { |
| bool dlv_moved = false; |
| bool push_dlv = qdr_delivery_mcast_outbound_disposition_CT(core, in_dlv, out_dlv, new_disp); |
| |
| qdr_delivery_incref(in_dlv, "qdr_delivery_mcast_outbound_update_CT - prevent mcast free"); |
| |
| // Note: qdr_delivery_mcast_outbound_settled_CT may free out_dlv! |
| if (settled && qdr_delivery_mcast_outbound_settled_CT(core, in_dlv, out_dlv, &dlv_moved)) { |
| push_dlv = true; |
| } |
| |
| if (push_dlv || dlv_moved) { |
| // DISPATCH-1544: in_dlv incref-ed above - will not be freed at this point |
| /* coverity[deref_arg] */ |
| qdr_delivery_push_CT(core, in_dlv); |
| } |
| if (dlv_moved) { |
| qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_mcast_outbound_update_CT - removed mcast peer from unsettled"); |
| } |
| qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_mcast_outbound_update_CT - allow mcast peer free"); |
| } |
| |
| |
| static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) |
| { |
| if (!discard) |
| qdr_delete_delivery_internal_CT(core, action->args.delivery.delivery); |
| } |
| |
| |
| void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv) |
| { |
| qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv); |
| |
| while (peer) { |
| if (! peer->presettled && in_dlv->presettled) { |
| peer->presettled = in_dlv->presettled; |
| } |
| qdr_link_work_t *work = peer->link_work; |
| qdr_link_t *peer_link = qdr_delivery_link(peer); |
| |
| // |
| // Determines if the peer connection can be activated. |
| // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed |
| // after the streaming message has been sent. |
| // |
| if (!!work && !!peer_link) { |
| sys_mutex_lock(peer_link->conn->work_lock); |
| if (work->processing || work == DEQ_HEAD(peer_link->work_list)) { |
| qdr_add_link_ref(&peer_link->conn->links_with_work[peer_link->priority], peer_link, QDR_LINK_LIST_CLASS_WORK); |
| sys_mutex_unlock(peer_link->conn->work_lock); |
| |
| // |
| // Activate the outgoing connection for later processing. |
| // |
| qdr_connection_activate_CT(core, peer_link->conn); |
| } |
| else |
| sys_mutex_unlock(peer_link->conn->work_lock); |
| } |
| |
| peer = qdr_delivery_next_peer_CT(in_dlv); |
| } |
| } |
| |
| |
| static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard) |
| { |
| if (discard) |
| return; |
| |
| qdr_delivery_t *in_dlv = action->args.connection.delivery; |
| bool more = action->args.connection.more; |
| bool presettled = action->args.delivery.presettled; |
| |
| // |
| // If the delivery is already pre-settled, don't do anything with the pre-settled flag. |
| // |
| // If the in_delivery was not pre-settled, you can go to pre-settled. |
| if (! in_dlv->presettled && presettled) { |
| in_dlv->presettled = presettled; |
| } |
| |
| qdr_link_t *link = qdr_delivery_link(in_dlv); |
| |
| // |
| // If it is already in the undelivered list, don't try to deliver this again. |
| // |
| if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) { |
| qdr_deliver_continue_peers_CT(core, in_dlv); |
| |
| qd_message_t *msg = qdr_delivery_message(in_dlv); |
| |
| if (!more && !qd_message_is_discard(msg)) { |
| // |
| // The entire message has now been received. Check to see if there are in process subscriptions that need to |
| // receive this message. in process subscriptions, at this time, can deal only with full messages. |
| // |
| qdr_subscription_ref_t *subref = DEQ_HEAD(in_dlv->subscriptions); |
| while (subref) { |
| qdr_forward_on_message_CT(core, subref->sub, link, in_dlv->msg); |
| qdr_del_subscription_ref_CT(&in_dlv->subscriptions, subref); |
| subref = DEQ_HEAD(in_dlv->subscriptions); |
| } |
| |
| // This is a presettled multi-frame unicast delivery. |
| if (in_dlv->settled) { |
| |
| // |
| // If a delivery is settled but did not go into one of the lists, that means that it is going nowhere. |
| // We dont want to deal with such deliveries. |
| // |
| if (in_dlv->settled && in_dlv->where == QDR_DELIVERY_NOWHERE) { |
| qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 1"); |
| return; |
| } |
| |
| assert(in_dlv->where == QDR_DELIVERY_IN_SETTLED); |
| // |
| // The router will settle on behalf of the receiver in the case of multicast and send out settled |
| // deliveries to the receivers. |
| // |
| in_dlv->disposition = PN_ACCEPTED; |
| qdr_delivery_push_CT(core, in_dlv); |
| |
| // |
| // The in_dlv has one or more peers. These peers will have to be unlinked. |
| // |
| qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv); |
| qdr_delivery_t *next_peer = 0; |
| while (peer) { |
| next_peer = qdr_delivery_next_peer_CT(in_dlv); |
| // DISPATCH-1544: ensure there is an additional ref held by action |
| // so in_dlv will not be freed when peer unlinked |
| assert(sys_atomic_get(&in_dlv->ref_count) > 1); |
| qdr_delivery_unlink_peers_CT(core, in_dlv, peer); // peer make be freed here! |
| peer = next_peer; |
| } |
| |
| // Remove the delivery from the settled list and decref the in_dlv. |
| /* coverity[deref_after_free] */ |
| in_dlv->where = QDR_DELIVERY_NOWHERE; |
| DEQ_REMOVE(link->settled, in_dlv); |
| // expect: action holds a ref to in_dlv, so it should not be freed here |
| assert(sys_atomic_get(&in_dlv->ref_count) > 1); |
| qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from settled list"); |
| } |
| } |
| } |
| |
| // This decref is for the action reference |
| qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 2"); |
| } |
| |
| |
| void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv) |
| { |
| qdr_link_t *link = qdr_delivery_link(dlv); |
| if (!link) |
| return; |
| |
| bool activate = false; |
| |
| sys_mutex_lock(link->conn->work_lock); |
| if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) { |
| qdr_delivery_incref(dlv, "qdr_delivery_push_CT - add to updated list"); |
| qdr_add_delivery_ref_CT(&link->updated_deliveries, dlv); |
| qdr_add_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK); |
| activate = true; |
| } |
| sys_mutex_unlock(link->conn->work_lock); |
| |
| // |
| // Activate the connection |
| // |
| if (activate) |
| qdr_connection_activate_CT(core, link->conn); |
| } |
| |
| |
| pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *delivery) |
| { |
| if (delivery->extension_state) |
| pn_data_rewind(delivery->extension_state); |
| return delivery->extension_state; |
| } |
| |
| |
| void qdr_delivery_free_extension_state(qdr_delivery_t *delivery) |
| { |
| if (delivery->extension_state) { |
| pn_data_free(delivery->extension_state); |
| delivery->extension_state = 0; |
| } |
| } |
| |
| |
| // copy local disposition data into proton delivery |
| void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition) |
| { |
| if (dlv->disposition > PN_MODIFIED) { |
| pn_data_t *src = dlv->extension_state; |
| if (src) { |
| pn_data_copy(pn_disposition_data(pn_delivery_local(pdlv)), src); |
| qdr_delivery_free_extension_state(dlv); |
| } |
| if (update_disposition) pn_delivery_update(pdlv, dlv->disposition); |
| } |
| } |
| |
| void qdr_delivery_export_transfer_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv) |
| { |
| qdr_delivery_write_extension_state(dlv, pdlv, true); |
| } |
| |
| |
| void qdr_delivery_export_disposition_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv) |
| { |
| qdr_delivery_write_extension_state(dlv, pdlv, false); |
| } |
| |
| |
| void qdr_delivery_set_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition) |
| { |
| if (disposition > PN_MODIFIED) { |
| if (disposition_data) { |
| pn_data_rewind(disposition_data); |
| if (!dlv->extension_state) |
| dlv->extension_state = pn_data(0); |
| pn_data_copy(dlv->extension_state, disposition_data); |
| } |
| if (update_disposition) dlv->disposition = disposition; |
| } |
| } |