blob: a87649224d5b8e2a487f91bf270c8fab9c6c0b31 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/*
* Implements a subset of msgr-recv.c using reactor events.
*/
#define PN_USE_DEPRECATED_API 1
#include "proton/message.h"
#include "proton/error.h"
#include "proton/types.h"
#include "proton/reactor.h"
#include "proton/handlers.h"
#include "proton/engine.h"
#include "proton/url.h"
#include "msgr-common.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
// The exact struct from msgr-recv, mostly fallow.
typedef struct {
Addresses_t subscriptions;
uint64_t msg_count;
int recv_count;
int incoming_window;
int timeout; // seconds
unsigned int report_interval; // in seconds
int outgoing_window;
int reply;
const char *name;
const char *ready_text;
char *certificate;
char *privatekey; // used to sign certificate
char *password; // for private key file
char *ca_db; // trusted CA database
} Options_t;
static void usage(int rc)
{
printf("Usage: reactor-recv [OPTIONS] \n"
" -c # \tNumber of messages to receive before exiting [0=forever]\n"
" -R \tSend reply if 'reply-to' present\n"
" -t # \tInactivity timeout in seconds, -1 = no timeout [-1]\n"
" -X <text> \tPrint '<text>\\n' to stdout after all subscriptions are created\n"
);
exit(rc);
}
// Global context for this process
typedef struct {
Options_t *opts;
Statistics_t *stats;
uint64_t sent;
uint64_t received;
pn_message_t *message;
pn_acceptor_t *acceptor;
char *encoded_data;
size_t encoded_data_size;
int connections;
pn_list_t *active_connections;
bool shutting_down;
pn_handler_t *listener_handler;
int quiesce_count;
} global_context_t;
// Per connection context
typedef struct {
global_context_t *global;
int connection_id;
pn_link_t *recv_link;
pn_link_t *reply_link;
} connection_context_t;
/**
* @return buffer of sufficient size, or NULL
*/
static char *ensure_buffer(char *buf, size_t needed, size_t *actual)
{
// Make room for the largest message seen so far, plus extra for slight changes in metadata content
if (needed + 1024 <= *actual)
return buf;
needed += 2048;
buf = (char *) realloc(buf, needed);
*actual = buf ? needed : 0;
return buf;
}
void global_shutdown(global_context_t *gc)
{
if (gc->shutting_down) return;
gc->shutting_down = true;
pn_acceptor_close(gc->acceptor);
size_t n = pn_list_size(gc->active_connections);
for (size_t i = 0; i < n; i++) {
pn_connection_t *conn = (pn_connection_t *) pn_list_get(gc->active_connections, i);
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
pn_connection_close(conn);
}
}
}
connection_context_t *connection_context(pn_handler_t *h)
{
connection_context_t *p = (connection_context_t *) pn_handler_mem(h);
return p;
}
void connection_context_init(connection_context_t *cc, global_context_t *gc)
{
cc->global = gc;
pn_incref(gc->listener_handler);
cc->connection_id = gc->connections++;
cc->recv_link = 0;
cc->reply_link = 0;
}
void connection_cleanup(pn_handler_t *h)
{
connection_context_t *cc = connection_context(h);
// Undo pn_incref() from connection_context_init()
pn_decref(cc->global->listener_handler);
}
void connection_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type)
{
connection_context_t *cc = connection_context(h);
bool replying = cc->global->opts->reply;
switch (type) {
case PN_LINK_REMOTE_OPEN:
{
pn_link_t *link = pn_event_link(event);
if (pn_link_is_receiver(link)) {
check(cc->recv_link == NULL, "Multiple incoming links on one connection");
cc->recv_link = link;
pn_connection_t *conn = pn_event_connection(event);
pn_list_add(cc->global->active_connections, conn);
if (cc->global->shutting_down) {
pn_connection_close(conn);
break;
}
if (replying) {
// Set up a reply link and defer granting credit to the incoming link
pn_connection_t *conn = pn_session_connection(pn_link_session(link));
pn_session_t *ssn = pn_session(conn);
pn_session_open(ssn);
char name[100]; // prefer a multiplatform uuid generator
sprintf(name, "reply_sender_%d", cc->connection_id);
cc->reply_link = pn_sender(ssn, name);
pn_link_open(cc->reply_link);
}
else {
pn_flowcontroller_t *fc = pn_flowcontroller(1024);
pn_handler_add(h, fc);
pn_decref(fc);
}
}
}
break;
case PN_LINK_FLOW:
{
if (replying) {
pn_link_t *reply_link = pn_event_link(event);
// pn_flowcontroller handles the non-reply case
check(reply_link == cc->reply_link, "internal error");
// Grant the sender as much credit as just given to us for replies
int delta = pn_link_credit(reply_link) - pn_link_credit(cc->recv_link);
if (delta > 0)
pn_link_flow(cc->recv_link, delta);
}
}
break;
case PN_DELIVERY:
{
pn_link_t *recv_link = pn_event_link(event);
pn_delivery_t *dlv = pn_event_delivery(event);
if (pn_link_is_receiver(recv_link) && !pn_delivery_partial(dlv)) {
if (cc->global->received == 0) statistics_start(cc->global->stats);
size_t encoded_size = pn_delivery_pending(dlv);
cc->global->encoded_data = ensure_buffer(cc->global->encoded_data, encoded_size,
&cc->global->encoded_data_size);
check(cc->global->encoded_data, "decoding buffer realloc failure");
ssize_t n = pn_link_recv(recv_link, cc->global->encoded_data, encoded_size);
check(n == (ssize_t) encoded_size, "message data read fail");
pn_message_t *msg = cc->global->message;
int err = pn_message_decode(msg, cc->global->encoded_data, n);
check(err == 0, "message decode error");
cc->global->received++;
pn_delivery_settle(dlv);
statistics_msg_received(cc->global->stats, msg);
if (replying) {
const char *reply_addr = pn_message_get_reply_to(msg);
if (reply_addr) {
pn_link_t *rl = cc->reply_link;
check(pn_link_credit(rl) > 0, "message received without corresponding reply credit");
LOG("Replying to: %s\n", reply_addr );
pn_message_set_address(msg, reply_addr);
pn_message_set_creation_time(msg, msgr_now());
char tag[8];
void *ptr = &tag;
*((uint64_t *) ptr) = cc->global->sent;
pn_delivery_t *dlv = pn_delivery(rl, pn_dtag(tag, 8));
size_t size = cc->global->encoded_data_size;
int err = pn_message_encode(msg, cc->global->encoded_data, &size);
check(err == 0, "message encoding error");
pn_link_send(rl, cc->global->encoded_data, size);
pn_delivery_settle(dlv);
cc->global->sent++;
}
}
}
if (cc->global->received >= cc->global->opts->msg_count) {
global_shutdown(cc->global);
}
}
break;
case PN_CONNECTION_UNBOUND:
{
pn_connection_t *conn = pn_event_connection(event);
pn_list_remove(cc->global->active_connections, conn);
pn_connection_release(conn);
}
break;
default:
break;
}
}
pn_handler_t *connection_handler(global_context_t *gc)
{
pn_handler_t *h = pn_handler_new(connection_dispatch, sizeof(connection_context_t), connection_cleanup);
connection_context_t *cc = connection_context(h);
connection_context_init(cc, gc);
return h;
}
void start_listener(global_context_t *gc, pn_reactor_t *reactor)
{
check(gc->opts->subscriptions.count > 0, "no listening address");
pn_url_t *listen_url = pn_url_parse(gc->opts->subscriptions.addresses[0]);
const char *host = pn_url_get_host(listen_url);
const char *port = pn_url_get_port(listen_url);
if (port == 0 || strlen(port) == 0)
port = "5672";
if (host == 0 || strlen(host) == 0)
host = "0.0.0.0";
if (*host == '~') host++;
gc->acceptor = pn_reactor_acceptor(reactor, host, port, NULL);
check(gc->acceptor, "acceptor creation failed");
pn_url_free(listen_url);
}
void global_context_init(global_context_t *gc, Options_t *o, Statistics_t *s)
{
gc->opts = o;
gc->stats = s;
gc->sent = 0;
gc->received = 0;
gc->encoded_data_size = 0;
gc->encoded_data = 0;
gc->message = pn_message();
check(gc->message, "failed to allocate a message");
gc->connections = 0;
gc->active_connections = pn_list(PN_OBJECT, 0);
gc->acceptor = 0;
gc->shutting_down = false;
gc->listener_handler = 0;
gc->quiesce_count = 0;
}
global_context_t *global_context(pn_handler_t *h)
{
return (global_context_t *) pn_handler_mem(h);
}
void listener_cleanup(pn_handler_t *h)
{
global_context_t *gc = global_context(h);
pn_message_free(gc->message);
free(gc->encoded_data);
pn_free(gc->active_connections);
}
void listener_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type)
{
global_context_t *gc = global_context(h);
if (type == PN_REACTOR_QUIESCED)
gc->quiesce_count++;
else
gc->quiesce_count = 0;
switch (type) {
case PN_CONNECTION_INIT:
{
pn_connection_t *connection = pn_event_connection(event);
// New incoming connection on listener socket. Give each a separate handler.
pn_handler_t *ch = connection_handler(gc);
pn_handshaker_t *handshaker = pn_handshaker();
pn_handler_add(ch, handshaker);
pn_decref(handshaker);
pn_record_t *record = pn_connection_attachments(connection);
pn_record_set_handler(record, ch);
pn_decref(ch);
}
break;
case PN_REACTOR_QUIESCED:
{
// Two quiesce in a row means we have been idle for a timeout period
if (gc->opts->timeout != -1 && gc->quiesce_count > 1)
global_shutdown(gc);
}
break;
case PN_REACTOR_INIT:
{
pn_reactor_t *reactor = pn_event_reactor(event);
start_listener(gc, reactor);
// hack to let test scripts know when the receivers are ready (so
// that the senders may be started)
if (gc->opts->ready_text) {
fprintf(stdout, "%s\n", gc->opts->ready_text);
fflush(stdout);
}
if (gc->opts->timeout != -1)
pn_reactor_set_timeout(pn_event_reactor(event), gc->opts->timeout);
}
break;
case PN_REACTOR_FINAL:
{
if (gc->received == 0) statistics_start(gc->stats);
statistics_report(gc->stats, gc->sent, gc->received);
}
break;
default:
break;
}
}
pn_handler_t *listener_handler(Options_t *opts, Statistics_t *stats)
{
pn_handler_t *h = pn_handler_new(listener_dispatch, sizeof(global_context_t), listener_cleanup);
global_context_t *gc = global_context(h);
global_context_init(gc, opts, stats);
gc->listener_handler = h;
return h;
}
static void parse_options( int argc, char **argv, Options_t *opts )
{
int c;
opterr = 0;
memset( opts, 0, sizeof(*opts) );
opts->recv_count = -1;
opts->timeout = -1;
addresses_init( &opts->subscriptions);
while ((c = getopt(argc, argv,
"a:c:b:w:t:e:RW:F:VN:X:T:C:K:P:")) != -1) {
switch (c) {
case 'a':
{
// TODO: multiple addresses?
char *comma = strchr(optarg, ',');
check(comma == 0, "multiple addresses not implemented");
check(opts->subscriptions.count == 0, "multiple addresses not implemented");
addresses_merge( &opts->subscriptions, optarg );
}
break;
case 'c':
if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 't':
if (sscanf( optarg, "%d", &opts->timeout ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
if (opts->timeout > 0) opts->timeout *= 1000;
break;
case 'R': opts->reply = 1; break;
case 'V': enable_logging(); break;
case 'X': opts->ready_text = optarg; break;
default:
usage(1);
}
}
if (opts->subscriptions.count == 0) addresses_add( &opts->subscriptions,
"amqp://~0.0.0.0" );
}
int main(int argc, char** argv)
{
Options_t opts;
Statistics_t stats;
parse_options( argc, argv, &opts );
pn_reactor_t *reactor = pn_reactor();
// set up default handlers for our reactor
pn_handler_t *root = pn_reactor_get_handler(reactor);
pn_handler_t *lh = listener_handler(&opts, &stats);
pn_handler_add(root, lh);
pn_handshaker_t *handshaker = pn_handshaker();
pn_handler_add(root, handshaker);
// Omit decrefs else segfault. Not sure why they are necessary
// to keep valgrind happy for the connection_handler, but not here.
// pn_decref(handshaker);
// pn_decref(lh);
pn_reactor_run(reactor);
pn_reactor_free(reactor);
addresses_free( &opts.subscriptions );
return 0;
}
#undef PN_USE_DEPRECATED_API