blob: cbc9b456c2fd8c9ca96871fbff333e48516626d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#define PN_USE_DEPRECATED_API 1
#include "msgr-common.h"
#include "proton/message.h"
#include "proton/messenger.h"
#include "proton/error.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
typedef struct {
Addresses_t subscriptions;
uint64_t msg_count;
int recv_count;
int incoming_window;
int timeout; // seconds
unsigned int report_interval; // in seconds
int outgoing_window;
Addresses_t forwarding_targets;
int reply;
const char *name;
const char *ready_text;
char *certificate;
char *privatekey; // used to sign certificate
char *password; // for private key file
char *ca_db; // trusted CA database
} Options_t;
static void usage(int rc)
{
printf("Usage: msgr-recv [OPTIONS] \n"
" -a <addr>[,<addr>]* \tAddresses to listen on [amqp://~0.0.0.0]\n"
" -c # \tNumber of messages to receive before exiting [0=forever]\n"
" -b # \tArgument to Messenger::recv(n) [2048]\n"
" -w # \tSize for incoming window [0]\n"
" -t # \tInactivity timeout in seconds, -1 = no timeout [-1]\n"
" -e # \t# seconds to report statistics, 0 = end of test [0] *TBD*\n"
" -R \tSend reply if 'reply-to' present\n"
" -W # \t# outgoing window size [0]\n"
" -F <addr>[,<addr>]* \tAddresses used for forwarding received messages\n"
" -N <name> \tSet the container name to <name>\n"
" -X <text> \tPrint '<text>\\n' to stdout after all subscriptions are created\n"
" -V \tEnable debug logging\n"
" SSL options:\n"
" -T <path> \tDatabase of trusted CA certificates for validating peer\n"
" -C <path> \tFile containing self-identifying certificate\n"
" -K <path> \tFile containing private key used to sign certificate\n"
" -P [pass:<password>|path] \tPassword to unlock private key file.\n"
);
exit(rc);
}
static void parse_options( int argc, char **argv, Options_t *opts )
{
int c;
opterr = 0;
memset( opts, 0, sizeof(*opts) );
opts->recv_count = -1;
opts->timeout = -1;
addresses_init(&opts->subscriptions);
addresses_init(&opts->forwarding_targets);
while ((c = getopt(argc, argv,
"a:c:b:w:t:e:RW:F:VN:X:T:C:K:P:")) != -1) {
switch (c) {
case 'a': addresses_merge( &opts->subscriptions, optarg ); break;
case 'c':
if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'b':
if (sscanf( optarg, "%d", &opts->recv_count ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'w':
if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 't':
if (sscanf( optarg, "%d", &opts->timeout ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
if (opts->timeout > 0) opts->timeout *= 1000;
break;
case 'e':
if (sscanf( optarg, "%u", &opts->report_interval ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'R': opts->reply = 1; break;
case 'W':
if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) {
fprintf(stderr, "Option -%c requires an integer argument.\n", optopt);
usage(1);
}
break;
case 'F': addresses_merge( &opts->forwarding_targets, optarg ); break;
case 'V': enable_logging(); break;
case 'N': opts->name = optarg; break;
case 'X': opts->ready_text = optarg; break;
case 'T': opts->ca_db = optarg; break;
case 'C': opts->certificate = optarg; break;
case 'K': opts->privatekey = optarg; break;
case 'P': parse_password( optarg, &opts->password ); break;
default:
usage(1);
}
}
// default subscription if none specified
if (opts->subscriptions.count == 0) addresses_add( &opts->subscriptions,
"amqp://~0.0.0.0" );
}
int main(int argc, char** argv)
{
Options_t opts;
Statistics_t stats;
uint64_t sent = 0;
uint64_t received = 0;
int forwarding_index = 0;
int rc;
pn_message_t *message;
pn_messenger_t *messenger;
parse_options( argc, argv, &opts );
const int forward = opts.forwarding_targets.count != 0;
message = pn_message();
messenger = pn_messenger( opts.name );
/* load the various command line options if they're set */
if (opts.certificate) {
rc = pn_messenger_set_certificate(messenger, opts.certificate);
check_messenger(messenger);
check( rc == 0, "Failed to set certificate" );
}
if (opts.privatekey) {
rc = pn_messenger_set_private_key(messenger, opts.privatekey);
check_messenger(messenger);
check( rc == 0, "Failed to set private key" );
}
if (opts.password) {
rc = pn_messenger_set_password(messenger, opts.password);
free(opts.password);
check_messenger(messenger);
check( rc == 0, "Failed to set password" );
}
if (opts.ca_db) {
rc = pn_messenger_set_trusted_certificates(messenger, opts.ca_db);
check_messenger(messenger);
check( rc == 0, "Failed to set trusted CA database" );
}
if (opts.incoming_window) {
// RAFI: seems to cause receiver to hang:
pn_messenger_set_incoming_window( messenger, opts.incoming_window );
}
pn_messenger_set_timeout( messenger, opts.timeout );
pn_messenger_start(messenger);
check_messenger(messenger);
int i;
for (i = 0; i < opts.subscriptions.count; i++) {
pn_messenger_subscribe(messenger, opts.subscriptions.addresses[i]);
check_messenger(messenger);
LOG("Subscribing to '%s'\n", opts.subscriptions.addresses[i]);
}
// hack to let test scripts know when the receivers are ready (so
// that the senders may be started)
if (opts.ready_text) {
fprintf(stdout, "%s\n", opts.ready_text);
fflush(stdout);
}
while (!opts.msg_count || received < opts.msg_count) {
LOG("Calling pn_messenger_recv(%d)\n", opts.recv_count);
rc = pn_messenger_recv(messenger, opts.recv_count);
check_messenger(messenger);
check(rc == 0 || (opts.timeout == 0 && rc == PN_TIMEOUT), "pn_messenger_recv() failed");
// start the timer only after receiving the first msg
if (received == 0) statistics_start( &stats );
LOG("Messages on incoming queue: %d\n", pn_messenger_incoming(messenger));
while (pn_messenger_incoming(messenger)) {
pn_messenger_get(messenger, message);
check_messenger(messenger);
received++;
// TODO: header decoding?
// uint64_t id = pn_message_get_correlation_id( message ).u.as_ulong;
statistics_msg_received( &stats, message );
if (opts.reply) {
const char *reply_addr = pn_message_get_reply_to( message );
if (reply_addr) {
LOG("Replying to: %s\n", reply_addr );
pn_message_set_address( message, reply_addr );
pn_message_set_creation_time( message, msgr_now() );
pn_messenger_put(messenger, message);
sent++;
}
}
if (forward) {
const char *forward_addr = opts.forwarding_targets.addresses[forwarding_index];
forwarding_index = NEXT_ADDRESS(opts.forwarding_targets, forwarding_index);
LOG("Forwarding to: %s\n", forward_addr );
pn_message_set_address( message, forward_addr );
pn_message_set_reply_to( message, NULL ); // else points to origin sender
pn_message_set_creation_time( message, msgr_now() );
pn_messenger_put(messenger, message);
sent++;
}
}
LOG("Messages received=%" PRIu64 " sent=%" PRIu64 "\n", received, sent);
}
// this will flush any pending sends
if (pn_messenger_outgoing(messenger) > 0) {
LOG("Calling pn_messenger_send()\n");
rc = pn_messenger_send(messenger, -1);
check_messenger(messenger);
check(rc == 0, "pn_messenger_send() failed");
}
rc = pn_messenger_stop(messenger);
check(rc == 0, "pn_messenger_stop() failed");
check_messenger(messenger);
statistics_report( &stats, sent, received );
pn_messenger_free(messenger);
pn_message_free(message);
addresses_free( &opts.subscriptions );
addresses_free( &opts.forwarding_targets );
return 0;
}
#undef PN_USE_DEPRECATED_API