/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "http_common.h"

#include <proton/listener.h>

#include <stdio.h>

// Allocator declarations/definitions for the listener and connector types.
// NOTE(review): ALLOC_DECLARE/ALLOC_DEFINE are project macros not visible in
// this file; presumably they provide the new_<type>() / free_<type>() helpers
// that the constructors and decref functions below call — confirm.
ALLOC_DECLARE(qd_http_listener_t);
ALLOC_DEFINE(qd_http_listener_t);
ALLOC_DECLARE(qd_http_connector_t);
ALLOC_DEFINE(qd_http_connector_t);


/**
 * Fill in a bridge configuration from the attributes of a management entity.
 *
 * The config is zeroed first, then "name", "host", "port", "address",
 * "siteId", "protocolVersion", "aggregation", "eventChannel" and
 * "hostOverride" are read from the entity. "protocolVersion" selects
 * VERSION_HTTP2 when it equals "HTTP2" and VERSION_HTTP1 otherwise;
 * "aggregation" maps "json"/"multipart" to the matching QD_AGGREGATION_*
 * value and anything else (including absent) to QD_AGGREGATION_NONE.
 * host_port is built as "<host>:<port>".
 *
 * @param qd      dispatch instance (not used by this function)
 * @param config  configuration to populate; on success the caller owns the
 *                strings and releases them with qd_http_free_bridge_config()
 * @param entity  entity to read the attributes from
 * @return QD_ERROR_NONE on success; otherwise the current qd_error_code(),
 *         with every string read so far already released.
 */
static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t *config, qd_entity_t* entity)
{
    char *version_str = 0;
    char *aggregation_str = 0;

    qd_error_clear();
    ZERO(config);

#define CHECK() if (qd_error_code()) goto error
    config->name    = qd_entity_get_string(entity, "name");            CHECK();
    config->host    = qd_entity_get_string(entity, "host");            CHECK();
    config->port    = qd_entity_get_string(entity, "port");            CHECK();
    config->address = qd_entity_get_string(entity, "address");         CHECK();
    config->site    = qd_entity_opt_string(entity, "siteId", 0);       CHECK();
    version_str     = qd_entity_get_string(entity, "protocolVersion");  CHECK();
    aggregation_str = qd_entity_opt_string(entity, "aggregation", 0);  CHECK();
    config->event_channel = qd_entity_opt_bool(entity, "eventChannel", false); CHECK();
    config->host_override  = qd_entity_opt_string(entity, "hostOverride", 0);   CHECK();

    if (strcmp(version_str, "HTTP2") == 0) {
        config->version = VERSION_HTTP2;
    } else {
        config->version = VERSION_HTTP1;
    }
    free(version_str);
    version_str = 0;

    if (aggregation_str && strcmp(aggregation_str, "json") == 0) {
        config->aggregation = QD_AGGREGATION_JSON;
    } else if (aggregation_str && strcmp(aggregation_str, "multipart") == 0) {
        config->aggregation = QD_AGGREGATION_MULTIPART;
    } else {
        config->aggregation = QD_AGGREGATION_NONE;
    }
    free(aggregation_str);
    aggregation_str = 0;

    // host + ':' + port + terminating NUL
    size_t hplen = strlen(config->host) + strlen(config->port) + 2;
    config->host_port = malloc(hplen);
    snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);

    return QD_ERROR_NONE;

error:
    qd_http_free_bridge_config(config);
    // Both temporaries may still be live here: a CHECK() after either
    // string was read jumps straight to this label.
    free(version_str);
    free(aggregation_str);
    return qd_error_code();
}


/**
 * Release every heap string held by a bridge configuration.
 *
 * Only the strings are freed; the config structure itself is not (callers in
 * this file pass stack or embedded structs). Each pointer is reset after
 * being freed so that calling this function again on the same config is
 * harmless rather than a double free. Non-pointer fields are left untouched.
 *
 * @param config  configuration to release; NULL is ignored
 */
void qd_http_free_bridge_config(qd_http_bridge_config_t *config)
{
    if (!config) {
        return;
    }
    free(config->host);
    config->host = 0;
    free(config->port);
    config->port = 0;
    free(config->name);
    config->name = 0;
    free(config->address);
    config->address = 0;
    free(config->site);
    config->site = 0;
    free(config->host_override);
    config->host_override = 0;
    free(config->host_port);
    config->host_port = 0;
}


//
// HTTP Listener Management (HttpListenerEntity)
//


/**
 * Create an HTTP listener from a management entity.
 *
 * Loads the bridge configuration and hands it to the HTTP/1 or HTTP/2
 * listener factory according to the configured protocol version.
 *
 * @return the new listener, or 0 if the configuration could not be loaded
 *         (an error is logged) or the factory returned no listener (the
 *         loaded configuration strings are released here in that case).
 */
qd_http_listener_t *qd_dispatch_configure_http_listener(qd_dispatch_t *qd, qd_entity_t *entity)
{
    qd_http_bridge_config_t config;

    if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) {
        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR,
               "Unable to create http listener: %s", qd_error_message());
        return 0;
    }

    qd_http_listener_t *created = 0;
    if (config.version == VERSION_HTTP1) {
        created = qd_http1_configure_listener(qd, &config, entity);
    } else if (config.version == VERSION_HTTP2) {
        created = qd_http2_configure_listener(qd, &config, entity);
    }

    if (created) {
        return created;
    }

    // Nothing took the configuration: release its strings.
    qd_http_free_bridge_config(&config);
    return 0;
}


/**
 * Delete an HTTP listener, dispatching to the HTTP/1 or HTTP/2 delete
 * routine that matches the listener's configured protocol version.
 *
 * @param impl  the qd_http_listener_t to delete; 0 is ignored
 */
void qd_dispatch_delete_http_listener(qd_dispatch_t *qd, void *impl)
{
    qd_http_listener_t *listener = (qd_http_listener_t*) impl;
    if (!listener) {
        return;
    }

    switch (listener->config.version) {
    case VERSION_HTTP1:
        qd_http1_delete_listener(qd, listener);
        break;
    case VERSION_HTTP2:
        qd_http2_delete_listener(qd, listener);
        break;
    }
}


/**
 * Refresh hook for the httpListener entity. There is nothing to refresh:
 * both arguments are ignored and QD_ERROR_NONE is always returned.
 */
qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl)
{
    (void) entity;
    (void) impl;
    return QD_ERROR_NONE;
}


//
// HTTP Connector Management (HttpConnectorEntity)
//


/**
 * Create an HTTP connector from a management entity.
 *
 * Loads the bridge configuration and hands it to the HTTP/1 or HTTP/2
 * connector factory according to the configured protocol version.
 *
 * @return the new connector, or 0 if the configuration could not be loaded
 *         (an error is logged) or the factory returned no connector (the
 *         loaded configuration strings are released here in that case).
 */
qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
    qd_http_bridge_config_t config;

    if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) {
        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR,
               "Unable to create http connector: %s", qd_error_message());
        return 0;
    }

    qd_http_connector_t *created = 0;
    if (config.version == VERSION_HTTP1) {
        created = qd_http1_configure_connector(qd, &config, entity);
    } else if (config.version == VERSION_HTTP2) {
        created = qd_http2_configure_connector(qd, &config, entity);
    }

    if (created) {
        return created;
    }

    // Nothing took the configuration: release its strings.
    qd_http_free_bridge_config(&config);
    return 0;
}


/**
 * Delete an HTTP connector, dispatching to the HTTP/1 or HTTP/2 delete
 * routine that matches the connector's configured protocol version.
 *
 * @param impl  the qd_http_connector_t to delete; 0 is ignored
 */
void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl)
{
    qd_http_connector_t *connector = (qd_http_connector_t*) impl;
    if (!connector) {
        return;
    }

    switch (connector->config.version) {
    case VERSION_HTTP1:
        qd_http1_delete_connector(qd, connector);
        break;
    case VERSION_HTTP2:
        qd_http2_delete_connector(qd, connector);
        break;
    }
}

/**
 * Refresh hook for the httpConnector entity. There is nothing to refresh:
 * both arguments are ignored and QD_ERROR_NONE is always returned.
 */
qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl)
{
    (void) entity;
    (void) impl;
    return QD_ERROR_NONE;
}

//
// qd_http_listener_t constructor
//

/**
 * Allocate and initialise an HTTP listener.
 *
 * The listener is zeroed, given a new proton listener, a reference count of
 * one, and an event context that points back at the listener and carries
 * the supplied handler. That context is attached to the proton listener.
 *
 * @param server   server the listener belongs to
 * @param handler  event handler stored in the listener's context
 * @return the new listener, or 0 if either allocation fails
 */
qd_http_listener_t *qd_http_listener(qd_server_t *server, qd_server_event_handler_t handler)
{
    qd_http_listener_t *listener = new_qd_http_listener_t();
    if (listener) {
        ZERO(listener);

        listener->pn_listener = pn_listener();
        if (listener->pn_listener) {
            sys_atomic_init(&listener->ref_count, 1);
            listener->server          = server;
            listener->context.context = listener;
            listener->context.handler = handler;
            pn_listener_set_context(listener->pn_listener, &listener->context);
            return listener;
        }

        // No proton listener: give the half-built object back.
        free_qd_http_listener_t(listener);
    }
    return 0;
}

/**
 * Drop one reference to a listener. When sys_atomic_dec() reports 1
 * (presumably the value before the decrement, i.e. this was the last
 * reference — confirm against sys_atomic_dec), the configuration strings
 * and the listener itself are released.
 *
 * @param li  listener to release; 0 is ignored
 */
void qd_http_listener_decref(qd_http_listener_t* li)
{
    if (!li) {
        return;
    }
    if (sys_atomic_dec(&li->ref_count) != 1) {
        return;
    }

    qd_http_free_bridge_config(&li->config);
    free_qd_http_listener_t(li);
}

//
// qd_http_connector_t constructor
//

/**
 * Allocate and initialise an HTTP connector: zeroed, reference count of
 * one, bound to the given server.
 *
 * @return the new connector, or 0 if allocation fails
 */
qd_http_connector_t *qd_http_connector(qd_server_t *server)
{
    qd_http_connector_t *connector = new_qd_http_connector_t();
    if (connector) {
        ZERO(connector);
        sys_atomic_init(&connector->ref_count, 1);
        connector->server = server;
    }
    return connector;
}

/**
 * Drop one reference to a connector. When sys_atomic_dec() reports 1
 * (presumably the value before the decrement, i.e. this was the last
 * reference — confirm against sys_atomic_dec), the configuration strings
 * and the connector itself are released.
 *
 * @param c  connector to release; 0 is ignored
 */
void qd_http_connector_decref(qd_http_connector_t* c)
{
    if (!c) {
        return;
    }
    if (sys_atomic_dec(&c->ref_count) != 1) {
        return;
    }

    qd_http_free_bridge_config(&c->config);
    free_qd_http_connector_t(c);
}


typedef struct qdr_http_method_status_t  qdr_http_method_status_t;

// Request counter for one method/status combination. Entries live in the
// `detail` list of a qdr_http_request_info_t.
struct qdr_http_method_status_t {
    DEQ_LINKS(qdr_http_method_status_t);

    // Heap-allocated key: "<method>:<3-digit status>", or "unknown" when no
    // method was supplied (see _new_qdr_http_method_status_t).
    char     *key;
    // Number of requests counted under this key.
    uint64_t requests;
};
DEQ_DECLARE(qdr_http_method_status_t, qdr_http_method_status_list_t);

typedef struct qdr_http_request_info_t  qdr_http_request_info_t;

// Aggregated HTTP request statistics for one record key. Records with the
// same key are merged by _update_qdr_http_request_info.
struct qdr_http_request_info_t {
    DEQ_LINKS(qdr_http_request_info_t);

    // Heap-allocated identity built by _record_key from direction, host,
    // address and site; also reported as both "name" and "identity".
    char     *key;
    // Heap-allocated copies of the values passed to qd_http_record_request;
    // any of them may be 0.
    char     *address;
    char     *host;
    char     *site;
    // true is reported as direction "in", false as "out".
    bool      ingress;
    // Counters are summed when records are merged ...
    uint64_t  requests;
    uint64_t  bytes_in;
    uint64_t  bytes_out;
    // ... while the latency keeps the largest value seen.
    uint64_t  max_latency;
    // Per method/status request counts.
    qdr_http_method_status_list_t detail;
};
DEQ_DECLARE(qdr_http_request_info_t, qdr_http_request_info_list_t);

// Column indexes understood by insert_column(). The values follow the order
// of the names in qdr_http_request_info_columns[] below.
#define QDR_HTTP_REQUEST_INFO_NAME                   0
#define QDR_HTTP_REQUEST_INFO_IDENTITY               1
#define QDR_HTTP_REQUEST_INFO_ADDRESS                2
#define QDR_HTTP_REQUEST_INFO_HOST                   3
#define QDR_HTTP_REQUEST_INFO_SITE                   4
#define QDR_HTTP_REQUEST_INFO_DIRECTION              5
#define QDR_HTTP_REQUEST_INFO_REQUESTS               6
#define QDR_HTTP_REQUEST_INFO_BYTES_IN               7
#define QDR_HTTP_REQUEST_INFO_BYTES_OUT              8
#define QDR_HTTP_REQUEST_INFO_MAX_LATENCY            9
#define QDR_HTTP_REQUEST_INFO_DETAIL                10


// Values reported in the "direction" column: "in" when a record's ingress
// flag is set, "out" otherwise.
const char * const QDR_HTTP_REQUEST_INFO_DIRECTION_IN  = "in";
const char * const QDR_HTTP_REQUEST_INFO_DIRECTION_OUT = "out";

// Attribute names, in the same order as the QDR_HTTP_REQUEST_INFO_* column
// indexes; the list is terminated by a 0 entry.
const char *qdr_http_request_info_columns[] =
    {"name",
     "identity",
     "address",
     "host",
     "site",
     "direction",
     "requests",
     "bytesIn",
     "bytesOut",
     "maxLatency",
     "details",
     0};

// Entity type name, used in the error log of qdra_http_request_info_get_CT.
const char *HTTP_REQUEST_INFO_TYPE = "org.apache.qpid.dispatch.httpRequestInfo";

// Container for the list of request-info records.
typedef struct {
    qdr_http_request_info_list_t records;
} http_request_info_records_t;

// The single record container for this file; created on first use by
// _get_request_info(). Nothing in this file frees it.
static http_request_info_records_t* request_info = 0;

/**
 * Return the file-wide record container, allocating it and initialising its
 * list the first time it is requested.
 */
static http_request_info_records_t *_get_request_info()
{
    if (request_info) {
        return request_info;
    }

    request_info = NEW(http_request_info_records_t);
    DEQ_INIT(request_info->records);
    return request_info;
}

/**
 * Append the value of one column of a record to a composed field.
 *
 * @param core    not used by this function
 * @param record  record to read from; when 0 nothing is appended
 * @param col     one of the QDR_HTTP_REQUEST_INFO_* column indexes; any
 *                other value appends nothing
 * @param body    composed field to append to
 */
static void insert_column(qdr_core_t *core, qdr_http_request_info_t *record, int col, qd_composed_field_t *body)
{
    qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "Insert column %i for %p", col, (void*) record);

    if (record == 0) {
        return;
    }

    switch (col) {
    // Name and identity are both reported as the record key.
    case QDR_HTTP_REQUEST_INFO_NAME:
    case QDR_HTTP_REQUEST_INFO_IDENTITY:
        qd_compose_insert_string(body, record->key);
        break;

    case QDR_HTTP_REQUEST_INFO_ADDRESS:
        qd_compose_insert_string(body, record->address);
        break;

    case QDR_HTTP_REQUEST_INFO_HOST:
        qd_compose_insert_string(body, record->host);
        break;

    case QDR_HTTP_REQUEST_INFO_SITE:
        qd_compose_insert_string(body, record->site);
        break;

    case QDR_HTTP_REQUEST_INFO_DIRECTION:
        qd_compose_insert_string(body, (record->ingress ? QDR_HTTP_REQUEST_INFO_DIRECTION_IN
                                                        : QDR_HTTP_REQUEST_INFO_DIRECTION_OUT));
        break;

    case QDR_HTTP_REQUEST_INFO_REQUESTS:
        qd_compose_insert_uint(body, record->requests);
        break;

    case QDR_HTTP_REQUEST_INFO_BYTES_IN:
        qd_compose_insert_uint(body, record->bytes_in);
        break;

    case QDR_HTTP_REQUEST_INFO_BYTES_OUT:
        qd_compose_insert_uint(body, record->bytes_out);
        break;

    case QDR_HTTP_REQUEST_INFO_MAX_LATENCY:
        qd_compose_insert_uint(body, record->max_latency);
        break;

    case QDR_HTTP_REQUEST_INFO_DETAIL: {
        // Map of "<method>:<status>" key -> request count.
        qd_compose_start_map(body);
        qdr_http_method_status_t *entry = DEQ_HEAD(record->detail);
        while (entry) {
            qd_compose_insert_string(body, entry->key);
            // NOTE(review): requests is a uint64_t but is inserted with the
            // signed-int call, unlike the counters above — confirm intended.
            qd_compose_insert_int(body, entry->requests);
            entry = DEQ_NEXT(entry);
        }
        qd_compose_end_map(body);
        break;
    }

    }
}


/**
 * Append one record to the query body as a list holding the columns the
 * query asked for. query->columns is read until the first negative entry.
 * With no record an empty list is written.
 */
static void write_list(qdr_core_t *core, qdr_query_t *query,  qdr_http_request_info_t *record)
{
    qd_composed_field_t *out = query->body;

    qd_compose_start_list(out);
    if (record) {
        for (int idx = 0; query->columns[idx] >= 0; idx++) {
            insert_column(core, record, query->columns[idx], out);
        }
    }
    qd_compose_end_list(out);
}

/**
 * Append one record to a composed field as a map of column name -> value,
 * covering the first QDR_HTTP_REQUEST_INFO_COLUMN_COUNT columns.
 *
 * @param qdr_connection_columns  column names, indexed by column number
 */
static void write_map(qdr_core_t           *core,
                      qdr_http_request_info_t *record,
                      qd_composed_field_t  *body,
                      const char           *qdr_connection_columns[])
{
    qd_compose_start_map(body);

    int col = 0;
    while (col < QDR_HTTP_REQUEST_INFO_COLUMN_COUNT) {
        qd_compose_insert_string(body, qdr_connection_columns[col]);
        insert_column(core, record, col, body);
        col++;
    }

    qd_compose_end_map(body);
}

/**
 * Update the query's paging state after `record` has been written: bump
 * next_offset and set `more` according to whether another record follows.
 * With no record, `more` is simply cleared.
 */
static void advance(qdr_query_t *query, qdr_http_request_info_t *record)
{
    if (!record) {
        query->more = false;
        return;
    }

    query->next_offset++;
    query->more = (DEQ_NEXT(record) != 0);
}

/**
 * Look up the record whose key matches the given identity.
 *
 * @param core      not used by this function
 * @param identity  iterator over the identity to match; 0 yields no match
 * @return the first record for which qd_iterator_equal() reports a match
 *         against the record key, or 0 if there is none
 */
static qdr_http_request_info_t *find_by_identity(qdr_core_t *core, qd_iterator_t *identity)
{
    if (!identity) {
        return 0;
    }

    for (qdr_http_request_info_t *candidate = DEQ_HEAD(_get_request_info()->records);
         candidate;
         candidate = DEQ_NEXT(candidate)) {
        // The key is handed to the iterator comparison as raw bytes.
        if (qd_iterator_equal(identity, (const unsigned char*) candidate->key)) {
            return candidate;
        }
    }

    return 0;
}

/**
 * Start a query over the request-info records at the given offset.
 *
 * The query status is set to QD_AMQP_OK. If a record exists at `offset` it
 * is written to the query body as a list and the paging state is advanced;
 * otherwise `more` is cleared and nothing is written. The response is
 * enqueued in both cases.
 */
void qdra_http_request_info_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
{
    qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "query for first http request info (%i)", offset);
    query->status = QD_AMQP_OK;

    qdr_http_request_info_t *record = 0;
    if (offset < DEQ_SIZE(_get_request_info()->records)) {
        // Walk forward to the record at `offset`.
        record = DEQ_HEAD(_get_request_info()->records);
        int skipped = 0;
        while (skipped < offset && record) {
            record = DEQ_NEXT(record);
            skipped++;
        }
        assert(record);
    }

    if (!record) {
        query->more = false;
    } else {
        write_list(core, query, record);
        query->next_offset = offset;
        advance(query, record);
    }

    qdr_agent_enqueue_response_CT(core, query);
}

/**
 * Continue a query at query->next_offset.
 *
 * If a record exists at that offset it is written to the query body as a
 * list and the paging state is advanced; otherwise `more` is cleared. The
 * response is enqueued in both cases.
 */
void qdra_http_request_info_get_next_CT(qdr_core_t *core, qdr_query_t *query)
{
    qdr_http_request_info_t *current = 0;

    if (query->next_offset < DEQ_SIZE(_get_request_info()->records)) {
        // Walk forward to the record at next_offset.
        current = DEQ_HEAD(_get_request_info()->records);
        int skipped = 0;
        while (skipped < query->next_offset && current) {
            current = DEQ_NEXT(current);
            skipped++;
        }
    }

    if (!current) {
        query->more = false;
    } else {
        write_list(core, query, current);
        advance(query, current);
    }
    qdr_agent_enqueue_response_CT(core, query);
}

/**
 * Read a single request-info record by identity.
 *
 * Lookup by name is not supported: without an identity the status is set to
 * QD_AMQP_BAD_REQUEST with an explanatory description and an error is
 * logged. With an identity, the matching record is written to the query
 * body as a map (status QD_AMQP_OK), or the status is QD_AMQP_NOT_FOUND if
 * no record matches. The response is always enqueued.
 *
 * @param name      ignored
 * @param identity  identity of the record to read
 * @param qdr_http_request_info_columns  column names used as map keys
 */
void qdra_http_request_info_get_CT(qdr_core_t          *core,
                               qd_iterator_t       *name,
                               qd_iterator_t       *identity,
                               qdr_query_t         *query,
                               const char          *qdr_http_request_info_columns[])
{
    if (identity) {
        qdr_http_request_info_t *found = find_by_identity(core, identity);

        if (found) {
            write_map(core, found, query->body, qdr_http_request_info_columns);
            query->status = QD_AMQP_OK;
        } else {
            query->status = QD_AMQP_NOT_FOUND;
        }
    } else {
        query->status = QD_AMQP_BAD_REQUEST;
        query->status.description = "Name not supported. Identity required";
        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s", HTTP_REQUEST_INFO_TYPE, query->status.description);
    }
    qdr_agent_enqueue_response_CT(core, query);
}

// Key used for a method/status entry when no method is supplied.
static const char* UNKNOWN = "unknown";

/**
 * Allocate a zeroed method/status entry and give it its key.
 *
 * With a method the key is "<method>:<status>", the status printed as three
 * digits; a status outside 100..599 is recorded as 500. Without a method
 * the key is "unknown" and the status is not used.
 *
 * @return the new entry; its request count is 0
 */
static qdr_http_method_status_t* _new_qdr_http_method_status_t(const char *const method, int status)
{
    qdr_http_method_status_t* entry = NEW(qdr_http_method_status_t);
    ZERO(entry);

    if (!method) {
        entry->key = qd_strdup(UNKNOWN);
        return entry;
    }

    const int code = (status < 100 || status >= 600) ? 500 : status;

    // method + ':' + three status digits + terminating NUL
    const size_t key_size = strlen(method) + 5;
    entry->key = malloc(key_size);
    snprintf(entry->key, key_size, "%s:%03i", method, code);

    return entry;
}

/**
 * Release a method/status entry together with its key. The entry must
 * already be unlinked from any list; this function does not unlink it.
 */
static void _free_qdr_http_method_status(qdr_http_method_status_t* record)
{
    char *owned_key = record->key;

    free(owned_key);
    free(record);
}

/**
 * Release a request-info record: its strings, every entry still on its
 * detail list, and the record itself. The record must already be unlinked
 * from the records list.
 *
 * @param record  record to release; 0 is ignored
 */
static void _free_qdr_http_request_info(qdr_http_request_info_t* record)
{
    if (!record) {
        return;
    }

    free(record->key);
    free(record->address);
    free(record->host);
    free(record->site);

    // Each entry must be unlinked before it is freed; otherwise DEQ_HEAD
    // would keep returning the entry that was just released.
    qdr_http_method_status_t *item = DEQ_HEAD(record->detail);
    while (item) {
        DEQ_REMOVE_HEAD(record->detail);
        _free_qdr_http_method_status(item);
        item = DEQ_HEAD(record->detail);
    }

    free(record);
}

/**
 * Merge one method/status entry into a detail list.
 *
 * If the list already has an entry with the same key, the request counts
 * are added together and `addition` is freed. Otherwise `addition` is
 * appended to the list. Either way the caller must not use `addition`
 * afterwards as its own.
 */
static void _update_http_method_status_detail(qdr_http_method_status_list_t *detail, qdr_http_method_status_t *addition)
{
    for (qdr_http_method_status_t *existing = DEQ_HEAD(*detail); existing; existing = DEQ_NEXT(existing)) {
        if (strcmp(existing->key, addition->key) != 0) {
            continue;
        }
        existing->requests += addition->requests;
        _free_qdr_http_method_status(addition);
        return;
    }

    // No entry with this key yet: the list takes the new one.
    DEQ_INSERT_TAIL(*detail, addition);
}

/**
 * Fold `additions` into `record` if both have the same key.
 *
 * On a match the counters are summed, max_latency keeps the larger value,
 * and every detail entry is moved from `additions` into `record` (merged by
 * key), leaving the detail list of `additions` empty. `additions` itself is
 * not freed here.
 *
 * @return true if the keys matched and the record was updated, else false
 */
static bool _update_qdr_http_request_info(qdr_http_request_info_t* record, qdr_http_request_info_t* additions)
{
    if (strcmp(record->key, additions->key) != 0) {
        return false;
    }

    record->requests  += additions->requests;
    record->bytes_in  += additions->bytes_in;
    record->bytes_out += additions->bytes_out;
    if (record->max_latency < additions->max_latency) {
        record->max_latency = additions->max_latency;
    }

    // Drain the incoming detail list into the existing record.
    qdr_http_method_status_t *moved;
    while ((moved = DEQ_HEAD(additions->detail)) != 0) {
        DEQ_REMOVE_HEAD(additions->detail);
        _update_http_method_status_detail(&record->detail, moved);
    }
    return true;
}

/**
 * Action handler that folds a freshly recorded request into the records
 * list. The update arrives in action->args.general.context_1.
 *
 * If an existing record has the same key, the update is merged into it and
 * then freed; otherwise the update itself is appended to the list.
 *
 * @param discard  not examined by this handler
 */
static void _add_http_request_info_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
    qdr_http_request_info_t *update = (qdr_http_request_info_t*) action->args.general.context_1;

    // Stop at the first record that accepts the update.
    qdr_http_request_info_t *target = DEQ_HEAD(_get_request_info()->records);
    while (target && !_update_qdr_http_request_info(target, update)) {
        target = DEQ_NEXT(target);
    }

    if (target) {
        _free_qdr_http_request_info(update);
        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "Updated http request info %s", target->key);
    } else {
        DEQ_INSERT_TAIL(_get_request_info()->records, update);
        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "Added http request info %s (%zu)", update->key, DEQ_SIZE(_get_request_info()->records));
    }
}

/**
 * Hand a request-info record to the core by enqueueing an action that runs
 * _add_http_request_info_CT with the record as its context.
 */
static void _add_http_request_info(qdr_core_t *core, qdr_http_request_info_t* record)
{
    qdr_action_t *add_action = qdr_action(_add_http_request_info_CT, "add_http_request_info");

    add_action->args.general.context_1 = record;
    qdr_action_enqueue(core, add_action);
}

/**
 * Allocate a zeroed request-info record with an initialised, empty detail
 * list.
 */
static qdr_http_request_info_t* _new_qdr_http_request_info_t()
{
    qdr_http_request_info_t* fresh = NEW(qdr_http_request_info_t);

    ZERO(fresh);
    DEQ_INIT(fresh->detail);
    return fresh;
}

/**
 * Build the identity key of a request-info record.
 *
 * Layout: <'i' or 'o'> '_' <host> [ '_' <address> ] [ '@' <site> ]
 * where 'i' marks ingress and 'o' egress; the address and site parts are
 * present only when the corresponding argument is non-NULL.
 *
 * @return a heap-allocated key the caller must free(), or 0 when host is
 *         NULL
 */
static char *_record_key(const char *host, const char *address, const char* site, bool ingress)
{
    if (!host) {
        return 0;
    }

    const size_t host_len    = strlen(host);
    const size_t address_len = address ? strlen(address) : 0;
    const size_t site_len    = site ? strlen(site) : 0;

    // direction char + '_' + host + terminating NUL ...
    size_t total = 2 + host_len + 1;
    // ... plus a separator and text for each optional part
    if (address) {
        total += 1 + address_len;
    }
    if (site) {
        total += 1 + site_len;
    }

    char *key    = malloc(total);
    char *cursor = key;

    *cursor++ = ingress ? 'i' : 'o';
    *cursor++ = '_';
    memcpy(cursor, host, host_len);
    cursor += host_len;

    if (address) {
        *cursor++ = '_';
        memcpy(cursor, address, address_len);
        cursor += address_len;
    }
    if (site) {
        *cursor++ = '@';
        memcpy(cursor, site, site_len);
        cursor += site_len;
    }
    *cursor = '\0';

    return key;
}

/**
 * Record the statistics of one completed HTTP request.
 *
 * A new request-info record holding a single request and a single
 * method/status detail entry is built and handed to the core, where it is
 * merged into the record with the same key or added as a new one.
 *
 * A request without a host is not recorded: the record key is derived from
 * the host, _record_key() yields no key without one, and the key is later
 * required for logging and for the strcmp()-based merge.
 *
 * @param core         core that receives the record
 * @param method       HTTP method; may be NULL
 * @param status_code  HTTP response status
 * @param address      address of the request; may be NULL
 * @param host         host of the request; NULL causes the request to be
 *                     ignored
 * @param local_site   not used by this function
 * @param remote_site  site stored in the record and in its key; may be NULL
 * @param ingress      direction flag stored in the record
 * @param bytes_in     bytes received for this request
 * @param bytes_out    bytes sent for this request
 * @param latency      latency of this request, stored as max_latency
 */
void qd_http_record_request(qdr_core_t *core, const char * method, uint32_t status_code, const char *address, const char *host,
                             const char *local_site, const char *remote_site, bool ingress,
                             uint64_t bytes_in, uint64_t bytes_out, uint64_t latency)
{
    if (!host) {
        qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG,
               "Ignoring http request info: no host, so no record key can be built");
        return;
    }

    qdr_http_request_info_t* record = _new_qdr_http_request_info_t();
    record->ingress = ingress;
    record->address = address ? qd_strdup(address) : 0;
    record->host = qd_strdup(host);
    record->site = remote_site ? qd_strdup(remote_site) : 0;
    record->key = _record_key(record->host, record->address, remote_site, record->ingress);
    record->requests = 1;
    record->bytes_in = bytes_in;
    record->bytes_out = bytes_out;
    record->max_latency = latency;

    qdr_http_method_status_t *detail = _new_qdr_http_method_status_t(method, (int) status_code);
    detail->requests = 1;
    DEQ_INSERT_TAIL(record->detail, detail);

    qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_DEBUG, "Adding http request info %s", record->key);
    _add_http_request_info(core, record);
}

/**
 * Extract the host part of a "host:port" string.
 *
 * The host is everything before the first ':'.
 * NOTE(review): because the first colon is used, a host that itself
 * contains ':' (e.g. an unbracketed IPv6 literal) is cut short — confirm
 * whether such input can reach this function.
 *
 * @param host_port  string to split; may be NULL
 * @return a heap-allocated copy of the host that the caller must free(),
 *         or 0 if host_port is NULL, contains no ':', or memory could not
 *         be allocated
 */
char *qd_get_host_from_host_port(const char *host_port)
{
    if (!host_port) {
        return 0;
    }

    const char *end = strchr(host_port, ':');
    if (end == NULL) {
        return 0;
    }

    size_t len = (size_t) (end - host_port);
    char *host = malloc(len + 1);
    if (!host) {
        return 0;
    }

    // The length is known exactly, so copy the bytes and terminate.
    memcpy(host, host_port, len);
    host[len] = '\0';
    return host;
}

