blob: 68e2afa3eb3d78c8ba49866479de72e86d5576bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <qpid/dispatch/container.h>
#include <qpid/dispatch/message.h>
#include <proton/engine.h>
#include <proton/message.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/hash.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/log.h>
// Module name passed as the first argument to every dx_log() call in this file.
static char *module="CONTAINER";
// A container node: the object links are attached to. Handler callbacks are
// reached through 'ntype' and receive 'context' as their first argument.
struct dx_node_t {
const dx_node_type_t *ntype; // node type supplying the handler callbacks
char *name; // heap copy of the name used as the node_map key; 0 for unnamed nodes
void *context; // opaque value handed to the node type's handlers
dx_dist_mode_t supported_dist; // value returned by dx_container_node_get_dist_modes()
dx_lifetime_policy_t life_policy; // value returned by dx_container_node_get_life_policy()
};
// NOTE(review): new_dx_node_t()/free_dx_node_t() used in this file presumably
// come from these allocator macros - confirm against the ALLOC_* definitions.
ALLOC_DECLARE(dx_node_t);
ALLOC_DEFINE(dx_node_t);
// NOTE(review): dx_link_item_t is not declared in this file; presumably it is
// declared (with its ALLOC_DECLARE) in one of the included headers - confirm.
ALLOC_DEFINE(dx_link_item_t);
// Container-side wrapper around a proton link. An instance is stored as the
// proton link's context (pn_link_set_context) so it can be recovered from the
// pn_link_t later on.
struct dx_link_t {
pn_link_t *pn_link; // the wrapped proton link
void *context; // opaque per-link value, see dx_link_set_context()/dx_link_get_context()
dx_node_t *node; // node whose handlers are invoked for this link
};
// NOTE(review): new_dx_link_t()/free_dx_link_t() used in this file presumably
// come from these allocator macros - confirm against the ALLOC_* definitions.
ALLOC_DECLARE(dx_link_t);
ALLOC_DEFINE(dx_link_t);
// List item that holds one registered node type; items are appended to
// node_type_list by dx_container_register_node_type().
typedef struct nxc_node_type_t {
DEQ_LINKS(struct nxc_node_type_t);
const dx_node_type_t *ntype; // the registered node type (not owned by the list item)
} nxc_node_type_t;
DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t);
// File-scope container state, set up by dx_container_initialize().
static hash_t *node_type_map; // node types keyed by type name
static hash_t *node_map; // named nodes keyed by node name
static sys_mutex_t *lock; // held around accesses to the two maps and to node_type_list
static dx_node_t *default_node; // node used when a link's address matches no named node; 0 if none
static nxc_node_type_list_t node_type_list; // registered node types, in registration order
// Bind a newly seen sender link to a node and hand it to that node's
// outgoing_handler. The node is looked up in node_map by the link's remote
// source address; default_node is the fallback. With no node available, or if
// the dx_link_t cannot be allocated, the proton link is closed instead.
static void setup_outgoing_link(pn_link_t *pn_link)
{
    dx_node_t *owner  = 0;
    int        lookup = -1;

    sys_mutex_lock(lock);
    const char *address = pn_terminus_get_address(pn_link_remote_source(pn_link));

    // TODO - Extract the name from the structured source
    if (address) {
        dx_field_iterator_t *key = dx_field_iterator_string(address, ITER_VIEW_NODE_ID);
        lookup = hash_retrieve(node_map, key, (void*) &owner);
        dx_field_iterator_free(key);
    }
    sys_mutex_unlock(lock);

    if (lookup < 0) {
        if (!default_node) {
            // Reject the link
            // TODO - When the API allows, add an error message for "no available node"
            pn_link_close(pn_link);
            return;
        }
        owner = default_node;
    }

    dx_link_t *wrapper = new_dx_link_t();
    if (!wrapper) {
        pn_link_close(pn_link);
        return;
    }

    wrapper->pn_link = pn_link;
    wrapper->context = 0;
    wrapper->node    = owner;

    // Make the wrapper reachable from the proton link before the node sees it.
    pn_link_set_context(pn_link, wrapper);
    owner->ntype->outgoing_handler(owner->context, wrapper);
}
// Bind a newly seen receiver link to a node and hand it to that node's
// incoming_handler. The node is looked up in node_map by the link's remote
// target address; default_node is the fallback. With no node available, or if
// the dx_link_t cannot be allocated, the proton link is closed instead.
static void setup_incoming_link(pn_link_t *pn_link)
{
    dx_node_t *owner  = 0;
    int        lookup = -1;

    sys_mutex_lock(lock);
    const char *address = pn_terminus_get_address(pn_link_remote_target(pn_link));

    // TODO - Extract the name from the structured target
    if (address) {
        dx_field_iterator_t *key = dx_field_iterator_string(address, ITER_VIEW_NODE_ID);
        lookup = hash_retrieve(node_map, key, (void*) &owner);
        dx_field_iterator_free(key);
    }
    sys_mutex_unlock(lock);

    if (lookup < 0) {
        if (!default_node) {
            // Reject the link
            // TODO - When the API allows, add an error message for "no available node"
            pn_link_close(pn_link);
            return;
        }
        owner = default_node;
    }

    dx_link_t *wrapper = new_dx_link_t();
    if (!wrapper) {
        pn_link_close(pn_link);
        return;
    }

    wrapper->pn_link = pn_link;
    wrapper->context = 0;
    wrapper->node    = owner;

    // Make the wrapper reachable from the proton link before the node sees it.
    pn_link_set_context(pn_link, wrapper);
    owner->ntype->incoming_handler(owner->context, wrapper);
}
// Invoke the writable_handler of the node attached to 'pn_link' and return its
// result. Returns 0 when the link has no dx_link_t context or no node.
static int do_writable(pn_link_t *pn_link)
{
    dx_link_t *wrapper = (dx_link_t*) pn_link_get_context(pn_link);
    dx_node_t *owner   = wrapper ? wrapper->node : 0;

    if (owner)
        return owner->ntype->writable_handler(owner->context, wrapper);

    return 0;
}
// Pass a readable delivery to the rx_handler of the node attached to the
// delivery's link. If no node can be found the delivery is rejected and
// settled, and one credit is issued on the link.
static void process_receive(pn_delivery_t *delivery)
{
    pn_link_t *pn_link = pn_delivery_link(delivery);
    dx_link_t *wrapper = (dx_link_t*) pn_link_get_context(pn_link);
    dx_node_t *owner   = wrapper ? wrapper->node : 0;

    if (!owner) {
        //
        // Reject the delivery if we couldn't find a node to handle it
        //
        pn_link_advance(pn_link);
        pn_link_flow(pn_link, 1);
        pn_delivery_update(delivery, PN_REJECTED);
        pn_delivery_settle(delivery);
        return;
    }

    owner->ntype->rx_handler(owner->context, wrapper, delivery);
}
// Pass a writable delivery to the tx_handler of the node attached to the
// delivery's link. Does nothing when the link has no context or no node.
static void do_send(pn_delivery_t *delivery)
{
    pn_link_t *pn_link = pn_delivery_link(delivery);
    dx_link_t *wrapper = (dx_link_t*) pn_link_get_context(pn_link);
    dx_node_t *owner   = wrapper ? wrapper->node : 0;

    if (!owner) {
        // TODO - Cancel the delivery
        return;
    }

    owner->ntype->tx_handler(owner->context, wrapper, delivery);
}
// Pass an updated delivery to the disp_handler of the node attached to the
// delivery's link. Does nothing when the link has no context or no node.
static void do_updated(pn_delivery_t *delivery)
{
    pn_link_t *pn_link = pn_delivery_link(delivery);
    dx_link_t *wrapper = (dx_link_t*) pn_link_get_context(pn_link);

    if (!wrapper)
        return;

    dx_node_t *owner = wrapper->node;
    if (!owner)
        return;

    owner->ntype->disp_handler(owner->context, wrapper, delivery);
}
// Tear down everything on a connection that is closing: every link, then every
// session, then the connection itself.
//
// 'unused' is ignored. Always returns 0.
static int close_handler(void* unused, pn_connection_t *conn)
{
    //
    // Close all links, passing False as the 'closed' argument.  These links are not
    // being properly 'detached'.  They are being orphaned.
    //
    pn_link_t *pn_link = pn_link_head(conn, 0);
    while (pn_link) {
        // A link that was rejected during setup never had a dx_link_t attached,
        // so the context may legitimately be null here.
        dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
        if (link) {
            dx_node_t *node = link->node;
            if (node)
                node->ntype->link_detach_handler(node->context, link, 0);
        }

        pn_link_close(pn_link);

        if (link) {
            // Detach the wrapper from the proton link before releasing it so
            // the proton link is not left holding a dangling context pointer.
            pn_link_set_context(pn_link, 0);
            free_dx_link_t(link);
        }

        pn_link = pn_link_next(pn_link, 0);
    }

    // teardown all sessions
    pn_session_t *ssn = pn_session_head(conn, 0);
    while (ssn) {
        pn_session_close(ssn);
        ssn = pn_session_next(ssn, 0);
    }

    // teardown the connection
    pn_connection_close(conn);
    return 0;
}
// Run one processing pass over a connection: open whatever is pending, dispatch
// the deliveries on the work queue, offer credit to writable senders, and close
// whatever the remote peer has closed.
//
// 'unused' is ignored. Returns the number of events handled during the pass
// (including whatever the nodes' writable handlers report).
static int process_handler(void* unused, pn_connection_t *conn)
{
    pn_session_t  *ssn;
    pn_link_t     *pn_link;
    pn_delivery_t *delivery;
    int            event_count = 0;

    // Step 1: setup the engine's connection, and any sessions and links
    // that may be pending.

    // initialize the connection if it's new
    if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
        pn_connection_open(conn);
        event_count++;
    }

    // open all pending sessions
    ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
    while (ssn) {
        pn_session_open(ssn);
        ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
        event_count++;
    }

    // configure and open any pending links
    pn_link = pn_link_head(conn, PN_LOCAL_UNINIT);
    while (pn_link) {
        if (pn_link_is_sender(pn_link))
            setup_outgoing_link(pn_link);
        else
            setup_incoming_link(pn_link);
        pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT);
        event_count++;
    }

    // Step 2: Now drain all the pending deliveries from the connection's
    // work queue and process them
    delivery = pn_work_head(conn);
    while (delivery) {
        if (pn_delivery_readable(delivery))
            process_receive(delivery);
        else if (pn_delivery_writable(delivery))
            do_send(delivery);

        if (pn_delivery_updated(delivery))
            do_updated(delivery);

        delivery = pn_work_next(delivery);
        event_count++;
    }

    //
    // Step 2.5: Traverse all of the links on the connection looking for
    // outgoing links with non-zero credit.  Call the attached node's
    // writable handler for such links.
    //
    pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
    while (pn_link) {
        assert(pn_session_connection(pn_link_session(pn_link)) == conn);
        if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
            event_count += do_writable(pn_link);
        pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
    }

    // Step 3: Clean up any links or sessions that have been closed by the
    // remote.  If the connection has been closed remotely, clean that up
    // also.

    // teardown any terminating links
    pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
    while (pn_link) {
        // The context may be null if no dx_link_t was ever attached to this
        // proton link; guard it the same way the delivery handlers do.
        dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
        dx_node_t *node = link ? link->node : 0;
        if (node)
            node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
        pn_link_close(pn_link);
        pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
        event_count++;
    }

    // teardown any terminating sessions
    ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
    while (ssn) {
        pn_session_close(ssn);
        ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
        event_count++;
    }

    // teardown the connection if it's terminating
    if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
        pn_connection_close(conn);
        event_count++;
    }

    return event_count;
}
// Open the proton connection, then notify every registered node type that a
// connection was opened: the inbound_conn_open_handler is used when 'dir' is
// DX_INCOMING, the outbound_conn_open_handler otherwise. Node types that did
// not supply the relevant handler are skipped.
static void open_handler(dx_connection_t *conn, dx_direction_t dir)
{
    nxc_node_type_t *cursor;

    //
    // Note the locking structure in this function.  The lock is held only while
    // reading list pointers, never while a handler runs.  Generally this would be
    // unsafe, but since this particular list is only ever appended to and never has
    // items inserted or deleted, this usage is safe in this case.
    //
    sys_mutex_lock(lock);
    cursor = DEQ_HEAD(node_type_list);
    sys_mutex_unlock(lock);

    pn_connection_open(dx_connection_pn(conn));

    while (cursor) {
        const dx_node_type_t *type = cursor->ntype;

        if (dir != DX_INCOMING) {
            if (type->outbound_conn_open_handler)
                type->outbound_conn_open_handler(type->type_context, conn);
        } else if (type->inbound_conn_open_handler) {
            type->inbound_conn_open_handler(type->type_context, conn);
        }

        sys_mutex_lock(lock);
        cursor = DEQ_NEXT(cursor);
        sys_mutex_unlock(lock);
    }
}
// Connection event entry point installed with dx_server_set_conn_handler().
// Dispatches on 'event'; returns the result of close_handler/process_handler
// for CLOSE/PROCESS events and 0 for everything else.
static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn)
{
    pn_connection_t *pn_conn = dx_connection_pn(dx_conn);
    int              rc      = 0;

    switch (event) {
    case DX_CONN_EVENT_PROCESS:
        rc = process_handler(context, pn_conn);
        break;

    case DX_CONN_EVENT_CLOSE:
        rc = close_handler(context, pn_conn);
        break;

    case DX_CONN_EVENT_LISTENER_OPEN:
        open_handler(dx_conn, DX_INCOMING);
        break;

    case DX_CONN_EVENT_CONNECTOR_OPEN:
        open_handler(dx_conn, DX_OUTGOING);
        break;

    default:
        break;
    }

    return rc;
}
// Set up the container's file-scope state (maps, mutex, node type list, no
// default node) and install the connection event handler with the server.
void dx_container_initialize(void)
{
    dx_log(module, LOG_TRACE, "Container Initializing");

    lock         = sys_mutex();
    default_node = 0;
    DEQ_INIT(node_type_list);

    // 64 buckets, item batches of 4
    node_type_map = hash(6, 4, 1);

    // 1K buckets, item batches of 32
    node_map = hash(10, 32, 0);

    // Installed last, after all of the state above has been set up.
    dx_server_set_conn_handler(handler);
}
// Counterpart of dx_container_initialize(). Currently a no-op: nothing that
// dx_container_initialize() creates (the two hash maps, the mutex) is released
// here.
void dx_container_finalize(void)
{
}
// Register a node type with the container.
//
// The type is added to node_type_map under nt->type_name and appended to
// node_type_list, which open_handler() walks to deliver connection-open
// callbacks.
//
// Returns 0 on success, or a negative value if the list item cannot be
// allocated or the map insert fails. On failure the type is NOT added to
// node_type_list, so a type whose registration was refused receives no
// callbacks.
int dx_container_register_node_type(const dx_node_type_t *nt)
{
    nxc_node_type_t *nt_item = NEW(nxc_node_type_t);
    if (!nt_item)
        return -1;

    DEQ_ITEM_INIT(nt_item);
    nt_item->ntype = nt;

    dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL);

    sys_mutex_lock(lock);
    int result = hash_insert_const(node_type_map, iter, nt);
    if (result >= 0) {
        DEQ_INSERT_TAIL(node_type_list, nt_item);
    }
    sys_mutex_unlock(lock);

    dx_field_iterator_free(iter);

    if (result < 0) {
        // The item never made it onto the list, so it is still ours to release.
        // NOTE(review): assumes NEW() allocates with malloc() - confirm in ctools.h.
        free(nt_item);
        return result;
    }

    dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name);
    return 0;
}
// Replace the default node. Any existing default node is destroyed first.
// With a non-null 'nt' a new unnamed, permanent node of that type becomes the
// default; with a null 'nt' the container is left without a default node.
void dx_container_set_default_node_type(const dx_node_type_t *nt,
                                        void                 *context,
                                        dx_dist_mode_t        supported_dist)
{
    if (default_node)
        dx_container_destroy_node(default_node);

    if (!nt) {
        default_node = 0;
        dx_log(module, LOG_TRACE, "Default node removed");
        return;
    }

    default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT);
    dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name);
}
// Create a node of type 'nt'.
//
// nt             - node type supplying the handler callbacks
// name           - name to register the node under in node_map, or 0 for an
//                  unnamed node (not registered); the string is copied
// context        - opaque value passed to the node type's handlers
// supported_dist - stored for dx_container_node_get_dist_modes()
// life_policy    - stored for dx_container_node_get_life_policy()
//
// Returns the new node, or 0 if an allocation fails or the name cannot be
// inserted into node_map. Nothing is leaked or left registered on failure.
dx_node_t *dx_container_create_node(const dx_node_type_t *nt,
                                    const char           *name,
                                    void                 *context,
                                    dx_dist_mode_t        supported_dist,
                                    dx_lifetime_policy_t  life_policy)
{
    char *name_copy = 0;

    // Copy the name up front so every allocation failure is detected before
    // the node is published in node_map.
    if (name) {
        size_t size = strlen(name) + 1;
        name_copy = (char*) malloc(size);
        if (!name_copy)
            return 0;
        memcpy(name_copy, name, size);
    }

    dx_node_t *node = new_dx_node_t();
    if (!node) {
        free(name_copy);
        return 0;
    }

    node->ntype          = nt;
    node->name           = name_copy;
    node->context        = context;
    node->supported_dist = supported_dist;
    node->life_policy    = life_policy;

    if (name) {
        dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);

        sys_mutex_lock(lock);
        int result = hash_insert(node_map, iter, node);
        sys_mutex_unlock(lock);

        dx_field_iterator_free(iter);

        if (result < 0) {
            free(name_copy);
            free_dx_node_t(node);
            return 0;
        }

        dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name);
    }

    return node;
}
// Destroy a node. A named node is first removed from node_map and its name
// copy released; the node itself is then freed.
void dx_container_destroy_node(dx_node_t *node)
{
    char *registered_name = node->name;

    if (registered_name) {
        dx_field_iterator_t *key = dx_field_iterator_string(registered_name, ITER_VIEW_ALL);

        sys_mutex_lock(lock);
        hash_remove(node_map, key);
        sys_mutex_unlock(lock);

        dx_field_iterator_free(key);
        free(registered_name);
    }

    free_dx_node_t(node);
}
// Replace the opaque context value stored on 'node'.
void dx_container_node_set_context(dx_node_t *node, void *node_context)
{
    dx_node_t *const target = node;

    target->context = node_context;
}
// Return the distribution modes the node was created with.
dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node)
{
    dx_dist_mode_t modes = node->supported_dist;

    return modes;
}
// Return the lifetime policy the node was created with.
dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node)
{
    dx_lifetime_policy_t policy = node->life_policy;

    return policy;
}
// Create a link on a new session of 'conn' and bind it to 'node'.
//
// node - node whose handlers will service the link; its context becomes the
//        link's initial context
// conn - connection on which the new session is created
// dir  - DX_OUTGOING creates a sender, anything else a receiver
// name - link name passed to proton
//
// The session is opened before returning. Returns the new link, or 0 if the
// dx_link_t cannot be allocated (in which case no session is created).
dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name)
{
    // Allocate the wrapper first so an allocation failure leaves the
    // connection untouched.
    dx_link_t *link = new_dx_link_t();
    if (!link)
        return 0;

    pn_session_t *sess = pn_session(dx_connection_pn(conn));

    if (dir == DX_OUTGOING)
        link->pn_link = pn_sender(sess, name);
    else
        link->pn_link = pn_receiver(sess, name);

    link->context = node->context;
    link->node    = node;

    pn_link_set_context(link->pn_link, link);
    pn_session_open(sess);

    return link;
}
// Replace the opaque context value stored on 'link'.
void dx_link_set_context(dx_link_t *link, void *context)
{
    dx_link_t *const target = link;

    target->context = context;
}
// Return the opaque context value stored on 'link'.
void *dx_link_get_context(dx_link_t *link)
{
    void *stored = link->context;

    return stored;
}
// Return the proton link wrapped by 'link'.
pn_link_t *dx_link_pn(dx_link_t *link)
{
    pn_link_t *proton_link = link->pn_link;

    return proton_link;
}
// Return the local source terminus of the wrapped proton link.
pn_terminus_t *dx_link_source(dx_link_t *link)
{
    pn_link_t *proton_link = link->pn_link;

    return pn_link_source(proton_link);
}
// Return the local target terminus of the wrapped proton link.
pn_terminus_t *dx_link_target(dx_link_t *link)
{
    pn_link_t *proton_link = link->pn_link;

    return pn_link_target(proton_link);
}
// Return the remote source terminus of the wrapped proton link.
pn_terminus_t *dx_link_remote_source(dx_link_t *link)
{
    pn_link_t *proton_link = link->pn_link;

    return pn_link_remote_source(proton_link);
}
// Return the remote target terminus of the wrapped proton link.
pn_terminus_t *dx_link_remote_target(dx_link_t *link)
{
    pn_link_t *proton_link = link->pn_link;

    return pn_link_remote_target(proton_link);
}
// Ask the server to activate the connection that carries 'link'. Walks
// link -> session -> proton connection -> connection context and silently does
// nothing if any step of that chain is null.
void dx_link_activate(dx_link_t *link)
{
    if (link && link->pn_link) {
        pn_session_t *session = pn_link_session(link->pn_link);

        if (session) {
            pn_connection_t *pn_conn = pn_session_connection(session);

            if (pn_conn) {
                dx_connection_t *dx_conn = pn_connection_get_context(pn_conn);

                if (dx_conn)
                    dx_server_activate(dx_conn);
            }
        }
    }
}
// Close the proton link wrapped by 'link'.
void dx_link_close(dx_link_t *link)
{
    pn_link_t *proton_link = link->pn_link;

    pn_link_close(proton_link);
}