/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <stdio.h>
#include <string.h>
#include <qpid/dispatch/container.h>
#include <qpid/dispatch/message.h>
#include <proton/engine.h>
#include <proton/message.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/hash.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/log.h>

static char *module="CONTAINER";

struct dx_node_t {
    const dx_node_type_t *ntype;
    char                 *name;
    void                 *context;
    dx_dist_mode_t        supported_dist;
    dx_lifetime_policy_t  life_policy;
};

ALLOC_DECLARE(dx_node_t);
ALLOC_DEFINE(dx_node_t);
ALLOC_DEFINE(dx_link_item_t);

struct dx_link_t {
    pn_link_t *pn_link;
    void      *context;
    dx_node_t *node;
};

ALLOC_DECLARE(dx_link_t);
ALLOC_DEFINE(dx_link_t);

typedef struct nxc_node_type_t {
    DEQ_LINKS(struct nxc_node_type_t);
    const dx_node_type_t *ntype;
} nxc_node_type_t;
DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t);


static hash_t               *node_type_map;
static hash_t               *node_map;
static sys_mutex_t          *lock;
static dx_node_t            *default_node;
static nxc_node_type_list_t  node_type_list;

static void setup_outgoing_link(pn_link_t *pn_link)
{
    sys_mutex_lock(lock);
    dx_node_t  *node;
    int         result;
    const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link));
    dx_field_iterator_t *iter;
    // TODO - Extract the name from the structured source

    if (source) {
        iter   = dx_field_iterator_string(source, ITER_VIEW_NODE_ID);
        result = hash_retrieve(node_map, iter, (void*) &node);
        dx_field_iterator_free(iter);
    } else
        result = -1;
    sys_mutex_unlock(lock);

    if (result < 0) {
        if (default_node)
            node = default_node;
        else {
            // Reject the link
            // TODO - When the API allows, add an error message for "no available node"
            pn_link_close(pn_link);
            return;
        }
    }

    dx_link_t *link = new_dx_link_t();
    if (!link) {
        pn_link_close(pn_link);
        return;
    }

    link->pn_link = pn_link;
    link->context = 0;
    link->node    = node;

    pn_link_set_context(pn_link, link);
    node->ntype->outgoing_handler(node->context, link);
}


static void setup_incoming_link(pn_link_t *pn_link)
{
    sys_mutex_lock(lock);
    dx_node_t   *node;
    int          result;
    const char  *target = pn_terminus_get_address(pn_link_remote_target(pn_link));
    dx_field_iterator_t *iter;
    // TODO - Extract the name from the structured target

    if (target) {
        iter   = dx_field_iterator_string(target, ITER_VIEW_NODE_ID);
        result = hash_retrieve(node_map, iter, (void*) &node);
        dx_field_iterator_free(iter);
    } else
        result = -1;
    sys_mutex_unlock(lock);

    if (result < 0) {
        if (default_node)
            node = default_node;
        else {
            // Reject the link
            // TODO - When the API allows, add an error message for "no available node"
            pn_link_close(pn_link);
            return;
        }
    }

    dx_link_t *link = new_dx_link_t();
    if (!link) {
        pn_link_close(pn_link);
        return;
    }

    link->pn_link = pn_link;
    link->context = 0;
    link->node    = node;

    pn_link_set_context(pn_link, link);
    node->ntype->incoming_handler(node->context, link);
}


static int do_writable(pn_link_t *pn_link)
{
    dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
    if (!link)
        return 0;

    dx_node_t *node = link->node;
    if (!node)
        return 0;

    return node->ntype->writable_handler(node->context, link);
}


static void process_receive(pn_delivery_t *delivery)
{
    pn_link_t *pn_link = pn_delivery_link(delivery);
    dx_link_t *link    = (dx_link_t*) pn_link_get_context(pn_link);

    if (link) {
        dx_node_t *node = link->node;
        if (node) {
            node->ntype->rx_handler(node->context, link, delivery);
            return;
        }
    }

    //
    // Reject the delivery if we couldn't find a node to handle it
    //
    pn_link_advance(pn_link);
    pn_link_flow(pn_link, 1);
    pn_delivery_update(delivery, PN_REJECTED);
    pn_delivery_settle(delivery);
}


static void do_send(pn_delivery_t *delivery)
{
    pn_link_t *pn_link = pn_delivery_link(delivery);
    dx_link_t *link    = (dx_link_t*) pn_link_get_context(pn_link);

    if (link) {
        dx_node_t *node = link->node;
        if (node) {
            node->ntype->tx_handler(node->context, link, delivery);
            return;
        }
    }

    // TODO - Cancel the delivery
}


static void do_updated(pn_delivery_t *delivery)
{
    pn_link_t *pn_link = pn_delivery_link(delivery);
    dx_link_t *link    = (dx_link_t*) pn_link_get_context(pn_link);

    if (link) {
        dx_node_t *node = link->node;
        if (node)
            node->ntype->disp_handler(node->context, link, delivery);
    }
}


static int close_handler(void* unused, pn_connection_t *conn)
{
    //
    // Close all links, passing False as the 'closed' argument.  These links are not
    // being properly 'detached'.  They are being orphaned.
    //
    pn_link_t *pn_link = pn_link_head(conn, 0);
    while (pn_link) {
        dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
        dx_node_t *node = link->node;
        if (node)
            node->ntype->link_detach_handler(node->context, link, 0);
        pn_link_close(pn_link);
        free_dx_link_t(link);
        pn_link = pn_link_next(pn_link, 0);
    }

    // teardown all sessions
    pn_session_t *ssn = pn_session_head(conn, 0);
    while (ssn) {
        pn_session_close(ssn);
        ssn = pn_session_next(ssn, 0);
    }

    // teardown the connection
    pn_connection_close(conn);
    return 0;
}


static int process_handler(void* unused, pn_connection_t *conn)
{
    pn_session_t    *ssn;
    pn_link_t       *pn_link;
    pn_delivery_t   *delivery;
    int              event_count = 0;

    // Step 1: setup the engine's connection, and any sessions and links
    // that may be pending.

    // initialize the connection if it's new
    if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
        pn_connection_open(conn);
        event_count++;
    }

    // open all pending sessions
    ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
    while (ssn) {
        pn_session_open(ssn);
        ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
        event_count++;
    }

    // configure and open any pending links
    pn_link = pn_link_head(conn, PN_LOCAL_UNINIT);
    while (pn_link) {
        if (pn_link_is_sender(pn_link))
            setup_outgoing_link(pn_link);
        else
            setup_incoming_link(pn_link);
        pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT);
        event_count++;
    }


    // Step 2: Now drain all the pending deliveries from the connection's
    // work queue and process them

    delivery = pn_work_head(conn);
    while (delivery) {
        if      (pn_delivery_readable(delivery))
            process_receive(delivery);
        else if (pn_delivery_writable(delivery))
            do_send(delivery);

        if (pn_delivery_updated(delivery)) 
            do_updated(delivery);

        delivery = pn_work_next(delivery);
        event_count++;
    }

    //
    // Step 2.5: Traverse all of the links on the connection looking for
    // outgoing links with non-zero credit.  Call the attached node's
    // writable handler for such links.
    //
    pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
    while (pn_link) {
        assert(pn_session_connection(pn_link_session(pn_link)) == conn);
        if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
            event_count += do_writable(pn_link);
        pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
    }

    // Step 3: Clean up any links or sessions that have been closed by the
    // remote.  If the connection has been closed remotely, clean that up
    // also.

    // teardown any terminating links
    pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
    while (pn_link) {
        dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
        dx_node_t *node = link->node;
        if (node)
            node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
        pn_link_close(pn_link);
        pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
        event_count++;
    }

    // teardown any terminating sessions
    ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
    while (ssn) {
        pn_session_close(ssn);
        ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
        event_count++;
    }

    // teardown the connection if it's terminating
    if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
        pn_connection_close(conn);
        event_count++;
    }

    return event_count;
}


static void open_handler(dx_connection_t *conn, dx_direction_t dir)
{
    const dx_node_type_t *nt;

    //
    // Note the locking structure in this function.  Generally this would be unsafe, but since
    // this particular list is only ever appended to and never has items inserted or deleted,
    // this usage is safe in this case.
    //
    sys_mutex_lock(lock);
    nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list);
    sys_mutex_unlock(lock);

    pn_connection_open(dx_connection_pn(conn));

    while (nt_item) {
        nt = nt_item->ntype;
        if (dir == DX_INCOMING) {
            if (nt->inbound_conn_open_handler)
                nt->inbound_conn_open_handler(nt->type_context, conn);
        } else {
            if (nt->outbound_conn_open_handler)
                nt->outbound_conn_open_handler(nt->type_context, conn);
        }

        sys_mutex_lock(lock);
        nt_item = DEQ_NEXT(nt_item);
        sys_mutex_unlock(lock);
    }
}


static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn)
{
    pn_connection_t *conn = dx_connection_pn(dx_conn);

    switch (event) {
    case DX_CONN_EVENT_LISTENER_OPEN:  open_handler(dx_conn, DX_INCOMING);   break;
    case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING);   break;
    case DX_CONN_EVENT_CLOSE:          return close_handler(context, conn);
    case DX_CONN_EVENT_PROCESS:        return process_handler(context, conn);
    }

    return 0;
}


void dx_container_initialize(void)
{
    dx_log(module, LOG_TRACE, "Container Initializing");

    node_type_map = hash(6,  4, 1);  // 64 buckets, item batches of 4
    node_map      = hash(10, 32, 0); // 1K buckets, item batches of 32
    lock          = sys_mutex();
    default_node  = 0;
    DEQ_INIT(node_type_list);

    dx_server_set_conn_handler(handler);
}


void dx_container_finalize(void)
{
}


int dx_container_register_node_type(const dx_node_type_t *nt)
{
    int result;
    dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL);
    nxc_node_type_t     *nt_item = NEW(nxc_node_type_t);
    DEQ_ITEM_INIT(nt_item);
    nt_item->ntype = nt;

    sys_mutex_lock(lock);
    result = hash_insert_const(node_type_map, iter, nt);
    DEQ_INSERT_TAIL(node_type_list, nt_item);
    sys_mutex_unlock(lock);

    dx_field_iterator_free(iter);
    if (result < 0)
        return result;
    dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name);

    return 0;
}


void dx_container_set_default_node_type(const dx_node_type_t *nt,
                                        void                 *context,
                                        dx_dist_mode_t        supported_dist)
{
    if (default_node)
        dx_container_destroy_node(default_node);

    if (nt) {
        default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT);
        dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name);
    } else {
        default_node = 0;
        dx_log(module, LOG_TRACE, "Default node removed");
    }
}


dx_node_t *dx_container_create_node(const dx_node_type_t *nt,
                                    const char           *name,
                                    void                 *context,
                                    dx_dist_mode_t        supported_dist,
                                    dx_lifetime_policy_t  life_policy)
{
    int result;
    dx_node_t *node = new_dx_node_t();
    if (!node)
        return 0;

    node->ntype          = nt;
    node->name           = 0;
    node->context        = context;
    node->supported_dist = supported_dist;
    node->life_policy    = life_policy;

    if (name) {
        dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
        sys_mutex_lock(lock);
        result = hash_insert(node_map, iter, node);
        sys_mutex_unlock(lock);
        dx_field_iterator_free(iter);
        if (result < 0) {
            free_dx_node_t(node);
            return 0;
        }

        node->name = (char*) malloc(strlen(name) + 1);
        strcpy(node->name, name);
    }

    if (name)
        dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name);

    return node;
}


void dx_container_destroy_node(dx_node_t *node)
{
    if (node->name) {
        dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL);
        sys_mutex_lock(lock);
        hash_remove(node_map, iter);
        sys_mutex_unlock(lock);
        dx_field_iterator_free(iter);
        free(node->name);
    }

    free_dx_node_t(node);
}


void dx_container_node_set_context(dx_node_t *node, void *node_context)
{
    node->context = node_context;
}


dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node)
{
    return node->supported_dist;
}


dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node)
{
    return node->life_policy;
}


dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name)
{
    pn_session_t *sess = pn_session(dx_connection_pn(conn));
    dx_link_t    *link = new_dx_link_t();

    if (dir == DX_OUTGOING)
        link->pn_link = pn_sender(sess, name);
    else
        link->pn_link = pn_receiver(sess, name);
    link->context = node->context;
    link->node    = node;

    pn_link_set_context(link->pn_link, link);

    pn_session_open(sess);

    return link;
}


void dx_link_set_context(dx_link_t *link, void *context)
{
    link->context = context;
}


void *dx_link_get_context(dx_link_t *link)
{
    return link->context;
}


pn_link_t *dx_link_pn(dx_link_t *link)
{
    return link->pn_link;
}


pn_terminus_t *dx_link_source(dx_link_t *link)
{
    return pn_link_source(link->pn_link);
}


pn_terminus_t *dx_link_target(dx_link_t *link)
{
    return pn_link_target(link->pn_link);
}


pn_terminus_t *dx_link_remote_source(dx_link_t *link)
{
    return pn_link_remote_source(link->pn_link);
}


pn_terminus_t *dx_link_remote_target(dx_link_t *link)
{
    return pn_link_remote_target(link->pn_link);
}


void dx_link_activate(dx_link_t *link)
{
    if (!link || !link->pn_link)
        return;

    pn_session_t *sess = pn_link_session(link->pn_link);
    if (!sess)
        return;

    pn_connection_t *conn = pn_session_connection(sess);
    if (!conn)
        return;

    dx_connection_t *ctx = pn_connection_get_context(conn);
    if (!ctx)
        return;

    dx_server_activate(ctx);
}


void dx_link_close(dx_link_t *link)
{
    pn_link_close(link->pn_link);
}


