blob: 7dac0bfee121e1b17bd2b676815a8526fe0ab060 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "core_link_endpoint.h"
#include "qpid/dispatch/alloc.h"
#include "delivery.h"
#include <stdio.h>
#include <inttypes.h>
struct qdrc_endpoint_t {
qdrc_endpoint_desc_t *desc;
void *link_context;
qdr_link_t *link;
};
ALLOC_DECLARE(qdrc_endpoint_t);
ALLOC_DEFINE(qdrc_endpoint_t);
void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t *core,
const char *address,
char phase,
qdrc_endpoint_desc_t *desc,
void *bind_context)
{
qdr_address_t *addr = 0;
qd_iterator_t *iter = qd_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
qd_iterator_annotate_phase(iter, phase);
qd_hash_retrieve(core->addr_hash, iter, (void*) &addr);
if (!addr) {
qdr_address_config_t *addr_config = qdr_config_for_address_CT(core, 0, iter);
qd_address_treatment_t treatment = addr_config ? addr_config->treatment : QD_TREATMENT_ANYCAST_BALANCED;
if (treatment == QD_TREATMENT_UNAVAILABLE)
treatment = QD_TREATMENT_ANYCAST_BALANCED;
addr = qdr_address_CT(core, treatment, addr_config);
DEQ_INSERT_TAIL(core->addrs, addr);
qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
}
assert(addr->core_endpoint == 0);
addr->core_endpoint = desc;
addr->core_endpoint_context = bind_context;
qd_iterator_free(iter);
}
qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t *core,
qdr_connection_t *conn,
qd_direction_t dir,
qdr_terminus_t *source,
qdr_terminus_t *target,
qdrc_endpoint_desc_t *desc,
void *link_context)
{
qdrc_endpoint_t *ep = new_qdrc_endpoint_t();
ep->desc = desc;
ep->link_context = link_context;
ep->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, dir, source, target,
QD_SSN_CORE_ENDPOINT);
ep->link->core_endpoint = ep;
return ep;
}
qd_direction_t qdrc_endpoint_get_direction_CT(const qdrc_endpoint_t *ep)
{
return !!ep ? (!!ep->link ? ep->link->link_direction : QD_INCOMING) : QD_INCOMING;
}
qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *ep)
{
return !!ep ? (!!ep->link ? ep->link->conn : 0) : 0;
}
void qdrc_endpoint_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_terminus_t *source, qdr_terminus_t *target)
{
qdr_link_outbound_second_attach_CT(core, ep->link, source, target);
}
void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
{
qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true);
if (ep->link->detach_count == 2) {
qdrc_endpoint_do_cleanup_CT(core, ep);
}
}
void qdrc_endpoint_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain)
{
qdr_link_issue_credit_CT(core, ep->link, credit, drain);
}
void qdrc_endpoint_send_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv, bool presettled)
{
uint64_t *tag = (uint64_t*) dlv->tag;
set_safe_ptr_qdr_link_t(ep->link, &dlv->link_sp);
dlv->settled = presettled;
dlv->presettled = presettled;
*tag = core->next_tag++;
dlv->tag_length = 8;
dlv->ingress_index = -1;
qdr_forward_deliver_CT(core, ep->link, dlv);
}
qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qd_message_t *message)
{
qdr_delivery_t *dlv = new_qdr_delivery_t();
uint64_t *tag = (uint64_t*) dlv->tag;
if (endpoint->link->conn)
endpoint->link->conn->last_delivery_time = core->uptime_ticks;
ZERO(dlv);
set_safe_ptr_qdr_link_t(endpoint->link, &dlv->link_sp);
dlv->msg = message;
*tag = core->next_tag++;
dlv->tag_length = 8;
dlv->ingress_index = -1;
dlv->delivery_id = next_delivery_id();
dlv->link_id = endpoint->link->identity;
dlv->conn_id = endpoint->link->conn_id;
qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdrc_endpoint_delivery_CT", DLV_ARGS(dlv));
return dlv;
}
void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *dlv, uint64_t disposition)
{
//
// Set the new delivery state
//
dlv->disposition = disposition;
dlv->settled = true;
//
// Activate the connection to push this update out
//
qdr_delivery_push_CT(core, dlv);
//
// Remove the endpoint's reference
//
qdr_delivery_decref_CT(core, dlv, "qdrc_endpoint_settle_CT - no longer held by endpoint");
}
void qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target)
{
qdrc_endpoint_t *ep = new_qdrc_endpoint_t();
ZERO(ep);
ep->desc = addr->core_endpoint;
ep->link = link;
link->core_endpoint = ep;
link->owning_addr = addr;
ep->desc->on_first_attach(addr->core_endpoint_context, ep, &ep->link_context, source, target);
}
void qdrc_endpoint_do_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_terminus_t *source, qdr_terminus_t *target)
{
if (!!ep->desc->on_second_attach)
ep->desc->on_second_attach(ep->link_context, source, target);
}
void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv)
{
if (!!ep->desc->on_transfer)
ep->desc->on_transfer(ep->link_context, dlv, dlv->msg);
}
void qdrc_endpoint_do_update_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv, bool settled)
{
if (!!ep->desc->on_update)
ep->desc->on_update(ep->link_context, dlv, settled, dlv->remote_disposition);
}
void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain)
{
if (!!ep->desc->on_flow)
ep->desc->on_flow(ep->link_context, credit, drain);
}
void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error, qd_detach_type_t dt)
{
if (dt == QD_LOST) {
qdrc_endpoint_do_cleanup_CT(core, ep);
qdr_error_free(error);
} else if (ep->link->detach_count == 1) {
if (!!ep->desc->on_first_detach)
ep->desc->on_first_detach(ep->link_context, error);
else {
qdr_link_outbound_detach_CT(core, ep->link, 0, QDR_CONDITION_NONE, true);
qdr_error_free(error);
}
} else {
if (!!ep->desc->on_second_detach)
ep->desc->on_second_detach(ep->link_context, error);
else
qdr_error_free(error);
qdrc_endpoint_do_cleanup_CT(core, ep);
}
}
void qdrc_endpoint_do_cleanup_CT(qdr_core_t *core, qdrc_endpoint_t *ep)
{
if (!!ep->desc->on_cleanup)
ep->desc->on_cleanup(ep->link_context);
ep->link->core_endpoint = 0;
free_qdrc_endpoint_t(ep);
}