blob: 5dd67777b8d5e916f2bade088547ac040acc7af6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <math.h>
#include "proton/reactor.h"
#include "proton/message.h"
#include "proton/connection.h"
#include "proton/session.h"
#include "proton/link.h"
#include "proton/delivery.h"
#include "proton/event.h"
#include "proton/handlers.h"
// Shutdown request flag. It is written from signal_handler() as well as from
// normal code, so it uses volatile sig_atomic_t: the object type the C
// standard guarantees may be safely assigned from an asynchronous signal
// handler. Non-zero means "stop".
volatile sig_atomic_t stop = 0;

// Credit granted to the receiving link; replenished once the remaining
// credit falls to half of this value (-w option).
int credit_window = 1000;

char *source_address = "test-address";  // name of the source node to receive from

// Default server address. Kept in a writable array (not a string literal)
// because main() splits host and port by overwriting the ':' in place.
char _addr[] = "127.0.0.1:5672";
char *host_address = _addr;

char *container_name = "TestReceiver";  // container name set on the connection (-i option)

// When true (-E option) the process exits immediately on stop without
// closing the link, session and connection.
bool drop_connection = false;

// Proton objects shared between main() and the event handler.
pn_connection_t *pn_conn;
pn_session_t *pn_ssn;
pn_link_t *pn_link;
pn_reactor_t *reactor;

// Allocated on PN_CONNECTION_INIT and released by delete_handler().
pn_message_t *in_message;

uint64_t count = 0;  // number of deliveries accepted and settled so far
uint64_t limit = 0;  // if > 0 stop after limit messages arrive
/* Signal handler: on entry, any further SIGINT/SIGQUIT are set to be
 * ignored; if the signal being handled is one of those two, the global
 * stop flag is raised. Other signal numbers leave the flag untouched.
 */
static void signal_handler(int signum)
{
    signal(SIGINT, SIG_IGN);
    signal(SIGQUIT, SIG_IGN);

    if (signum == SIGINT || signum == SIGQUIT)
        stop = true;
}
// Cleanup callback supplied to pn_handler_new() in main(): frees the
// global in_message, if one was allocated, and resets the pointer so it
// cannot be freed twice. The handler argument itself is not used.
//
static void delete_handler(pn_handler_t *handler)
{
    if (!in_message)
        return;

    pn_message_free(in_message);
    in_message = NULL;
}
/* Reactor event callback.
 *
 * PN_CONNECTION_INIT: allocates in_message, opens the connection, a session
 *   and a receiving link whose source is source_address, then grants
 *   credit_window credits.
 * PN_DELIVERY: reads and discards whatever payload is available; once the
 *   delivery is finished it is accepted and settled, count is incremented,
 *   credit is topped up, and the message limit is checked.
 * Every other event type is ignored.
 */
static void event_handler(pn_handler_t *handler,
                          pn_event_t *event,
                          pn_event_type_t type)
{
    if (type == PN_CONNECTION_INIT) {
        // Bring up every endpoint needed to receive messages.
        in_message = pn_message();

        pn_connection_open(pn_conn);

        pn_ssn = pn_session(pn_conn);
        pn_session_open(pn_ssn);

        pn_link = pn_receiver(pn_ssn, "MyReceiver");
        pn_terminus_set_address(pn_link_source(pn_link), source_address);
        pn_link_open(pn_link);

        // Nothing can be received until credit has been granted.
        pn_link_flow(pn_link, credit_window);
        return;
    }

    if (type != PN_DELIVERY)
        return;

    if (stop)
        return;  // shutting down: silently discard any further messages

    pn_delivery_t *delivery = pn_event_delivery(event);
    bool finished = false;

    if (pn_delivery_readable(delivery)) {
        // Consume the payload as it arrives instead of waiting for the whole
        // delivery, so that very large messages can be handled.
        static char sink[1024 * 1024];
        ssize_t got;

        for (;;) {
            got = pn_link_recv(pn_delivery_link(delivery), sink, sizeof(sink));
            if (got <= 0)
                break;
        }
        finished = (got < 0 || got == PN_EOS);
    }

    if (!finished && pn_delivery_partial(delivery))
        return;  // more of this delivery is still to come

    // The whole message has arrived (or the read failed).
    count += 1;
    pn_delivery_update(delivery, PN_ACCEPTED);
    pn_delivery_settle(delivery);  // delivery must not be used after this

    if (pn_link_credit(pn_link) <= credit_window / 2) {
        // Top the link's credit back up to credit_window.
        pn_link_flow(pn_link, credit_window - pn_link_credit(pn_link));
    }

    if (limit && count == limit) {
        stop = true;
        pn_reactor_wakeup(reactor);
    }
}
/* Write the command line help to stdout, showing the current value of each
 * configurable option in brackets, then terminate the process with exit
 * status 1. This function does not return.
 */
static void usage(void)
{
    fputs("Usage: receiver <options>\n", stdout);
    printf("-a \tThe address:port of the server [%s]\n", host_address);
    printf("-c \tExit after N messages arrive (0 == run forever) [%"PRIu64"]\n", limit);
    printf("-i \tContainer name [%s]\n", container_name);
    printf("-s \tSource address [%s]\n", source_address);
    printf("-w \tCredit window [%d]\n", credit_window);
    fputs("-E \tExit without cleanly closing the connection [off]\n", stdout);
    exit(1);
}
/* Parse text as a base-10 unsigned 64-bit integer.
 *
 * Returns true and stores the value in *out on success. Returns false,
 * leaving *out untouched, if text is empty, negative, has trailing
 * characters, or does not fit in a uint64_t.
 */
static bool parse_u64(const char *text, uint64_t *out)
{
    // strtoull() accepts "-N" and silently negates it, so reject any minus
    // sign up front; a valid decimal count never contains one.
    if (strchr(text, '-'))
        return false;

    char *end = NULL;
    errno = 0;
    unsigned long long value = strtoull(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE || value > UINT64_MAX)
        return false;

    *out = (uint64_t) value;
    return true;
}

/* Parse text as a base-10 integer in the range 1..INT_MAX.
 *
 * Returns true and stores the value in *out on success. Returns false,
 * leaving *out untouched, if text is empty, has trailing characters, is
 * zero or negative, or does not fit in an int.
 */
static bool parse_positive_int(const char *text, int *out)
{
    char *end = NULL;
    errno = 0;
    long value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE
        || value <= 0 || value > INT_MAX)
        return false;

    *out = (int) value;
    return true;
}

/* Program entry point: parses the command line, connects to the server and
 * runs the reactor until it finishes or a stop is requested.
 *
 * Options: -a address[:port], -c message limit, -i container name,
 * -s source address, -w credit window, -E hard exit on stop, -h help.
 * Any invalid option or value prints the usage text and exits.
 */
int main(int argc, char** argv)
{
    /* create a handler for the connection's events; delete_handler is
     * supplied as its cleanup callback.
     */
    pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
    pn_handler_add(handler, pn_handshaker());

    /* command line options; opterr = 0 silences getopt's own diagnostics
     * because usage() is printed instead.
     */
    opterr = 0;
    int c;
    while ((c = getopt(argc, argv, "i:a:s:hw:c:E")) != -1) {
        switch (c) {
        case 'h': usage(); break;
        case 'a': host_address = optarg; break;
        case 'c':
            if (!parse_u64(optarg, &limit))
                usage();
            break;
        case 'i': container_name = optarg; break;
        case 's': source_address = optarg; break;
        case 'w':
            if (!parse_positive_int(optarg, &credit_window))
                usage();
            break;
        case 'E': drop_connection = true; break;
        default:
            usage();
            break;
        }
    }

    signal(SIGQUIT, signal_handler);
    signal(SIGINT, signal_handler);

    // Split "[amqp://]host[:port]" in place: the last ':' is overwritten
    // with a terminator so that host and port become separate strings.
    char *host = host_address;
    if (strncmp(host, "amqp://", 7) == 0)
        host += 7;

    const char *port = "5672";  // used when no port is given
    char *colon = strrchr(host, ':');
    if (colon) {
        *colon = 0;
        port = colon + 1;
    }

    reactor = pn_reactor();
    pn_conn = pn_reactor_connection_to_host(reactor,
                                            host,
                                            port,
                                            handler);

    // the container name should be unique for each client
    pn_connection_set_container(pn_conn, container_name);
    pn_connection_set_hostname(pn_conn, host);

    // periodic wakeup
    pn_reactor_set_timeout(reactor, 1000);
    pn_reactor_start(reactor);

    while (pn_reactor_process(reactor)) {
        if (stop) {
            if (drop_connection)  // hard exit
                exit(0);
            // close the endpoints; this will cause pn_reactor_process() to
            // eventually break the loop
            if (pn_link) pn_link_close(pn_link);
            if (pn_ssn) pn_session_close(pn_ssn);
            if (pn_conn) pn_connection_close(pn_conn);
        }
    }

    if (pn_link) pn_link_free(pn_link);
    if (pn_ssn) pn_session_free(pn_ssn);
    if (pn_conn) pn_connection_close(pn_conn);
    pn_reactor_free(reactor);
    return 0;
}