/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/log.h>
#include "server_private.h"
#include "timer_private.h"
#include "alloc_private.h"
#include "auth.h"
#include "work_queue.h"
#include <stdio.h>
#include <time.h>
#include <signal.h>

/* Module name passed to dx_log() for every message emitted from this file. */
static char *module="SERVER";

/**
 * Book-keeping for one server thread.  Instances are created by thread() and
 * released by thread_free().
 */
typedef struct dx_thread_t {
    int           thread_id;     // Index given at creation; passed to the start handler and used as connection owner id
    volatile int  running;       // Set to 1 on entry to thread_run(); cleared by thread_cancel() to end the main loop
    volatile int  canceled;      // Set by thread_cancel(); thread_run() returns immediately if set before it starts
    int           using_thread;  // Set by thread_start(); thread_join() only joins when this is non-zero
    sys_thread_t *thread;        // Handle returned by sys_thread(); only assigned by thread_start()
} dx_thread_t;


/**
 * State of the server singleton.  Fields other than the handler/context
 * registrations are read and written by the server threads in thread_run();
 * functions with the _LH suffix expect dx_server->lock to be held by the caller.
 */
typedef struct dx_server_t {
    int                      thread_count;         // Number of entries in 'threads'
    pn_driver_t             *driver;               // Proton driver created in dx_server_initialize()
    dx_thread_start_cb_t     start_handler;        // Optional; invoked once at the start of each thread_run()
    dx_conn_handler_cb_t     conn_handler;         // Required by dx_server_run(); receives connection events
    dx_signal_handler_cb_t   signal_handler;       // Optional; invoked from handle_signals_LH()
    dx_user_fd_handler_cb_t  ufd_handler;          // Invoked for connectors in CONN_STATE_USER
    void                    *start_context;        // Passed to start_handler
    void                    *conn_context;         // NOTE(review): not read anywhere in the visible code; conn_handler receives each connection's own context
    void                    *signal_context;       // Passed to signal_handler
    sys_cond_t              *cond;                 // Condition used with 'lock' to park and wake server threads
    sys_mutex_t             *lock;                 // Server lock; also handed to dx_timer_initialize()
    dx_thread_t            **threads;              // Array of thread_count thread records
    work_queue_t            *work_queue;           // Connectors waiting to be processed by a thread
    dx_timer_list_t          pending_timers;       // Timers queued via dx_server_timer_pending_LH(), serviced in thread_run()
    bool                     a_thread_is_waiting;  // True while one thread is inside the driver wait sequence
    int                      threads_active;       // Incremented when a thread claims a connector, decremented when done
    int                      pause_requests;       // Outstanding dx_server_pause() calls not yet resumed
    int                      threads_paused;       // Threads currently blocked in block_if_paused_LH()
    int                      pause_next_sequence;  // Next sequence number handed to a dx_server_pause() caller
    int                      pause_now_serving;    // Sequence number allowed to proceed; advanced by dx_server_resume()
    int                      pending_signal;       // Last signal number recorded by signal_handler(); 0 when none
} dx_server_t;


/*
 * Allocator definitions for the object types created in this file through
 * new_<type>() and released through free_<type>().
 */
ALLOC_DEFINE(dx_listener_t);
ALLOC_DEFINE(dx_connector_t);
ALLOC_DEFINE(dx_connection_t);
ALLOC_DEFINE(dx_user_fd_t);


/**
 * Singleton server object.  Allocated by dx_server_initialize(), released and
 * reset to 0 by dx_server_finalize().
 */
static dx_server_t *dx_server = 0;


/**
 * Process-level signal handler installed by dx_server_signal().
 *
 * Records the signal number in the server object and wakes every thread
 * blocked on the server condition so that one of them runs
 * handle_signals_LH().
 *
 * NOTE(review): this runs in signal context without taking the server lock,
 * and pending_signal is a plain int.  Presumably sys_cond_signal_all() wraps a
 * condition-variable broadcast, which is not on the POSIX list of
 * async-signal-safe functions - confirm whether this is acceptable here.
 *
 * @param signum The signal that was delivered.
 */
static void signal_handler(int signum)
{
    dx_server->pending_signal = signum;
    sys_cond_signal_all(dx_server->cond);
}


/**
 * Allocate a thread record in its initial (not running, not canceled,
 * no OS thread attached) state.
 *
 * @param id Identifier stored in the record's thread_id field.
 * @return The new record, or 0 if the allocation failed.
 */
static dx_thread_t *thread(int id)
{
    dx_thread_t *record = NEW(dx_thread_t);

    if (record) {
        record->using_thread = 0;
        record->canceled     = 0;
        record->running      = 0;
        record->thread_id    = id;
    }

    return record;
}


/**
 * Accept the connections pending on the driver's listeners.
 *
 * For every listener the driver reports, one connection is accepted and a
 * fresh connection context in CONN_STATE_SASL_SERVER is attached to the new
 * connector.  thread_run() calls this with the server lock held.
 *
 * If the accept yields no connector, that listener is skipped.  If the
 * context cannot be allocated, the accepted connector is closed and freed so
 * that no connector without a context is ever left in the driver.
 *
 * @param driver The proton driver whose listeners are serviced.
 */
static void thread_process_listeners(pn_driver_t *driver)
{
    pn_listener_t *listener = pn_driver_listener(driver);

    while (listener) {
        dx_log(module, LOG_TRACE, "Accepting Connection");

        pn_connector_t *cxtr = pn_listener_accept(listener);
        if (cxtr) {
            dx_connection_t *ctx = new_dx_connection_t();
            if (ctx) {
                ctx->state        = CONN_STATE_SASL_SERVER;
                ctx->owner_thread = CONTEXT_NO_OWNER;
                ctx->enqueued     = 0;
                ctx->pn_cxtr      = cxtr;
                ctx->pn_conn      = 0;
                ctx->listener     = (dx_listener_t*) pn_listener_context(listener);
                ctx->connector    = 0;
                ctx->context      = ctx->listener->context;
                ctx->user_context = 0;  // Must start out clear; dx_connection_get_context() returns it verbatim
                ctx->ufd          = 0;

                pn_connector_set_context(cxtr, ctx);
            } else {
                //
                // Without a context the connector cannot be processed; drop it.
                //
                pn_connector_close(cxtr);
                pn_connector_free(cxtr);
            }
        }

        listener = pn_driver_listener(driver);
    }
}


/**
 * Deliver a signal recorded by signal_handler() to the application.
 *
 * Must be called with the server lock held.  The pending signal is cleared
 * first; if a signal handler is registered, the lock is dropped for the
 * duration of the callback and re-acquired before returning.
 */
static void handle_signals_LH(void)
{
    const int pending = dx_server->pending_signal;

    if (pending == 0)
        return;

    dx_server->pending_signal = 0;

    if (!dx_server->signal_handler)
        return;

    sys_mutex_unlock(dx_server->lock);
    dx_server->signal_handler(dx_server->signal_context, pending);
    sys_mutex_lock(dx_server->lock);
}


/**
 * Park the calling thread while a pause is requested.
 *
 * Must be called with the server lock held.  The thread counts itself as
 * paused, wakes the other waiters (so a pending dx_server_pause() can
 * re-evaluate its condition) and then waits until no pause requests remain.
 */
static void block_if_paused_LH(void)
{
    if (dx_server->pause_requests <= 0)
        return;

    ++dx_server->threads_paused;
    sys_cond_signal_all(dx_server->cond);

    //
    // The lock has been held since the check above, so at least one wait is
    // always required.
    //
    do {
        sys_cond_wait(dx_server->cond, dx_server->lock);
    } while (dx_server->pause_requests > 0);

    --dx_server->threads_paused;
}


/**
 * Drive one connector through the connection state machine.
 *
 * Connectors in CONN_STATE_USER are handed straight to the registered user-fd
 * handler.  For every other connector the proton engine is stepped and the
 * handler matching the connection's current state is invoked; this repeats
 * for as long as the last handler reported events.
 *
 * thread_run() calls this after claiming the connector for the calling thread
 * and releasing the server lock.
 *
 * @param cxtr The connector to process.  Its context is a dx_connection_t.
 */
static void process_connector(pn_connector_t *cxtr)
{
    dx_connection_t *ctx = pn_connector_context(cxtr);
    int events      = 0;
    int auth_passes = 0;

    if (ctx->state == CONN_STATE_USER) {
        //
        // A user fd may be serviced before any handler has been registered.
        //
        if (dx_server->ufd_handler)
            dx_server->ufd_handler(ctx->ufd->context, ctx->ufd);
        return;
    }

    do {
        //
        // Step the engine for pre-handler processing
        //
        pn_connector_process(cxtr);

        //
        // Call the handler that is appropriate for the connector's state.
        //
        switch (ctx->state) {
        case CONN_STATE_CONNECTING:
            if (!pn_connector_closed(cxtr)) {
                ctx->state = CONN_STATE_SASL_CLIENT;
                assert(ctx->connector);
                ctx->connector->state = CXTR_STATE_OPEN;
                events = 1;
            } else {
                ctx->state = CONN_STATE_FAILED;
                events = 0;
            }
            break;

        //
        // NOTE(review): auth_passes is only incremented in the else branches,
        // which cannot be reached while it is 0.  As written, the auth handler
        // runs on every pass until it moves the connection out of the SASL
        // state.  Behavior is intentionally left unchanged here.
        //
        case CONN_STATE_SASL_CLIENT:
            if (auth_passes == 0) {
                auth_client_handler(cxtr);
                events = 1;
            } else {
                auth_passes++;
                events = 0;
            }
            break;

        case CONN_STATE_SASL_SERVER:
            if (auth_passes == 0) {
                auth_server_handler(cxtr);
                events = 1;
            } else {
                auth_passes++;
                events = 0;
            }
            break;

        case CONN_STATE_OPENING: {
            ctx->state = CONN_STATE_OPERATIONAL;

            pn_connection_t *conn = pn_connection();
            pn_connection_set_container(conn, "dispatch"); // TODO - make unique
            pn_connector_set_connection(cxtr, conn);
            pn_connection_set_context(conn, ctx);
            ctx->pn_conn = conn;

            dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy

            if (ctx->listener) {
                ce = DX_CONN_EVENT_LISTENER_OPEN;
            } else if (ctx->connector) {
                ce = DX_CONN_EVENT_CONNECTOR_OPEN;
                ctx->connector->delay = 0;
            } else
                assert(0);

            dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
            events = 1;
            break;
        }

        case CONN_STATE_OPERATIONAL:
            if (pn_connector_closed(cxtr)) {
                dx_server->conn_handler(ctx->context,
                                        DX_CONN_EVENT_CLOSE,
                                        (dx_connection_t*) pn_connector_context(cxtr));
                events = 0;
            }
            else
                events = dx_server->conn_handler(ctx->context,
                                                 DX_CONN_EVENT_PROCESS,
                                                 (dx_connection_t*) pn_connector_context(cxtr));
            break;

        default:
            //
            // No handler exists for this state (e.g. CONN_STATE_FAILED), so
            // nothing can generate further events.  Clear the flag; otherwise
            // a value of 1 left over from the previous pass would keep this
            // loop spinning forever.
            //
            events = 0;
            break;
        }
    } while (events > 0);
}


//
// TEMPORARY FUNCTION PROTOTYPES
//
// Local declarations for the three-part driver wait used by thread_run():
// parts 1 and 3 are called with the server lock held, part 2 (which takes the
// timeout and blocks) is called with the lock released.
//
void pn_driver_wait_1(pn_driver_t *d);
int  pn_driver_wait_2(pn_driver_t *d, int timeout);
void pn_driver_wait_3(pn_driver_t *d);
//
// END TEMPORARY
//

/**
 * Main loop executed by every server thread (thread 0 runs it directly from
 * dx_server_run(), the others through sys_thread()).
 *
 * Each pass takes the server lock and then, in order: delivers a pending
 * signal, blocks if a pause is requested, services one pending timer, or
 * fetches a connector from the work queue.  When the queue is empty the
 * thread either waits on the server condition or, if no other thread is
 * doing so, waits on the proton driver and afterwards refills the work queue.
 * A connector obtained from the queue is processed with the lock released.
 *
 * @param arg The dx_thread_t record for this thread (may be 0).
 * @return Always 0.
 */
static void *thread_run(void *arg)
{
    dx_thread_t     *thread = (dx_thread_t*) arg;
    pn_connector_t  *work;
    pn_connection_t *conn;
    dx_connection_t *ctx;
    int              error;
    int              poll_result;
    int              timer_holdoff = 0;

    if (!thread)
        return 0;

    thread->running = 1;

    //
    // thread_cancel() may already have been called for this thread.
    //
    if (thread->canceled)
        return 0;

    //
    // Invoke the start handler if the application supplied one.
    // This handler can be used to set NUMA or processor affinity for the thread.
    //
    if (dx_server->start_handler)
        dx_server->start_handler(dx_server->start_context, thread->thread_id);

    //
    // Main Loop
    //
    while (thread->running) {
        sys_mutex_lock(dx_server->lock);

        //
        // Check for pending signals to process
        //
        handle_signals_LH();
        if (!thread->running) {
            sys_mutex_unlock(dx_server->lock);
            break;
        }

        //
        // Check to see if the server is pausing.  If so, block here.
        //
        block_if_paused_LH();
        if (!thread->running) {
            sys_mutex_unlock(dx_server->lock);
            break;
        }

        //
        // Service pending timers.
        //
        dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
        if (timer) {
            DEQ_REMOVE_HEAD(dx_server->pending_timers);

            //
            // Mark the timer as idle in case it reschedules itself.
            //
            dx_timer_idle_LH(timer);

            //
            // Release the lock and invoke the timer's handler, then wake the
            // driver and start the next pass of the main loop.
            //
            sys_mutex_unlock(dx_server->lock);
            timer->handler(timer->context);
            pn_driver_wakeup(dx_server->driver);
            continue;
        }

        //
        // Check the work queue for connectors scheduled for processing.
        //
        work = work_queue_get(dx_server->work_queue);
        if (!work) {
            //
            // There is no pending work to do
            //
            if (dx_server->a_thread_is_waiting) {
                //
                // Another thread is waiting on the proton driver, this thread must
                // wait on the condition variable until signaled.
                //
                sys_cond_wait(dx_server->cond, dx_server->lock);
            } else {
                //
                // This thread elects itself to wait on the proton driver.  Set the
                // thread-is-waiting flag so other idle threads will not interfere.
                //
                dx_server->a_thread_is_waiting = true;

                //
                // Ask the timer module when its next timer is scheduled to fire.  We'll
                // use this value in driver_wait as the timeout.  If there are no scheduled
                // timers, the returned value will be -1.
                //
                long duration = dx_timer_next_duration_LH();

                //
                // Invoke the proton driver's wait sequence.  This is a bit of a hack for now
                // and will be improved in the future.  The wait process is divided into three parts,
                // the first and third of which need to be non-reentrant, and the second of which
                // must be reentrant (and blocks).
                //
                pn_driver_wait_1(dx_server->driver);
                sys_mutex_unlock(dx_server->lock);

                //
                // Repeat the blocking part for as long as it fails with PN_INTR;
                // any other driver error terminates the process.
                //
                // NOTE(review): the error text is fetched with pn_error(driver) here,
                // whereas dx_server_listen() uses pn_driver_error(driver) - confirm
                // which accessor is intended.
                //
                do {
                    error = 0;
                    poll_result = pn_driver_wait_2(dx_server->driver, duration);
                    if (poll_result == -1)
                        error = pn_driver_errno(dx_server->driver);
                } while (error == PN_INTR);
                if (error) {
                    dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver)));
                    exit(-1);
                }

                sys_mutex_lock(dx_server->lock);
                pn_driver_wait_3(dx_server->driver);

                //
                // NOTE(review): this early exit leaves a_thread_is_waiting set to true.
                //
                if (!thread->running) {
                    sys_mutex_unlock(dx_server->lock);
                    break;
                }

                //
                // Visit the timer module.  This happens when the wait returned 0
                // and otherwise on every 100th wait.
                //
                if (poll_result == 0 || ++timer_holdoff == 100) {
                    struct timespec tv;
                    clock_gettime(CLOCK_REALTIME, &tv);
                    long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000;
                    dx_timer_visit_LH(milliseconds);
                    timer_holdoff = 0;
                }

                //
                // Process listeners (incoming connections).
                //
                thread_process_listeners(dx_server->driver);

                //
                // Traverse the list of connectors-needing-service from the proton driver.
                // If the connector is not already in the work queue and it is not currently
                // being processed by another thread, put it in the work queue and signal the
                // condition variable.
                //
                work = pn_driver_connector(dx_server->driver);
                while (work) {
                    ctx = pn_connector_context(work);
                    if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
                        ctx->enqueued = 1;
                        work_queue_put(dx_server->work_queue, work);
                        sys_cond_signal(dx_server->cond);
                    }
                    work = pn_driver_connector(dx_server->driver);
                }

                //
                // Release our exclusive claim on pn_driver_wait.
                // Note that 'work' is 0 at this point, so this pass processes no connector.
                //
                dx_server->a_thread_is_waiting = false;
            }
        }

        //
        // If we were given a connector to work on from the work queue, mark it as
        // owned by this thread and as no longer enqueued.
        //
        if (work) {
            ctx = pn_connector_context(work);
            if (ctx->owner_thread == CONTEXT_NO_OWNER) {
                ctx->owner_thread = thread->thread_id;
                ctx->enqueued = 0;
                dx_server->threads_active++;
            } else {
                //
                // This connector is being processed by another thread, re-queue it.
                //
                work_queue_put(dx_server->work_queue, work);
                work = 0;
            }
        }
        sys_mutex_unlock(dx_server->lock);

        //
        // Process the connector that we now have exclusive access to.
        //
        if (work) {
            process_connector(work);

            //
            // Check to see if the connector was closed during processing
            //
            if (pn_connector_closed(work)) {
                //
                // Connector is closed.  Free the context and the connector.
                // If the connection belongs to an outgoing connector, detach it and
                // schedule the connector's timer so that the connection is retried.
                //
                conn = pn_connector_connection(work);
                if (ctx->connector) {
                    ctx->connector->ctx = 0;
                    ctx->connector->state = CXTR_STATE_CONNECTING;
                    dx_timer_schedule(ctx->connector->timer, ctx->connector->delay);
                }
                sys_mutex_lock(dx_server->lock);
                free_dx_connection_t(ctx);
                pn_connector_free(work);
                if (conn)
                    pn_connection_free(conn);
                dx_server->threads_active--;
                sys_mutex_unlock(dx_server->lock);
            } else {
                //
                // The connector lives on.  Mark it as no longer owned by this thread.
                //
                sys_mutex_lock(dx_server->lock);
                ctx->owner_thread = CONTEXT_NO_OWNER;
                dx_server->threads_active--;
                sys_mutex_unlock(dx_server->lock);
            }

            //
            // Wake up the proton driver to force it to reconsider its set of FDs
            // in light of the processing that just occurred.
            //
            pn_driver_wakeup(dx_server->driver);
        }
    }

    return 0;
}


/**
 * Launch an OS thread that executes thread_run() for the given record.
 *
 * @param thread The thread record; a null pointer is ignored.
 */
static void thread_start(dx_thread_t *thread)
{
    if (thread) {
        // Flag first so that thread_join() knows there is a thread to join.
        thread->using_thread = 1;
        thread->thread       = sys_thread(thread_run, (void*) thread);
    }
}


/**
 * Ask a server thread to leave its main loop.
 *
 * Only the flags are changed here; waking a blocked thread is left to the
 * caller.
 *
 * @param thread The thread record; a null pointer is ignored.
 */
static void thread_cancel(dx_thread_t *thread)
{
    if (thread) {
        thread->running  = 0;
        thread->canceled = 1;
    }
}


/**
 * Wait for a thread launched by thread_start() to finish.
 *
 * Records that never had an OS thread attached are skipped.
 *
 * @param thread The thread record; a null pointer is ignored.
 */
static void thread_join(dx_thread_t *thread)
{
    if (thread && thread->using_thread)
        sys_thread_join(thread->thread);
}


/**
 * Release a thread record.
 *
 * @param thread The record to release; a null pointer is accepted.
 */
static void thread_free(dx_thread_t *thread)
{
    // free() is a no-op for a null pointer, so no guard is needed.
    free(thread);
}


static void cxtr_try_open(void *context)
{
    dx_connector_t *ct = (dx_connector_t*) context;
    if (ct->state != CXTR_STATE_CONNECTING)
        return;

    dx_connection_t *ctx = new_dx_connection_t();
    ctx->state        = CONN_STATE_CONNECTING;
    ctx->owner_thread = CONTEXT_NO_OWNER;
    ctx->enqueued     = 0;
    ctx->pn_conn      = 0;
    ctx->listener     = 0;
    ctx->connector    = ct;
    ctx->context      = ct->context;
    ctx->user_context = 0;
    ctx->ufd          = 0;

    //
    // pn_connector is not thread safe
    //
    sys_mutex_lock(dx_server->lock);
    ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx);
    sys_mutex_unlock(dx_server->lock);

    ct->ctx   = ctx;
    ct->delay = 5000;
    dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
}


/**
 * Create the server singleton.
 *
 * Sets up the proton driver, the lock and condition, the timer module, the
 * thread records and the work queue.  A second call while a server exists is
 * ignored.  If the server object or its thread array cannot be allocated, the
 * singleton is left unset.
 *
 * @param thread_count Number of server threads.  Values below 1 are raised to
 *                     1 because dx_server_run() always runs thread 0 on the
 *                     calling thread.
 */
void dx_server_initialize(int thread_count)
{
    int i;

    if (dx_server)
        return;     // TODO - Fail in a more dramatic way

    if (thread_count < 1)
        thread_count = 1;

    dx_alloc_initialize();
    dx_server = NEW(dx_server_t);

    if (!dx_server)
        return;   // TODO - Fail in a more dramatic way

    //
    // Allocate the thread array before anything else is set up so that a
    // failure can be undone by simply releasing the server object.
    //
    dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count);
    if (!dx_server->threads) {
        free(dx_server);
        dx_server = 0;
        return;   // TODO - Fail in a more dramatic way
    }

    dx_server->thread_count    = thread_count;
    dx_server->driver          = pn_driver();
    dx_server->start_handler   = 0;
    dx_server->conn_handler    = 0;
    dx_server->signal_handler  = 0;
    dx_server->ufd_handler     = 0;
    dx_server->start_context   = 0;
    dx_server->conn_context    = 0;
    dx_server->signal_context  = 0;
    dx_server->lock            = sys_mutex();
    dx_server->cond            = sys_cond();

    dx_timer_initialize(dx_server->lock);

    for (i = 0; i < thread_count; i++)
        dx_server->threads[i] = thread(i);

    dx_server->work_queue          = work_queue();
    DEQ_INIT(dx_server->pending_timers);
    dx_server->a_thread_is_waiting = false;
    dx_server->threads_active      = 0;
    dx_server->pause_requests      = 0;
    dx_server->threads_paused      = 0;
    dx_server->pause_next_sequence = 0;
    dx_server->pause_now_serving   = 0;
    dx_server->pending_signal      = 0;
}


/**
 * Destroy the server singleton created by dx_server_initialize().
 *
 * Releases the thread records and the array that holds them, the work queue,
 * the proton driver, the lock and the condition, then the server object
 * itself.  Does nothing if no server exists.
 */
void dx_server_finalize(void)
{
    int i;
    if (!dx_server)
        return;

    for (i = 0; i < dx_server->thread_count; i++)
        thread_free(dx_server->threads[i]);

    //
    // The pointer array is a separate allocation made in
    // dx_server_initialize(); release it along with its elements.
    //
    free(dx_server->threads);
    dx_server->threads = 0;

    work_queue_free(dx_server->work_queue);

    pn_driver_free(dx_server->driver);
    sys_mutex_free(dx_server->lock);
    sys_cond_free(dx_server->cond);
    free(dx_server);
    dx_server = 0;
}


/**
 * Register the callback that receives connection events.
 * dx_server_run() asserts that one has been registered.
 *
 * @param handler The connection event callback.
 */
void dx_server_set_conn_handler(dx_conn_handler_cb_t handler)
{
    dx_server_t *server = dx_server;

    server->conn_handler = handler;
}


/**
 * Register the callback invoked by a server thread when a signal enabled with
 * dx_server_signal() has been received.
 *
 * @param handler The signal callback.
 * @param context Opaque value passed to the callback as its first argument.
 */
void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context)
{
    dx_server_t *server = dx_server;

    server->signal_context = context;
    server->signal_handler = handler;
}


/**
 * Register the callback invoked once by each server thread before it enters
 * its main loop.
 *
 * @param handler The thread-start callback.
 * @param context Opaque value passed to the callback as its first argument.
 */
void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context)
{
    dx_server_t *server = dx_server;

    server->start_context = context;
    server->start_handler = handler;
}


/**
 * Register the callback invoked when a user file descriptor created with
 * dx_user_fd() is serviced.
 *
 * @param ufd_handler The user-fd callback.
 */
void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler)
{
    dx_server_t *server = dx_server;

    server->ufd_handler = ufd_handler;
}


/**
 * Run the server until it is stopped.
 *
 * Threads 1..thread_count-1 are started as OS threads; thread 0 runs on the
 * calling thread.  The call returns once thread 0 has left its main loop and
 * the other threads have been joined.  Does nothing if no server exists.
 */
void dx_server_run(void)
{
    int idx;

    if (!dx_server)
        return;

    assert(dx_server->conn_handler); // Server can't run without a connection handler.

    idx = 1;
    while (idx < dx_server->thread_count) {
        thread_start(dx_server->threads[idx]);
        idx++;
    }

    dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count);

    // The caller's thread doubles as server thread 0.
    thread_run((void*) dx_server->threads[0]);

    idx = 1;
    while (idx < dx_server->thread_count) {
        thread_join(dx_server->threads[idx]);
        idx++;
    }

    dx_log(module, LOG_INFO, "Shut Down");
}


/**
 * Ask every server thread to leave its main loop.
 *
 * Under the server lock, all thread records are canceled; threads blocked on
 * the condition and the thread blocked in the driver are then woken so that
 * they notice the request.
 */
void dx_server_stop(void)
{
    int idx = 0;

    sys_mutex_lock(dx_server->lock);

    while (idx < dx_server->thread_count) {
        thread_cancel(dx_server->threads[idx]);
        ++idx;
    }

    sys_cond_signal_all(dx_server->cond);
    pn_driver_wakeup(dx_server->driver);

    sys_mutex_unlock(dx_server->lock);
}


/**
 * Route a process signal through the server: installs this file's
 * signal_handler() for the given signal number using signal().
 *
 * @param signum The signal number to intercept.
 */
void dx_server_signal(int signum)
{
    signal(signum, signal_handler);
}


/**
 * Pause the server threads and return once the caller may proceed.
 *
 * The call registers a pause request, takes a sequence number, wakes all
 * threads and then blocks until (a) the paused threads plus the outstanding
 * pause requests account for every server thread and (b) the caller's
 * sequence number is the one being served, so that concurrent pausers proceed
 * one at a time.  Each call must be matched by dx_server_resume().
 */
void dx_server_pause(void)
{
    sys_mutex_lock(dx_server->lock);

    //
    // Take a ticket and register the request.
    //
    const int ticket = dx_server->pause_next_sequence++;
    dx_server->pause_requests++;

    //
    // Get every thread to look at the request: those blocked on the
    // condition as well as the one blocked in the driver.
    //
    sys_cond_signal_all(dx_server->cond);
    pn_driver_wakeup(dx_server->driver);

    for (;;) {
        const bool all_stopped =
            dx_server->threads_paused + dx_server->pause_requests >= dx_server->thread_count;
        const bool my_turn = (ticket == dx_server->pause_now_serving);

        if (all_stopped && my_turn)
            break;

        sys_cond_wait(dx_server->cond, dx_server->lock);
    }

    sys_mutex_unlock(dx_server->lock);
}


/**
 * Undo one dx_server_pause().
 *
 * Withdraws the caller's pause request, advances the sequence number being
 * served and wakes all waiters so that paused threads and queued pausers
 * re-evaluate their conditions.
 */
void dx_server_resume(void)
{
    sys_mutex_lock(dx_server->lock);

    dx_server->pause_now_serving += 1;
    dx_server->pause_requests    -= 1;

    sys_cond_signal_all(dx_server->cond);
    sys_mutex_unlock(dx_server->lock);
}


/**
 * Mark a connection's connector as writable so that it gets serviced.
 *
 * Nothing happens for a null connection, a connection without a proton
 * connector, or a connector that is already closed.
 *
 * @param ctx The connection to activate.
 */
void dx_server_activate(dx_connection_t *ctx)
{
    pn_connector_t *ctor = ctx ? ctx->pn_cxtr : 0;

    if (ctor && !pn_connector_closed(ctor))
        pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE);
}


/**
 * Store an application-defined value on a connection.
 *
 * @param conn    The connection (must not be null).
 * @param context The value to store; returned later by dx_connection_get_context().
 */
void dx_connection_set_context(dx_connection_t *conn, void *context)
{
    conn->user_context = context;
}


/**
 * Fetch the application-defined value stored on a connection.
 *
 * @param conn The connection (must not be null).
 * @return The connection's user_context field.
 */
void *dx_connection_get_context(dx_connection_t *conn)
{
    return conn->user_context;
}


/**
 * Fetch the proton connection attached to a connection.
 *
 * @param conn The connection (must not be null).
 * @return The connection's pn_conn field; it is 0 until process_connector()
 *         has handled CONN_STATE_OPENING for this connection.
 */
pn_connection_t *dx_connection_pn(dx_connection_t *conn)
{
    return conn->pn_conn;
}


/**
 * Start listening for incoming connections.
 *
 * @param config  Supplies the host and port to listen on; the pointer is kept
 *                in the returned listener.
 * @param context Opaque value copied into every connection accepted through
 *                this listener.
 * @return The new listener, or 0 if it could not be allocated or the driver
 *         failed to open the listening socket (an error is logged in the
 *         latter case).
 */
dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context)
{
    dx_listener_t *listener = new_dx_listener_t();
    if (listener == 0)
        return 0;

    listener->context     = context;
    listener->config      = config;
    listener->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) listener);

    if (listener->pn_listener) {
        dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port);
        return listener;
    }

    dx_log(module, LOG_ERROR, "Driver Error %d (%s)",
           pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver));
    free_dx_listener_t(listener);
    return 0;
}


/**
 * Release a listener: frees its proton listener and then the listener object.
 *
 * @param li The listener returned by dx_server_listen() (must not be null).
 */
void dx_server_listener_free(dx_listener_t* li)
{
    pn_listener_free(li->pn_listener);
    free_dx_listener_t(li);
}


/**
 * Close a listener's proton listener.  The listener object itself is not
 * released; use dx_server_listener_free() for that.
 *
 * @param li The listener returned by dx_server_listen() (must not be null).
 */
void dx_server_listener_close(dx_listener_t* li)
{
    pn_listener_close(li->pn_listener);
}


/**
 * Create an outgoing connector.
 *
 * The connection itself is opened asynchronously: a timer bound to
 * cxtr_try_open() is created and scheduled with a delay of zero.
 *
 * @param config  Supplies the host and port to connect to; the pointer is
 *                kept in the returned connector.
 * @param context Opaque value copied into the connections opened by this
 *                connector.
 * @return The new connector, or 0 if it could not be allocated.
 */
dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context)
{
    dx_connector_t *connector = new_dx_connector_t();

    if (connector) {
        connector->ctx     = 0;
        connector->context = context;
        connector->config  = config;
        connector->state   = CXTR_STATE_CONNECTING;
        connector->timer   = dx_timer(cxtr_try_open, (void*) connector);
        connector->delay   = 0;

        dx_timer_schedule(connector->timer, connector->delay);
    }

    return connector;
}


/**
 * Release an outgoing connector.
 *
 * If a connection is currently attached, its proton connector is closed and
 * the connection's back-pointer to this connector is cleared.  The proton
 * connector is not freed here; that is done by the connector
 * processing/cleanup.  The retry timer and the connector object are then
 * released.
 *
 * @param ct The connector returned by dx_server_connect() (must not be null).
 */
void dx_server_connector_free(dx_connector_t* ct)
{
    dx_connection_t *attached = ct->ctx;

    if (attached) {
        pn_connector_close(attached->pn_cxtr);
        attached->connector = 0;
    }

    dx_timer_free(ct->timer);
    free_dx_connector_t(ct);
}


dx_user_fd_t *dx_user_fd(int fd, void *context)
{
    dx_user_fd_t *ufd = new_dx_user_fd_t();

    if (!ufd)
        return 0;

    dx_connection_t *ctx = new_dx_connection_t();
    ctx->state        = CONN_STATE_USER;
    ctx->owner_thread = CONTEXT_NO_OWNER;
    ctx->enqueued     = 0;
    ctx->pn_conn      = 0;
    ctx->listener     = 0;
    ctx->connector    = 0;
    ctx->context      = 0;
    ctx->user_context = 0;
    ctx->ufd          = ufd;

    ufd->context = context;
    ufd->fd      = fd;
    ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx);
    pn_driver_wakeup(dx_server->driver);

    return ufd;
}


/**
 * Release a user-fd object: closes its proton connector and frees the object.
 *
 * NOTE(review): the connection context created in dx_user_fd() still points
 * at this object through its ufd field, and process_connector() dereferences
 * that field for CONN_STATE_USER connectors - confirm that the connector
 * cannot be processed again after this call.
 *
 * @param ufd The object returned by dx_user_fd() (must not be null).
 */
void dx_user_fd_free(dx_user_fd_t *ufd)
{
    pn_connector_close(ufd->pn_conn);
    free_dx_user_fd_t(ufd);
}


/**
 * Activate the user fd's connector for PN_CONNECTOR_READABLE and wake the
 * driver so that the change is taken into account.
 *
 * @param ufd The object returned by dx_user_fd() (must not be null).
 */
void dx_user_fd_activate_read(dx_user_fd_t *ufd)
{
    pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE);
    pn_driver_wakeup(dx_server->driver);
}


/**
 * Activate the user fd's connector for PN_CONNECTOR_WRITABLE and wake the
 * driver so that the change is taken into account.
 *
 * @param ufd The object returned by dx_user_fd() (must not be null).
 */
void dx_user_fd_activate_write(dx_user_fd_t *ufd)
{
    pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
    pn_driver_wakeup(dx_server->driver);
}


/**
 * Query the user fd's connector for PN_CONNECTOR_READABLE.
 *
 * @param ufd The object returned by dx_user_fd() (must not be null).
 * @return The result of pn_connector_activated() for the readable condition.
 */
bool dx_user_fd_is_readable(dx_user_fd_t *ufd)
{
    return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE);
}


/**
 * Query the user fd's connector for PN_CONNECTOR_WRITABLE.
 *
 * @param ufd The object returned by dx_user_fd() (must not be null).
 * @return The result of pn_connector_activated() for the writable condition.
 */
bool dx_user_fd_is_writeable(dx_user_fd_t *ufd)
{
    return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
}


/**
 * Append a timer to the server's pending-timer list, from which thread_run()
 * takes timers and invokes their handlers.
 *
 * The _LH suffix indicates that the caller must hold the server lock.
 *
 * @param timer The timer to queue.
 */
void dx_server_timer_pending_LH(dx_timer_t *timer)
{
    DEQ_INSERT_TAIL(dx_server->pending_timers, timer);
}


/**
 * Remove a timer from the server's pending-timer list.
 *
 * The _LH suffix indicates that the caller must hold the server lock.
 * NOTE(review): presumably the timer must currently be on the list - confirm
 * against the callers in the timer module.
 *
 * @param timer The timer to remove.
 */
void dx_server_timer_cancel_LH(dx_timer_t *timer)
{
    DEQ_REMOVE(dx_server->pending_timers, timer);
}

