/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#define ADD_ANNOTATIONS 1

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <inttypes.h>
#include <math.h>


#include "proton/reactor.h"
#include "proton/message.h"
#include "proton/connection.h"
#include "proton/session.h"
#include "proton/link.h"
#include "proton/delivery.h"
#include "proton/event.h"
#include "proton/handlers.h"

#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/message.h>

#define BOOL2STR(b) ((b)?"true":"false")

#define BODY_SIZE_SMALL  100L
#define BODY_SIZE_MEDIUM ((long int)((4 * 1024) + 1))
#define BODY_SIZE_LARGE  ((long int)((65 * 1024) + 1))
#define BODY_SIZE_HUGE   ((long int)((BUFFER_SIZE * QD_QLIMIT_Q3_UPPER * 3) + 1))

#define DEFAULT_PRIORITY 4

// body data - block of 0's
//

bool stop = false;

uint64_t limit = 1;               // # messages to send
uint64_t count = 0;               // # sent
uint64_t acked = 0;               // # of received acks

// outcome counts
uint64_t accepted = 0;
uint64_t rejected = 0;
uint64_t modified = 0;
uint64_t released = 0;

char body_data_pattern = 'B';  // fill body data
bool use_anonymous = false;    // use anonymous link if true
bool presettle = false;        // true = send presettled
bool add_annotations = false;
long int body_size = BODY_SIZE_SMALL;
bool drop_connection = false;
unsigned int priority = DEFAULT_PRIORITY;

// buffer for encoded message
pn_bytes_t body_data = {0};
char *encode_buffer = NULL;
size_t encode_buffer_size = 0;    // size of malloced memory
size_t encoded_data_size = 0;     // length of encoded content


char *target_address = "test-address";
char _addr[] = "127.0.0.1:5672";
char *host_address = _addr;
char *container_name = "TestSender";

pn_connection_t *pn_conn;
pn_session_t *pn_ssn;
pn_link_t *pn_link;
pn_reactor_t *reactor;
pn_message_t *out_message;


// odd-length long string
const char big_string[] =
    "+"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";


static void add_message_annotations(pn_message_t *out_message)
{
    // just a bunch of dummy MA
    pn_data_t *annos = pn_message_annotations(out_message);
    pn_data_clear(annos);
    pn_data_put_map(annos);
    pn_data_enter(annos);

    pn_data_put_symbol(annos, pn_bytes(strlen("my-key"), "my-key"));
    pn_data_put_string(annos, pn_bytes(strlen("my-data"), "my-data"));

    pn_data_put_symbol(annos, pn_bytes(strlen("my-other-key"), "my-other-key"));
    pn_data_put_string(annos, pn_bytes(strlen("my-other-data"), "my-other-data"));

    // embedded map
    pn_data_put_symbol(annos, pn_bytes(strlen("my-map"), "my-map"));
    pn_data_put_map(annos);
    pn_data_enter(annos);
    pn_data_put_symbol(annos, pn_bytes(strlen("my-map-key1"), "my-map-key1"));
    pn_data_put_char(annos, 'X');
    pn_data_put_symbol(annos, pn_bytes(strlen("my-map-key2"), "my-map-key2"));
    pn_data_put_byte(annos, 0x12);
    pn_data_put_symbol(annos, pn_bytes(strlen("my-map-key3"), "my-map-key3"));
    pn_data_put_string(annos, pn_bytes(strlen("Are We Not Men?"), "Are We Not Men?"));
    pn_data_put_symbol(annos, pn_bytes(strlen("my-last-key"), "my-last-key"));
    pn_data_put_binary(annos, pn_bytes(sizeof(big_string), big_string));
    pn_data_exit(annos);

    pn_data_put_symbol(annos, pn_bytes(strlen("my-ulong"), "my-ulong"));
    pn_data_put_ulong(annos, 0xDEADBEEFCAFEBEEF);

    // embedded list
    pn_data_put_symbol(annos, pn_bytes(strlen("my-list"), "my-list"));
    pn_data_put_list(annos);
    pn_data_enter(annos);
    pn_data_put_string(annos, pn_bytes(sizeof(big_string), big_string));
    pn_data_put_double(annos, 3.1415);
    pn_data_put_short(annos, 1966);
    pn_data_exit(annos);

    pn_data_put_symbol(annos, pn_bytes(strlen("my-bool"), "my-bool"));
    pn_data_put_bool(annos, false);

    pn_data_exit(annos);
}


void generate_message(void)
{
    if (!out_message) {
        out_message = pn_message();
    }

    if (use_anonymous) {
        pn_message_set_address(out_message, target_address);
    }

    if (priority != DEFAULT_PRIORITY) {
        pn_message_set_priority(out_message, (uint8_t)priority);
    }

    pn_data_t *body = pn_message_body(out_message);
    pn_data_clear(body);

    if (!body_data.start) {
        char *ptr = malloc(BODY_SIZE_HUGE);
        if (!ptr) {
            perror("Out of memory!");
            exit(-1);
        }
        memset(ptr, (int)body_data_pattern, BODY_SIZE_HUGE);
        body_data.start = ptr;
    }

    body_data.size = body_size;
    pn_data_put_binary(body, body_data);

    if (add_annotations) {
        add_message_annotations(out_message);
    }

    // now encode it

    pn_data_rewind(pn_message_body(out_message));
    if (!encode_buffer) {
        encode_buffer_size = body_size + 512;
        encode_buffer = malloc(encode_buffer_size);
    }

    int rc = 0;
    size_t len = encode_buffer_size;
    do {
        rc = pn_message_encode(out_message, encode_buffer, &len);
        if (rc == PN_OVERFLOW) {
            free(encode_buffer);
            encode_buffer_size *= 2;
            encode_buffer = malloc(encode_buffer_size);
            len = encode_buffer_size;
        }
    } while (rc == PN_OVERFLOW);

    if (rc) {
        perror("buffer encode failed");
        exit(-1);
    }

    encoded_data_size = len;
}


static void signal_handler(int signum)
{
    signal(SIGINT,  SIG_IGN);
    signal(SIGQUIT, SIG_IGN);

    switch (signum) {
    case SIGINT:
    case SIGQUIT:
        stop = true;
        break;
    default:
        break;
    }
}


static void delete_handler(pn_handler_t *handler)
{
    free(encode_buffer);
    pn_message_free(out_message);
    free((void *)body_data.start);
}


/* Process each event posted by the reactor.
 */
static void event_handler(pn_handler_t *handler,
                          pn_event_t *event,
                          pn_event_type_t type)
{
    switch (type) {

    case PN_CONNECTION_INIT: {
        // Create and open all the endpoints needed to send a message
        //
        pn_connection_open(pn_conn);
        pn_session_t *pn_ssn = pn_session(pn_conn);
        pn_session_open(pn_ssn);
        pn_link_t *pn_link = pn_sender(pn_ssn, "MySender");
        if (!use_anonymous) {
            pn_terminus_set_address(pn_link_target(pn_link), target_address);
        }
        pn_link_open(pn_link);

        acked = count;
        generate_message();

    } break;

    case PN_LINK_FLOW: {
        // the remote has given us some credit, now we can send messages
        //
        static long tag = 0;  // a simple tag generator
        pn_link_t *sender = pn_event_link(event);
        int credit = pn_link_credit(sender);

        while (!stop && credit > 0 && (limit == 0 || count < limit)) {
            --credit;
            ++count;
            ++tag;
            pn_delivery_t *delivery = pn_delivery(sender, pn_dtag((const char *)&tag, sizeof(tag)));
            pn_link_send(sender, encode_buffer, encoded_data_size);
            pn_link_advance(sender);
            if (presettle) {
                pn_delivery_settle(delivery);
                // fake terminal outcome
                ++acked;
                ++accepted;
                if (limit && count == limit) {
                    // no need to wait for acks
                    stop = true;
                    pn_reactor_wakeup(reactor);
                }
            }
        }
    } break;

    case PN_DELIVERY: {
        pn_delivery_t *dlv = pn_event_delivery(event);
        if (pn_delivery_updated(dlv)) {
            uint64_t rs = pn_delivery_remote_state(dlv);
            switch (rs) {
            case PN_RECEIVED:
                // This is not a terminal state - it is informational, and the
                // peer is still processing the message.
                break;
            case PN_ACCEPTED:
                ++acked;
                ++accepted;
                pn_delivery_settle(dlv);
                break;
            case PN_REJECTED:
                ++acked;
                ++rejected;
                pn_delivery_settle(dlv);
                break;
            case PN_RELEASED:
                ++acked;
                ++released;
                pn_delivery_settle(dlv);
                break;
            case PN_MODIFIED:
                ++acked;
                ++modified;
                pn_delivery_settle(dlv);
                break;

            default:
                break;
            }

            if (limit && acked == limit) {
                // initiate clean shutdown of the endpoints
                stop = true;
                pn_reactor_wakeup(reactor);
            }
        }
    } break;

    default:
        break;
    }
}

static void usage(void)
{
  printf("Usage: sender <options>\n");
  printf("-a      \tThe address:port of the server [%s]\n", host_address);
  printf("-c      \t# of messages to send, 0 == nonstop [%"PRIu64"]\n", limit);
  printf("-i      \tContainer name [%s]\n", container_name);
  printf("-n      \tUse an anonymous link [%s]\n", BOOL2STR(use_anonymous));
  printf("-s      \tBody size in bytes ('s'=%ld 'm'=%ld 'l'=%ld 'x'=%ld) [%ld]\n",
         BODY_SIZE_SMALL, BODY_SIZE_MEDIUM, BODY_SIZE_LARGE, BODY_SIZE_HUGE, body_size);
  printf("-t      \tTarget address [%s]\n", target_address);
  printf("-u      \tSend all messages presettled [%s]\n", BOOL2STR(presettle));
  printf("-M      \tAdd dummy Message Annotations section [off]\n");
  printf("-E      \tExit without cleanly closing the connection [off]\n");
  printf("-p      \tMessage priority [%d]\n", priority);
  printf("-X      \tMessage body data pattern [%c]\n", (char)body_data_pattern);
  exit(1);
}

int main(int argc, char** argv)
{
    /* command line options */
    opterr = 0;
    int c;
    while ((c = getopt(argc, argv, "ha:c:i:ns:t:uMEp:X:")) != -1) {
        switch(c) {
        case 'h': usage(); break;
        case 'a': host_address = optarg; break;
        case 'c':
            if (sscanf(optarg, "%"PRIu64, &limit) != 1)
                usage();
            break;
        case 'i': container_name = optarg; break;
        case 'n': use_anonymous = true; break;
        case 's':
            switch (optarg[0]) {
            case 's': body_size = BODY_SIZE_SMALL; break;
            case 'm': body_size = BODY_SIZE_MEDIUM; break;
            case 'l': body_size = BODY_SIZE_LARGE; break;
            case 'x': body_size = BODY_SIZE_HUGE; break;
            default:
                usage();
            }
            break;
        case 't': target_address = optarg; break;
        case 'u': presettle = true;        break;
        case 'M': add_annotations = true;  break;
        case 'E': drop_connection = true;  break;
        case 'X': body_data_pattern = optarg[0];  break;
        case 'p':
            if (sscanf(optarg, "%u", &priority) != 1)
                usage();
            break;

        default:
            usage();
            break;
        }
    }

    signal(SIGQUIT, signal_handler);
    signal(SIGINT,  signal_handler);

    char *host = host_address;
    if (strncmp(host, "amqp://", 7) == 0)
        host += 7;
    char *port = strrchr(host, ':');
    if (port) {
        *port++ = 0;
    } else {
        port = "5672";
    }

    pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
    pn_handler_add(handler, pn_handshaker());

    reactor = pn_reactor();
    pn_conn = pn_reactor_connection_to_host(reactor,
                                            host,
                                            port,
                                            handler);

    // the container name should be unique for each client
    pn_connection_set_container(pn_conn, container_name);
    pn_connection_set_hostname(pn_conn, host);

    // break out of pn_reactor_process once a second to check if done
    pn_reactor_set_timeout(reactor, 1000);

    pn_reactor_start(reactor);

    time_t last_log = 0;
    while (pn_reactor_process(reactor)) {
        if (stop) {
            if (drop_connection) {  // hard stop
                fprintf(stdout,
                        "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
                        " Released:%"PRIu64" Modified:%"PRIu64"\n",
                        count, accepted, rejected, released, modified);
                exit(0);
            }

            // wait (forever) until all sent messages are confirmed by the
            // receiver

            if (count == acked) {
                // close the endpoints this will cause pn_reactor_process() to
                // eventually break the loop
                if (pn_link) pn_link_close(pn_link);
                if (pn_ssn) pn_session_close(pn_ssn);
                if (pn_conn) pn_connection_close(pn_conn);
            } else {
                // periodically give status for test output logs
                time_t now = time(NULL);
                if ((now - last_log) >= 1) {
                    fprintf(stdout,
                            "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
                            " Released:%"PRIu64" Modified:%"PRIu64"\n",
                            count, accepted, rejected, released, modified);
                    last_log = now;
                }
            }
        }
    }

    if (pn_link) pn_link_free(pn_link);
    if (pn_ssn) pn_session_free(pn_ssn);
    if (pn_conn) pn_connection_close(pn_conn);

    pn_reactor_free(reactor);

    return 0;
}
