blob: a60a8979405de04a8b8ca669a5120dc9a677cece [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/*
* A test traffic generator that produces very long messages that are sent in
* chunks with a delay between each chunk. This client can be used to simulate
* very large streaming messages and/or slow producers.
*/
#include "proton/connection.h"
#include "proton/delivery.h"
#include "proton/link.h"
#include "proton/proactor.h"
#include "proton/session.h"
#include "proton/transport.h"
#include <arpa/inet.h>
#include <inttypes.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define DEFAULT_MAX_FRAME 65535
#define BOOL2STR(b) ((b)?"true":"false")
#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y))
bool stop = false;
bool verbose = false;
uint64_t limit = 1; // # messages to send
uint64_t sent = 0; // # sent
uint64_t acked = 0; // # of received acks
uint64_t accepted = 0;
uint64_t not_accepted = 0;
bool use_anonymous = false; // use anonymous link if true
bool presettle = false; // true = send presettled
uint32_t body_length = 1024 * 1024; // # bytes in vbin32 payload
uint32_t pause_msec = 100; // pause between sending chunks (milliseconds)
const char *target_address = "test-address";
const char *host_address = "127.0.0.1:5672";
const char *container_name = "Clogger";
char proactor_address[1024];
//
pn_proactor_t *proactor;
pn_connection_t *pn_conn;
pn_session_t *pn_ssn;
pn_link_t *pn_link;
pn_delivery_t *pn_dlv; // current in-flight delivery
uint32_t bytes_sent; // number of body data bytes written out link
uint32_t remote_max_frame = DEFAULT_MAX_FRAME; // used to limit amount written
#define AMQP_MSG_HEADER 0x70
#define AMQP_MSG_PROPERTIES 0x73
#define AMQP_MSG_DATA 0x75
// minimal AMQP header for a message that contains a single binary value
//
const uint8_t msg_header[] = {
0x00, // begin described type
0x53, // 1 byte ulong type
0x70, // HEADER section
0x45, // empty list
0x00, // begin described type
0x53, // 1 byte ulong type
0x73, // PROPERTIES section
0x45, // empty list
0x00, // begin described type
0x53, // 1 byte ulong type
0x77, // AMQP Value BODY section
0xb0, // Binary uint32 length
// 4 bytes for length here
// start of data...
};
void debug(const char *format, ...)
{
va_list args;
if (!verbose) return;
va_start(args, format);
vprintf(format, args);
va_end(args);
}
static void signal_handler(int signum)
{
signal(SIGINT, SIG_IGN);
signal(SIGQUIT, SIG_IGN);
switch (signum) {
case SIGINT:
case SIGQUIT:
stop = true;
if (proactor) pn_proactor_interrupt(proactor);
break;
default:
break;
}
}
void start_message()
{
static long tag = 0; // a simple tag generator
if (!pn_link || !pn_conn) return;
if (pn_dlv) {
debug("Cannot create delivery - in process\n");
abort();
}
debug("start message #%"PRIu64"!\n", sent);
pn_dlv = pn_delivery(pn_link, pn_dtag((const char *)&tag, sizeof(tag)));
++tag;
bytes_sent = 0;
// send the message header
ssize_t rc = pn_link_send(pn_link, (const char *)msg_header, sizeof(msg_header));
if (rc != sizeof(msg_header)) {
debug("Link send failed error=%ld\n", rc);
abort();
}
// add the vbin32 length (in network order!!!)
uint32_t len = htonl(body_length);
rc = pn_link_send(pn_link, (const char *)&len, sizeof(len));
if (rc != sizeof(len)) {
debug("Link send failed error=%ld\n", rc);
abort();
}
}
/* return true when message transmit is complete */
bool send_message_data()
{
static const char zero_block[DEFAULT_MAX_FRAME] = {0};
if (!pn_dlv) return true; // not sending
if (bytes_sent < body_length) {
uint32_t amount = MIN(body_length - bytes_sent, remote_max_frame);
amount = MIN(amount, sizeof(zero_block));
ssize_t rc = pn_link_send(pn_link, zero_block, amount);
if (rc < 0) {
debug("Link send failed error=%ld\n", rc);
abort();
}
bytes_sent += rc;
debug("message body bytes written=%zi total=%"PRIu32" body_length=%"PRIu32"\n",
rc, bytes_sent, body_length);
}
if (bytes_sent == body_length) {
debug("message #%"PRIu64" sent!\n", sent);
pn_link_advance(pn_link);
sent += 1;
if (presettle) {
pn_delivery_settle(pn_dlv);
if (limit && sent == limit) {
// no need to wait for acks
debug("stopping (presettled)...\n");
stop = true;
pn_proactor_interrupt(proactor);
}
}
pn_dlv = 0;
return true;
}
return false;
}
/* Process each event posted by the proactor.
Return true if client has stopped.
*/
static bool event_handler(pn_event_t *event)
{
const pn_event_type_t etype = pn_event_type(event);
debug("new event=%s\n", pn_event_type_name(etype));
switch (etype) {
case PN_CONNECTION_INIT: {
// Create and open all the endpoints needed to send a message
//
pn_connection_open(pn_conn);
pn_ssn = pn_session(pn_conn);
pn_session_open(pn_ssn);
pn_link = pn_sender(pn_ssn, "MyClogger");
if (!use_anonymous) {
pn_terminus_set_address(pn_link_target(pn_link), target_address);
}
pn_link_open(pn_link);
} break;
case PN_CONNECTION_REMOTE_OPEN: {
uint32_t rmf = pn_transport_get_remote_max_frame(pn_event_transport(event));
remote_max_frame = (rmf != 0) ? rmf : DEFAULT_MAX_FRAME;
debug("Remote MAX FRAME=%u\n", remote_max_frame);
} break;
case PN_LINK_FLOW: {
// the remote has given us some credit, now we can send messages
//
if (limit == 0 || sent < limit) {
if (pn_link_credit(pn_link) > 0) {
if (!pn_dlv) {
start_message();
pn_proactor_set_timeout(proactor, pause_msec); // send body after pause
}
}
}
} break;
case PN_TRANSPORT: {
ssize_t pending = pn_transport_pending(pn_event_transport(event));
debug("PN_TRANSPORT pending=%ld\n", pending);
} break;
case PN_DELIVERY: {
pn_delivery_t *dlv = pn_event_delivery(event);
if (pn_delivery_updated(dlv)) {
uint64_t rs = pn_delivery_remote_state(dlv);
pn_delivery_clear(dlv);
switch (rs) {
case PN_RECEIVED:
debug("PN_DELIVERY: received\n");
// This is not a terminal state - it is informational, and the
// peer is still processing the message.
break;
case PN_ACCEPTED:
debug("PN_DELIVERY: accept\n");
++acked;
++accepted;
pn_delivery_settle(dlv);
break;
case PN_REJECTED:
case PN_RELEASED:
case PN_MODIFIED:
default:
++acked;
++not_accepted;
pn_delivery_settle(dlv);
debug("Message not accepted - code: 0x%lX\n", (unsigned long)rs);
break;
}
if (limit && acked == limit) {
// initiate clean shutdown of the endpoints
debug("stopping...\n");
stop = true;
pn_proactor_interrupt(proactor);
}
}
} break;
case PN_PROACTOR_TIMEOUT: {
if (pn_conn) pn_connection_wake(pn_conn);
} break;
case PN_CONNECTION_WAKE: {
if (!send_message_data()) { // not done sending
pn_proactor_set_timeout(proactor, pause_msec);
} else if (limit == 0 || sent < limit) {
if (pn_link_credit(pn_link) > 0) {
// send next message
start_message();
pn_proactor_set_timeout(proactor, pause_msec);
}
}
} break;
case PN_PROACTOR_INACTIVE: {
debug("proactor inactive!\n");
return stop;
} break;
default:
break;
}
return false;
}
static void usage(const char *prog)
{
printf("Usage: %s <options>\n", prog);
printf("-a \tThe host address [%s]\n", host_address);
printf("-c \t# of messages to send, 0 == nonstop [%"PRIu64"]\n", limit);
printf("-i \tContainer name [%s]\n", container_name);
printf("-n \tUse an anonymous link [%s]\n", BOOL2STR(use_anonymous));
printf("-s \tBody size in bytes [%d]\n", body_length);
printf("-t \tTarget address [%s]\n", target_address);
printf("-u \tSend all messages presettled [%s]\n", BOOL2STR(presettle));
printf("-D \tPrint debug info [off]\n");
printf("-P \tPause between sending frames [%"PRIu32"]\n", pause_msec);
exit(1);
}
int main(int argc, char** argv)
{
/* command line options */
opterr = 0;
int c;
while ((c = getopt(argc, argv, "ha:c:i:ns:t:uDP:")) != -1) {
switch(c) {
case 'h': usage(argv[0]); break;
case 'a': host_address = optarg; break;
case 'c':
if (sscanf(optarg, "%"SCNu64, &limit) != 1)
usage(argv[0]);
break;
case 'i': container_name = optarg; break;
case 'n': use_anonymous = true; break;
case 's':
if (sscanf(optarg, "%"SCNu32, &body_length) != 1)
usage(argv[0]);
break;
case 't': target_address = optarg; break;
case 'u': presettle = true; break;
case 'D': verbose = true; break;
case 'P':
if (sscanf(optarg, "%"SCNu32, &pause_msec) != 1)
usage(argv[0]);
break;
default:
usage(argv[0]);
break;
}
}
signal(SIGQUIT, signal_handler);
signal(SIGINT, signal_handler);
// test infrastructure may add a "amqp[s]://" prefix to the address string.
// That causes proactor much grief, so strip it off
if (strncmp("amqps://", host_address, strlen("amqps://")) == 0) {
host_address += strlen("amqps://"); // no! no ssl for you!
} else if (strncmp("amqp://", host_address, strlen("amqp://")) == 0) {
host_address += strlen("amqp://");
}
// trim port from hostname
char *hostname = strdup(host_address);
char *port = strchr(hostname, ':');
if (port) {
*port++ = 0;
} else {
port = "5672";
}
pn_conn = pn_connection();
// the container name should be unique for each client
pn_connection_set_container(pn_conn, container_name);
pn_connection_set_hostname(pn_conn, hostname);
proactor = pn_proactor();
pn_proactor_addr(proactor_address, sizeof(proactor_address), hostname, port);
pn_proactor_connect2(proactor, pn_conn, 0, proactor_address);
free(hostname);
bool done = false;
while (!done) {
debug("Waiting for proactor event...\n");
pn_event_batch_t *events = pn_proactor_wait(proactor);
debug("Start new proactor batch\n");
pn_event_t *event = pn_event_batch_next(events);
while (event) {
done = event_handler(event);
if (done)
break;
event = pn_event_batch_next(events);
}
debug("Proactor batch processing done\n");
pn_proactor_done(proactor, events);
if (stop && pn_conn) {
debug("Stop detected - closing connection...\n");
if (pn_link) pn_link_close(pn_link);
if (pn_ssn) pn_session_close(pn_ssn);
pn_connection_close(pn_conn);
pn_link = 0;
pn_ssn = 0;
pn_conn = 0;
}
}
debug("Send complete!\n");
pn_proactor_free(proactor);
if (not_accepted) {
printf("Sent: %" PRIu64 " Accepted: %" PRIu64 " Not Accepted: %" PRIu64 "\n", sent, accepted, not_accepted);
if (accepted + not_accepted != sent) {
printf("FAILURE! Sent: %" PRIu64 " Acked: %" PRIu64 "\n", sent, accepted + not_accepted);
return 1;
}
}
return 0;
}