blob: 2ad0edbf32e352a4c3d3aaa3ba8521ce7f97debe [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/*################################################################
This program is half of a pair. Precv and Psend are meant to
be simple-as-possible examples of how to use the proton-c
engine interface to send and receive messages over a single
connection and a single session.
In addition to being examples, these programs or their
descendants will be used in performance regression testing
for both throughput and latency, and long-term soak testing.
This program, psend, is highly similar to its peer precv.
I put all the good comments in precv.
*################################################################*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <time.h>
#include <sys/time.h>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <proton/connection.h>
#include <proton/delivery.h>
#include <proton/driver.h>
#include <proton/event.h>
#include <proton/terminus.h>
#include <proton/link.h>
#include <proton/message.h>
#include <proton/session.h>
#define MY_BUF_SIZE 1000
void
print_timestamp ( FILE * fp, char const * label )
{
struct timeval tv;
struct tm * timeinfo;
gettimeofday ( & tv, 0 );
timeinfo = localtime ( & tv.tv_sec );
int seconds_today = 3600 * timeinfo->tm_hour +
60 * timeinfo->tm_min +
timeinfo->tm_sec;
fprintf ( fp, "time : %d.%.6ld : %s\n", seconds_today, tv.tv_usec, label );
}
static
double
get_time ( )
{
struct timeval tv;
struct tm * timeinfo;
gettimeofday ( & tv, 0 );
timeinfo = localtime ( & tv.tv_sec );
double time_now = 3600 * timeinfo->tm_hour +
60 * timeinfo->tm_min +
timeinfo->tm_sec;
time_now += ((double)(tv.tv_usec) / 1000000.0);
return time_now;
}
int
main ( int argc, char ** argv )
{
char addr [ 1000 ];
char host [ 1000 ];
char port [ 1000 ];
char output_file_name[1000];
uint64_t messages = 2000000,
delivery_count = 0;
int message_length = 100;
bool done = false;
int sent_count = 0;
int n_links = 5;
int const max_links = 100;
strcpy ( addr, "queue" );
strcpy ( host, "0.0.0.0" );
strcpy ( port, "5672" );
FILE * output_fp;
for ( int i = 1; i < argc; ++ i )
{
if ( ! strcmp ( "--host", argv[i] ) )
{
strcpy ( host, argv[i+1] );
++ i;
}
else
if ( ! strcmp ( "--port", argv[i] ) )
{
strcpy ( port, argv[i+1] );
++ i;
}
else
if ( ! strcmp ( "--messages", argv[i] ) )
{
sscanf ( argv [ i+1 ], "%" SCNd64 , & messages );
++ i;
}
else
if ( ! strcmp ( "--message_length", argv[i] ) )
{
sscanf ( argv [ i+1 ], "%d", & message_length );
++ i;
}
else
if ( ! strcmp ( "--n_links", argv[i] ) )
{
sscanf ( argv [ i+1 ], "%d", & n_links );
++ i;
}
if ( ! strcmp ( "--output", argv[i] ) )
{
if ( ! strcmp ( "stderr", argv[i+1] ) )
{
output_fp = stderr;
strcpy ( output_file_name, "stderr");
}
else
if ( ! strcmp ( "stdout", argv[i+1] ) )
{
output_fp = stdout;
strcpy ( output_file_name, "stdout");
}
else
{
output_fp = fopen ( argv[i+1], "w" );
strcpy ( output_file_name, argv[i+1] );
if ( ! output_fp )
{
fprintf ( stderr, "Can't open |%s| for writing.\n", argv[i+1] );
exit ( 1 );
}
}
++ i;
}
else
{
fprintf ( output_fp, "unknown arg %s", argv[i] );
}
}
fprintf ( output_fp, "host %s\n", host );
fprintf ( output_fp, "port %s\n", port );
fprintf ( output_fp, "messages %" PRId64 "\n", messages );
fprintf ( output_fp, "message_length %d\n", message_length );
fprintf ( output_fp, "n_links %d\n", n_links );
fprintf ( output_fp, "output %s\n", output_file_name );
if ( n_links > max_links )
{
fprintf ( output_fp, "You can't have more than %d links.\n", max_links );
exit ( 1 );
}
pn_driver_t * driver;
pn_connector_t * connector;
pn_connector_t * driver_connector;
pn_connection_t * connection;
pn_collector_t * collector;
pn_link_t * links [ max_links ];
pn_session_t * session;
pn_event_t * event;
pn_delivery_t * delivery;
char * message = (char *) malloc(message_length);
memset ( message, 13, message_length );
/*----------------------------------------------------
Get everything set up.
We will have a single connector, a single
connection, a single session, and a single link.
----------------------------------------------------*/
driver = pn_driver ( );
connector = pn_connector ( driver, host, port, 0 );
connection = pn_connection();
collector = pn_collector ( );
pn_connection_collect ( connection, collector );
pn_connector_set_connection ( connector, connection );
session = pn_session ( connection );
pn_connection_open ( connection );
pn_session_open ( session );
for ( int i = 0; i < n_links; ++ i )
{
char name[100];
sprintf ( name, "tvc_15_%d", i );
links[i] = pn_sender ( session, name );
pn_terminus_set_address ( pn_link_target(links[i]), addr );
pn_link_open ( links[i] );
}
/*-----------------------------------------------------------
For my speed tests, I do not want to count setup time.
Start timing here. The receiver will print out a similar
timestamp when he receives the final message.
-----------------------------------------------------------*/
fprintf ( output_fp, "psend: sending %llu messages.\n", messages );
// Just before we start sending, print the start timestamp.
fprintf ( output_fp, "psend_start %.3lf\n", get_time() );
while ( 1 )
{
pn_driver_wait ( driver, -1 );
int event_count = 1;
while ( event_count > 0 )
{
event_count = 0;
pn_connector_process ( connector );
event = pn_collector_peek(collector);
while ( event )
{
++ event_count;
pn_event_type_t event_type = pn_event_type ( event );
//fprintf ( output_fp, "event: %s\n", pn_event_type_name ( event_type ) );
switch ( event_type )
{
case PN_LINK_FLOW:
{
if ( delivery_count < messages )
{
/*---------------------------------------------------
We may have opened multiple links.
The event will tell us which one this flow-event
happened on. If the flow event gave us some
credit, we will greedily send messages until it
is all used up.
---------------------------------------------------*/
pn_link_t * link = pn_event_link ( event );
int credit = pn_link_credit ( link );
while ( credit > 0 )
{
// Every delivery we create needs a unique tag.
char str [ 100 ];
sprintf ( str, "%x", delivery_count ++ );
delivery = pn_delivery ( link, pn_dtag(str, strlen(str)) );
// If you settle the delivery before sending it,
// you will spend some time wondering why your
// messages don't have any content when they arrive
// at the receiver.
pn_link_send ( link, message, message_length );
pn_delivery_settle ( delivery );
pn_link_advance ( link );
credit = pn_link_credit ( link );
}
if ( delivery_count >= messages )
{
fprintf ( output_fp,
"I have sent all %d messages.\n" ,
delivery_count
);
/*
I'm still kind of hazy on how to shut down the
psend / precv system properly ....
I can't go to all_done here, or precv will never
get all its messages and terminate.
So I let precv terminate properly ... which means
that this program, psend, dies with an error.
Hmm.
*/
// goto all_done;
}
}
}
break;
case PN_TRANSPORT:
// I don't know what this means here, either.
break;
case PN_TRANSPORT_TAIL_CLOSED:
goto all_done;
break;
default:
/*
fprintf ( output_fp,
"precv unhandled event: %s\n",
pn_event_type_name(event_type)
);
*/
break;
}
pn_collector_pop ( collector );
event = pn_collector_peek(collector);
}
}
}
all_done:
for ( int i = 0; i < n_links; ++ i )
{
pn_link_close ( links[i] );
}
pn_session_close ( session );
pn_connection_close ( connection );
return 0;
}