blob: 1b7870f79296226bdabe0c2a2479497354d69642 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#define PN_USE_DEPRECATED_API 1
/*
* Implements a subset of msgr-send.c using reactor events.
*/
#include "proton/message.h"
#include "proton/error.h"
#include "proton/types.h"
#include "proton/reactor.h"
#include "proton/handlers.h"
#include "proton/engine.h"
#include "proton/url.h"
#include "msgr-common.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
typedef struct {
Addresses_t targets;
uint64_t msg_count;
uint32_t msg_size; // of body
uint32_t send_batch;
int outgoing_window;
unsigned int report_interval; // in seconds
//Addresses_t subscriptions;
//Addresses_t reply_tos;
int get_replies;
int unique_message; // 1 -> create and free a pn_message_t for each send/recv
int timeout; // in seconds
int incoming_window;
int recv_count;
const char *name;
char *certificate;
char *privatekey; // used to sign certificate
char *password; // for private key file
char *ca_db; // trusted CA database
} Options_t;
static void usage(int rc)
{
printf("Usage: reactor-send [OPTIONS] \n"
" -a <addr> \tThe target address [amqp[s]://domain[/name]]\n"
" -c # \tNumber of messages to send before exiting [0=forever]\n"
" -b # \tSize of message body in bytes [1024]\n"
" -R \tWait for a reply to each sent message\n"
" -V \tEnable debug logging\n"
);
exit(rc);
}
typedef struct {
Options_t *opts;
Statistics_t *stats;
uint64_t sent;
uint64_t received;
pn_message_t *message;
pn_message_t *reply_message;
pn_atom_t id;
char *encoded_data;
size_t encoded_data_size;
pn_url_t *send_url;
pn_string_t *hostname;
pn_string_t *container_id;
pn_string_t *reply_to;
} sender_context_t;
void sender_context_init(sender_context_t *sc, Options_t *opts, Statistics_t *stats)
{
sc->opts = opts;
sc->stats = stats;
sc->sent = 0;
sc->received = 0;
sc->id.type = PN_ULONG;
// 4096 extra bytes should easily cover the message metadata
sc->encoded_data_size = sc->opts->msg_size + 4096;
sc->encoded_data = (char *)calloc(1, sc->encoded_data_size);
check(sc->encoded_data, "failed to allocate encoding buffer");
sc->container_id = pn_string("reactor-send"); // prefer uuid-like name
sc->reply_message = (sc->opts->get_replies) ? pn_message() : 0;
sc->message = pn_message();
check(sc->message, "failed to allocate a message");
sc->reply_to = pn_string("amqp://");
pn_string_addf(sc->reply_to, "%s", pn_string_get(sc->container_id));
pn_message_set_reply_to(sc->message, pn_string_get(sc->reply_to));
pn_data_t *body = pn_message_body(sc->message);
// borrow the encoding buffer this one time
char *data = sc->encoded_data;
pn_data_put_binary(body, pn_bytes(sc->opts->msg_size, data));
check(sc->opts->targets.count > 0, "no specified address");
sc->send_url = pn_url_parse(sc->opts->targets.addresses[0]);
const char *host = pn_url_get_host(sc->send_url);
const char *port = pn_url_get_port(sc->send_url);
sc->hostname = pn_string(host);
if (port && strlen(port))
pn_string_addf(sc->hostname, ":%s", port);
}
sender_context_t *sender_context(pn_handler_t *h)
{
return (sender_context_t *) pn_handler_mem(h);
}
void sender_cleanup(pn_handler_t *h)
{
sender_context_t *sc = sender_context(h);
pn_message_free(sc->message);
pn_message_free(sc->reply_message);
pn_url_free(sc->send_url);
pn_free(sc->hostname);
pn_free(sc->container_id);
pn_free(sc->reply_to);
free(sc->encoded_data);
}
pn_handler_t *replyto_handler(sender_context_t *sc);
pn_message_t* get_message(sender_context_t *sc, bool sending) {
if (sc->opts->unique_message) {
pn_message_t *m = pn_message();
check(m, "failed to allocate a message");
if (sending) {
pn_message_set_reply_to(m, pn_string_get(sc->reply_to));
// copy the data
pn_data_t *body = pn_message_body(m);
pn_data_t *template_body = pn_message_body(sc->message);
pn_data_put_binary(body, pn_data_get_binary(template_body));
}
return m;
}
else
return sending ? sc->message : sc->reply_message; // our simplified "message pool"
}
void return_message(sender_context_t *sc, pn_message_t *m) {
if (sc->opts->unique_message)
pn_message_free(m);
}
void sender_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type)
{
sender_context_t *sc = sender_context(h);
switch (type) {
case PN_CONNECTION_INIT:
{
pn_connection_t *conn = pn_event_connection(event);
pn_connection_set_container(conn, pn_string_get(sc->container_id));
pn_connection_set_hostname(conn, pn_string_get(sc->hostname));
pn_connection_open(conn);
pn_session_t *ssn = pn_session(conn);
pn_session_open(ssn);
pn_link_t *snd = pn_sender(ssn, "sender");
const char *path = pn_url_get_path(sc->send_url);
if (path && strlen(path)) {
pn_terminus_set_address(pn_link_target(snd), path);
pn_terminus_set_address(pn_link_source(snd), path);
}
pn_link_open(snd);
}
break;
case PN_LINK_FLOW:
{
pn_link_t *snd = pn_event_link(event);
while (pn_link_credit(snd) > 0 && sc->sent < sc->opts->msg_count) {
if (sc->sent == 0)
statistics_start(sc->stats);
char tag[8];
void *ptr = &tag;
*((uint64_t *) ptr) = sc->sent;
pn_delivery_t *dlv = pn_delivery(snd, pn_dtag(tag, 8));
// setup the message to send
pn_message_t *msg = get_message(sc, true);;
pn_message_set_address(msg, sc->opts->targets.addresses[0]);
sc->id.u.as_ulong = sc->sent;
pn_message_set_correlation_id(msg, sc->id);
pn_message_set_creation_time(msg, msgr_now());
size_t size = sc->encoded_data_size;
int err = pn_message_encode(msg, sc->encoded_data, &size);
check(err == 0, "message encoding error");
pn_link_send(snd, sc->encoded_data, size);
pn_delivery_settle(dlv);
sc->sent++;
return_message(sc, msg);
}
if (sc->sent == sc->opts->msg_count && !sc->opts->get_replies) {
pn_link_close(snd);
pn_connection_t *conn = pn_event_connection(event);
pn_connection_close(conn);
}
}
break;
case PN_LINK_INIT:
{
pn_link_t *link = pn_event_link(event);
if (pn_link_is_receiver(link)) {
// Response messages link. Could manage credit and deliveries in this handler but
// a dedicated handler also works.
pn_handler_t *replyto = replyto_handler(sc);
pn_flowcontroller_t *fc = pn_flowcontroller(1024);
pn_handler_add(replyto, fc);
pn_decref(fc);
pn_handshaker_t *handshaker = pn_handshaker();
pn_handler_add(replyto, handshaker);
pn_decref(handshaker);
pn_record_t *record = pn_link_attachments(link);
pn_record_set_handler(record, replyto);
pn_decref(replyto);
}
}
break;
case PN_CONNECTION_LOCAL_CLOSE:
{
statistics_report(sc->stats, sc->sent, sc->received);
}
break;
default:
break;
}
}
pn_handler_t *sender_handler(Options_t *opts, Statistics_t *stats)
{
pn_handler_t *h = pn_handler_new(sender_dispatch, sizeof(sender_context_t), sender_cleanup);
sender_context_t *sc = sender_context(h);
sender_context_init(sc, opts, stats);
return h;
}
sender_context_t *replyto_sender_context(pn_handler_t *h)
{
sender_context_t **p = (sender_context_t **) pn_handler_mem(h);
return *p;
}
void replyto_cleanup(pn_handler_t *h)
{}
void replyto_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) {
sender_context_t *sc = replyto_sender_context(h);
switch (type) {
case PN_DELIVERY:
{
check(sc->opts->get_replies, "Unexpected reply message");
pn_link_t *recv_link = pn_event_link(event);
pn_delivery_t *dlv = pn_event_delivery(event);
if (pn_link_is_receiver(recv_link) && !pn_delivery_partial(dlv)) {
size_t encoded_size = pn_delivery_pending(dlv);
check(encoded_size <= sc->encoded_data_size, "decoding buffer too small");
ssize_t n = pn_link_recv(recv_link, sc->encoded_data, encoded_size);
check(n == (ssize_t)encoded_size, "read fail on reply link");
pn_message_t *msg = get_message(sc, false);
int err = pn_message_decode(msg, sc->encoded_data, n);
check(err == 0, "message decode error");
statistics_msg_received(sc->stats, msg);
sc->received++;
pn_delivery_settle(dlv);
return_message(sc, msg);
}
if (sc->received == sc->opts->msg_count) {
pn_link_close(recv_link);
pn_connection_t *conn = pn_event_connection(event);
pn_connection_close(conn);
}
}
break;
default:
break;
}
}
pn_handler_t *replyto_handler(sender_context_t *sc)
{
pn_handler_t *h = pn_handler_new(replyto_dispatch, sizeof(sender_context_t *), replyto_cleanup);
sender_context_t **p = (sender_context_t **) pn_handler_mem(h);
*p = sc;
return h;
}
static void parse_options( int argc, char **argv, Options_t *opts )
{
int c;
opterr = 0;
memset( opts, 0, sizeof(*opts) );
opts->msg_size = 1024;
opts->send_batch = 1024;
opts->timeout = -1;
opts->recv_count = -1;
opts->unique_message = 0;
addresses_init(&opts->targets);
while ((c = getopt(argc, argv,
"ua:c:b:p:w:e:l:Rt:W:B:VN:T:C:K:P:")) != -1) {
switch(c) {
case 'a':
{
// TODO: multiple addresses? To keep tests happy, accept multiple for now,
// but ignore all but the first.
addresses_merge( &opts->targets, optarg );
}
break;
case 'c':
if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'b':
if (sscanf( optarg, "%u", &opts->msg_size ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'p':
if (sscanf( optarg, "%u", &opts->send_batch ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'w':
if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'e':
if (sscanf( optarg, "%u", &opts->report_interval ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'R': opts->get_replies = 1; break;
case 'u': opts->unique_message = 1; break;
case 't':
if (sscanf( optarg, "%d", &opts->timeout ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
if (opts->timeout > 0) opts->timeout *= 1000;
break;
case 'W':
if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'B':
if (sscanf( optarg, "%d", &opts->recv_count ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'V': enable_logging(); break;
case 'N': opts->name = optarg; break;
case 'T': opts->ca_db = optarg; break;
case 'C': opts->certificate = optarg; break;
case 'K': opts->privatekey = optarg; break;
case 'P': parse_password( optarg, &opts->password ); break;
default:
usage(1);
}
}
// default target if none specified
if (opts->targets.count == 0) addresses_add( &opts->targets, "amqp://0.0.0.0" );
}
int main(int argc, char** argv)
{
Options_t opts;
Statistics_t stats;
parse_options( argc, argv, &opts );
pn_reactor_t *reactor = pn_reactor();
pn_handler_t *sh = sender_handler(&opts, &stats);
pn_handler_add(sh, pn_handshaker());
pn_reactor_connection(reactor, sh);
pn_reactor_run(reactor);
pn_reactor_free(reactor);
pn_handler_free(sh);
addresses_free(&opts.targets);
free(opts.password);
return 0;
}
#undef PN_USE_DEPRECATED_API