blob: 95c515bed6f44e6ad3da5f614653c92ba00378f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#define PN_USE_DEPRECATED_API 1
#include "msgr-common.h"
#include "proton/message.h"
#include "proton/messenger.h"
#include "proton/error.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
typedef struct {
Addresses_t targets;
uint64_t msg_count;
uint32_t msg_size; // of body
uint32_t send_batch;
int outgoing_window;
unsigned int report_interval; // in seconds
//Addresses_t subscriptions;
//Addresses_t reply_tos;
int get_replies;
int timeout; // in seconds
int incoming_window;
int recv_count;
const char *name;
char *certificate;
char *privatekey; // used to sign certificate
char *password; // for private key file
char *ca_db; // trusted CA database
} Options_t;
static void usage(int rc)
{
printf("Usage: msgr-send [OPTIONS] \n"
" -a <addr>[,<addr>]* \tThe target address [amqp[s]://domain[/name]]\n"
" -c # \tNumber of messages to send before exiting [0=forever]\n"
" -b # \tSize of message body in bytes [1024]\n"
" -p # \tSend batches of # messages (wait for replies before sending next batch if -R) [1024]\n"
" -w # \t# outgoing window size [0]\n"
" -e # \t# seconds to report statistics, 0 = end of test [0]\n"
" -R \tWait for a reply to each sent message\n"
" -t # \tInactivity timeout in seconds, -1 = no timeout [-1]\n"
" -W # \tIncoming window size [0]\n"
" -B # \tArgument to Messenger::recv(n) [-1]\n"
" -N <name> \tSet the container name to <name>\n"
" -V \tEnable debug logging\n"
" SSL options:\n"
" -T <path> \tDatabase of trusted CA certificates for validating peer\n"
" -C <path> \tFile containing self-identifying certificate\n"
" -K <path> \tFile containing private key used to sign certificate\n"
" -P [pass:<password>|path] \tPassword to unlock private key file.\n"
);
//printf("-p \t*TODO* Add N sample properties to each message [3]\n");
exit(rc);
}
static void parse_options( int argc, char **argv, Options_t *opts )
{
int c;
opterr = 0;
memset( opts, 0, sizeof(*opts) );
opts->msg_size = 1024;
opts->send_batch = 1024;
opts->timeout = -1;
opts->recv_count = -1;
addresses_init(&opts->targets);
while ((c = getopt(argc, argv,
"a:c:b:p:w:e:l:Rt:W:B:VN:T:C:K:P:")) != -1) {
switch(c) {
case 'a': addresses_merge( &opts->targets, optarg ); break;
case 'c':
if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'b':
if (sscanf( optarg, "%u", &opts->msg_size ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'p':
if (sscanf( optarg, "%u", &opts->send_batch ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'w':
if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'e':
if (sscanf( optarg, "%u", &opts->report_interval ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'R': opts->get_replies = 1; break;
case 't':
if (sscanf( optarg, "%d", &opts->timeout ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
if (opts->timeout > 0) opts->timeout *= 1000;
break;
case 'W':
if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'B':
if (sscanf( optarg, "%d", &opts->recv_count ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'V': enable_logging(); break;
case 'N': opts->name = optarg; break;
case 'T': opts->ca_db = optarg; break;
case 'C': opts->certificate = optarg; break;
case 'K': opts->privatekey = optarg; break;
case 'P': parse_password( optarg, &opts->password ); break;
default:
usage(1);
}
}
// default target if none specified
if (opts->targets.count == 0) addresses_add( &opts->targets, "amqp://0.0.0.0" );
}
// return the # of reply messages received
static int process_replies( pn_messenger_t *messenger,
pn_message_t *message,
Statistics_t *stats,
int max_count)
{
int received = 0;
LOG("Calling pn_messenger_recv(%d)\n", max_count);
int rc = pn_messenger_recv( messenger, max_count );
check((rc == 0 || rc == PN_TIMEOUT), "pn_messenger_recv() failed");
LOG("Messages on incoming queue: %d\n", pn_messenger_incoming(messenger));
while (pn_messenger_incoming(messenger)) {
pn_messenger_get(messenger, message);
check_messenger(messenger);
received++;
// TODO: header decoding?
statistics_msg_received( stats, message );
// uint64_t id = pn_message_get_correlation_id( message ).u.as_ulong;
}
return received;
}
int main(int argc, char** argv)
{
Options_t opts;
Statistics_t stats;
uint64_t sent = 0;
uint64_t received = 0;
int target_index = 0;
int rc;
pn_message_t *message = 0;
pn_message_t *reply_message = 0;
pn_messenger_t *messenger = 0;
parse_options( argc, argv, &opts );
messenger = pn_messenger( opts.name );
if (opts.certificate) {
rc = pn_messenger_set_certificate(messenger, opts.certificate);
check( rc == 0, "Failed to set certificate" );
}
if (opts.privatekey) {
rc = pn_messenger_set_private_key(messenger, opts.privatekey);
check( rc == 0, "Failed to set private key" );
}
if (opts.password) {
rc = pn_messenger_set_password(messenger, opts.password);
free(opts.password);
check( rc == 0, "Failed to set password" );
}
if (opts.ca_db) {
rc = pn_messenger_set_trusted_certificates(messenger, opts.ca_db);
check( rc == 0, "Failed to set trusted CA database" );
}
if (opts.outgoing_window) {
pn_messenger_set_outgoing_window( messenger, opts.outgoing_window );
}
pn_messenger_set_timeout( messenger, opts.timeout );
pn_messenger_start(messenger);
message = pn_message();
check(message, "failed to allocate a message");
pn_message_set_reply_to(message, "~");
pn_data_t *body = pn_message_body(message);
char *data = (char *)calloc(1, opts.msg_size);
pn_data_put_binary(body, pn_bytes(opts.msg_size, data));
free(data);
pn_atom_t id;
id.type = PN_ULONG;
#if 0
// TODO: how do we effectively benchmark header processing overhead???
pn_data_t *props = pn_message_properties(message);
pn_data_put_map(props);
pn_data_enter(props);
//
//pn_data_put_string(props, pn_bytes(6, "string"));
//pn_data_put_string(props, pn_bytes(10, "this is awkward"));
//
//pn_data_put_string(props, pn_bytes(4, "long"));
pn_data_put_long(props, 12345);
//
//pn_data_put_string(props, pn_bytes(9, "timestamp"));
pn_data_put_timestamp(props, (pn_timestamp_t) 54321);
pn_data_exit(props);
#endif
const int get_replies = opts.get_replies;
if (get_replies) {
// disable the timeout so that pn_messenger_recv() won't block
reply_message = pn_message();
check(reply_message, "failed to allocate a message");
}
statistics_start( &stats );
while (!opts.msg_count || (sent < opts.msg_count)) {
// setup the message to send
pn_message_set_address(message, opts.targets.addresses[target_index]);
target_index = NEXT_ADDRESS(opts.targets, target_index);
id.u.as_ulong = sent;
pn_message_set_correlation_id( message, id );
pn_message_set_creation_time( message, msgr_now() );
pn_messenger_put(messenger, message);
sent++;
if (opts.send_batch && (pn_messenger_outgoing(messenger) >= (int)opts.send_batch)) {
if (get_replies) {
while (received < sent) {
// this will also transmit any pending sent messages
received += process_replies( messenger, reply_message,
&stats, opts.recv_count );
}
} else {
LOG("Calling pn_messenger_send()\n");
rc = pn_messenger_send(messenger, -1);
check((rc == 0 || rc == PN_TIMEOUT), "pn_messenger_send() failed");
}
}
check_messenger(messenger);
}
LOG("Messages received=%" PRIu64 " sent=%" PRIu64 "\n", received, sent);
if (get_replies) {
// wait for the last of the replies
while (received < sent) {
int count = process_replies( messenger, reply_message,
&stats, opts.recv_count );
check( count > 0 || (opts.timeout == 0),
"Error: timed out waiting for reply messages\n");
received += count;
LOG("Messages received=%" PRIu64 " sent=%" PRIu64 "\n", received, sent);
}
} else if (pn_messenger_outgoing(messenger) > 0) {
LOG("Calling pn_messenger_send()\n");
rc = pn_messenger_send(messenger, -1);
check(rc == 0, "pn_messenger_send() failed");
}
rc = pn_messenger_stop(messenger);
check(rc == 0, "pn_messenger_stop() failed");
check_messenger(messenger);
statistics_report( &stats, sent, received );
pn_messenger_free(messenger);
pn_message_free(message);
if (reply_message) pn_message_free( reply_message );
addresses_free( &opts.targets );
return 0;
}
#undef PN_USE_DEPRECATED_API