/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <inttypes.h>
#include <math.h>

#include "proton/reactor.h"
#include "proton/message.h"
#include "proton/connection.h"
#include "proton/session.h"
#include "proton/link.h"
#include "proton/delivery.h"
#include "proton/event.h"
#include "proton/handlers.h"


// Shutdown request flag. It is set from the signal handler (SIGINT/SIGQUIT)
// and from the event handler once the message limit is reached, and polled
// by the main loop. Because it is written from a signal handler it is
// declared volatile sig_atomic_t, a type the C standard allows a handler
// to store to safely.
volatile sig_atomic_t stop = 0;

int  credit_window = 1000;              // link credit granted to the sender (-w)
char *source_address = "test-address";  // name of the source node to receive from (-s)
char _addr[] = "127.0.0.1:5672";        // writable default: main() splits host and port in place
char *host_address = _addr;             // address:port of the server (-a)
char *container_name = "TestReceiver";  // container name set on the connection (-i)
bool drop_connection = false;           // -E: exit without closing the endpoints

pn_connection_t *pn_conn;
pn_session_t *pn_ssn;
pn_link_t *pn_link;
pn_reactor_t *reactor;
pn_message_t *in_message;       // allocated on PN_CONNECTION_INIT, freed by delete_handler()

uint64_t count = 0;   // number of deliveries accepted so far
uint64_t limit = 0;   // if > 0 stop after limit messages arrive


/* Handler installed for SIGINT and SIGQUIT.
 *
 * Switches both signals to SIG_IGN so further occurrences are ignored, then
 * raises the global stop flag if the signal was one of the two.
 */
static void signal_handler(int signum)
{
    signal(SIGINT,  SIG_IGN);
    signal(SIGQUIT, SIG_IGN);

    if (signum == SIGINT || signum == SIGQUIT) {
        stop = true;
    }
}


// Finalizer passed to pn_handler_new() in main(): frees the global
// in_message, if one was allocated, and clears the pointer.
//
static void delete_handler(pn_handler_t *handler)
{
    (void)handler;  // unused; all state lives in file-level globals

    if (!in_message)
        return;

    pn_message_free(in_message);
    in_message = NULL;
}


/* Process each event posted by the reactor.
 */
static void event_handler(pn_handler_t *handler,
                          pn_event_t *event,
                          pn_event_type_t type)
{
    switch (type) {

    case PN_CONNECTION_INIT: {
        // Create and open all the endpoints needed to send a message
        //
        in_message = pn_message();
        pn_connection_open(pn_conn);
        pn_ssn = pn_session(pn_conn);
        pn_session_open(pn_ssn);
        pn_link = pn_receiver(pn_ssn, "MyReceiver");
        pn_terminus_set_address(pn_link_source(pn_link), source_address);
        pn_link_open(pn_link);
        // cannot receive without granting credit:
        pn_link_flow(pn_link, credit_window);
    } break;

    case PN_DELIVERY: {

        if (stop) break;  // silently discard any further messages

        bool rx_done = false;
        pn_delivery_t *dlv = pn_event_delivery(event);
        if (pn_delivery_readable(dlv)) {

             // Drain the data as it comes in rather than waiting for the
             // entire delivery to arrive. This allows the receiver to handle
             // messages that are way huge.

             ssize_t rc;
             static char discard_buffer[1024 * 1024];
             do {
                 rc = pn_link_recv(pn_delivery_link(dlv), discard_buffer, sizeof(discard_buffer));
             } while (rc > 0);
             rx_done = (rc == PN_EOS || rc < 0);
        }

        if (rx_done || !pn_delivery_partial(dlv)) {

            // A full message has arrived (or a failure occurred)
            count += 1;
            pn_delivery_update(dlv, PN_ACCEPTED);
            pn_delivery_settle(dlv);  // dlv is now freed

            if (pn_link_credit(pn_link) <= credit_window/2) {
                // Grant enough credit to bring it up to CAPACITY:
                pn_link_flow(pn_link, credit_window - pn_link_credit(pn_link));
            }

            if (limit && count == limit) {
                stop = true;
                pn_reactor_wakeup(reactor);
            }
        }
    } break;

    default:
        break;
    }
}

/* Print the command line help to stdout, showing the current value of each
 * option in brackets, then terminate the process with exit status 1.
 * Does not return.
 */
static void usage(void)
{
    fputs("Usage: receiver <options>\n", stdout);
    printf("-a      \tThe address:port of the server [%s]\n",
           host_address);
    printf("-c      \tExit after N messages arrive (0 == run forever) [%" PRIu64 "]\n",
           limit);
    printf("-i      \tContainer name [%s]\n",
           container_name);
    printf("-s      \tSource address [%s]\n",
           source_address);
    printf("-w      \tCredit window [%d]\n",
           credit_window);
    fputs("-E      \tExit without cleanly closing the connection [off]\n", stdout);

    exit(1);
}


/* Entry point.
 *
 * Parses the command line, installs the SIGINT/SIGQUIT handlers, connects a
 * reactor to the configured host and runs it until the stop flag is raised
 * (by a signal, or by the event handler once `limit` messages have arrived)
 * and the endpoints have closed.
 *
 * Returns 0 on normal completion; usage() exits with status 1 on a bad or
 * unknown option.
 */
int main(int argc, char** argv)
{
    /* create a handler for the connection's events.
     */
    pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
    pn_handler_add(handler, pn_handshaker());

    /* command line options */
    opterr = 0;  // silence getopt's own diagnostics; usage() is printed instead
    int c;
    while((c = getopt(argc, argv, "i:a:s:hw:c:E")) != -1) {
        switch(c) {
        case 'h': usage(); break;
        case 'a': host_address = optarg; break;
        case 'c':
            // SCNu64 is the scanf conversion for uint64_t; the PRI* macros
            // are only specified for the printf family.
            if (sscanf(optarg, "%" SCNu64, &limit) != 1)
                usage();
            break;
        case 'i': container_name = optarg; break;
        case 's': source_address = optarg; break;
        case 'w':
            if (sscanf(optarg, "%d", &credit_window) != 1 || credit_window <= 0)
                usage();
            break;
        case 'E': drop_connection = true;  break;

        default:
            usage();
            break;
        }
    }

    signal(SIGQUIT, signal_handler);
    signal(SIGINT,  signal_handler);

    // Split "[amqp://]host[:port]" in place: the last ':' is overwritten
    // with a terminator so `host` and `port` become separate strings.
    // host_address points at writable storage (the _addr array or optarg).
    char *host = host_address;
    if (strncmp(host, "amqp://", 7) == 0)
        host += 7;
    char *port = strrchr(host, ':');
    if (port) {
        *port++ = 0;
    } else {
        port = "5672";
    }

    reactor = pn_reactor();
    pn_conn = pn_reactor_connection_to_host(reactor,
                                            host,
                                            port,
                                            handler);

    // the container name should be unique for each client
    pn_connection_set_container(pn_conn, container_name);
    pn_connection_set_hostname(pn_conn, host);

    // periodic wakeup, so the loop below gets a chance to check `stop`
    // even when no events are arriving
    pn_reactor_set_timeout(reactor, 1000);

    pn_reactor_start(reactor);

    while (pn_reactor_process(reactor)) {
        if (stop) {
            if (drop_connection)  // hard exit
                exit(0);
            // close the endpoints this will cause pn_reactor_process() to
            // eventually break the loop
            if (pn_link) pn_link_close(pn_link);
            if (pn_ssn) pn_session_close(pn_ssn);
            if (pn_conn) pn_connection_close(pn_conn);
        }
    }

    if (pn_link) pn_link_free(pn_link);
    if (pn_ssn) pn_session_free(pn_ssn);
    if (pn_conn) pn_connection_close(pn_conn);

    pn_reactor_free(reactor);

    return 0;
}
