blob: 6c3cdb3d51a805064a4195ab4f3537f3d407b63b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "pncompat/misc_funcs.inc"
#include "proton/reactor.h"
#include "proton/message.h"
#include "proton/connection.h"
#include "proton/session.h"
#include "proton/link.h"
#include "proton/delivery.h"
#include "proton/event.h"
#include "proton/handlers.h"
#include "proton/transport.h"
#include "proton/url.h"
static int quiet = 0;
// Example application data. This data will be instantiated in the event
// handler, and is available during event processing. In this example it
// holds configuration and state information.
//
typedef struct {
int count; // # messages to send
int anon; // use anonymous link if true
const char *target; // name of destination target
char *msg_data; // pre-encoded outbound message
size_t msg_len; // bytes in msg_data
} app_data_t;
// helper to pull pointer to app_data_t instance out of the pn_handler_t
//
#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
// Called when reactor exits to clean up app_data
//
static void delete_handler(pn_handler_t *handler)
{
app_data_t *d = GET_APP_DATA(handler);
if (d->msg_data) {
free(d->msg_data);
d->msg_data = NULL;
}
}
/* Process each event posted by the reactor.
*/
static void event_handler(pn_handler_t *handler,
pn_event_t *event,
pn_event_type_t type)
{
app_data_t *data = GET_APP_DATA(handler);
switch (type) {
case PN_CONNECTION_INIT: {
// Create and open all the endpoints needed to send a message
//
pn_connection_t *conn;
pn_session_t *ssn;
pn_link_t *sender;
conn = pn_event_connection(event);
pn_connection_open(conn);
ssn = pn_session(conn);
pn_session_open(ssn);
sender = pn_sender(ssn, "MySender");
// we do not wait for ack until the last message
pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
if (!data->anon) {
pn_terminus_set_address(pn_link_target(sender), data->target);
}
pn_link_open(sender);
} break;
case PN_LINK_FLOW: {
// the remote has given us some credit, now we can send messages
//
static long tag = 0; // a simple tag generator
pn_delivery_t *delivery;
pn_link_t *sender = pn_event_link(event);
int credit = pn_link_credit(sender);
while (credit > 0 && data->count > 0) {
--credit;
--data->count;
++tag;
delivery = pn_delivery(sender,
pn_dtag((const char *)&tag, sizeof(tag)));
pn_link_send(sender, data->msg_data, data->msg_len);
pn_link_advance(sender);
if (data->count > 0) {
// send pre-settled until the last one, then wait for an ack on
// the last sent message. This allows the sender to send
// messages as fast as possible and then exit when the consumer
// has dealt with the last one.
//
pn_delivery_settle(delivery);
}
}
} break;
case PN_DELIVERY: {
// Since the example sends all messages but the last pre-settled
// (pre-acked), only the last message's delivery will get updated with
// the remote state (acked/nacked).
//
pn_delivery_t *dlv = pn_event_delivery(event);
if (pn_delivery_updated(dlv) && pn_delivery_remote_state(dlv)) {
uint64_t rs = pn_delivery_remote_state(dlv);
int done = 1;
switch (rs) {
case PN_RECEIVED:
// This is not a terminal state - it is informational, and the
// peer is still processing the message.
done = 0;
break;
case PN_ACCEPTED:
pn_delivery_settle(dlv);
if (!quiet) fprintf(stdout, "Send complete!\n");
break;
case PN_REJECTED:
case PN_RELEASED:
case PN_MODIFIED:
pn_delivery_settle(dlv);
fprintf(stderr, "Message not accepted - code:%lu\n", (unsigned long)rs);
break;
default:
// ??? no other terminal states defined, so ignore anything else
pn_delivery_settle(dlv);
fprintf(stderr, "Unknown delivery failure - code=%lu\n", (unsigned long)rs);
break;
}
if (done) {
// initiate clean shutdown of the endpoints
pn_link_t *link = pn_delivery_link(dlv);
pn_session_t *ssn = pn_link_session(link);
pn_link_close(link);
pn_session_close(ssn);
pn_connection_close(pn_session_connection(ssn));
}
}
} break;
case PN_TRANSPORT_ERROR: {
// The connection to the peer failed.
//
pn_transport_t *tport = pn_event_transport(event);
pn_condition_t *cond = pn_transport_condition(tport);
fprintf(stderr, "Network transport failed!\n");
if (pn_condition_is_set(cond)) {
const char *name = pn_condition_get_name(cond);
const char *desc = pn_condition_get_description(cond);
fprintf(stderr, " Error: %s Description: %s\n",
(name) ? name : "<error name not provided>",
(desc) ? desc : "<no description provided>");
}
// pn_reactor_process() will exit with a false return value, stopping
// the main loop.
} break;
default:
break;
}
}
static void usage(void)
{
printf("Usage: send <options> <message>\n");
printf("-a \tThe host address [localhost:5672]\n");
printf("-c \t# of messages to send [1]\n");
printf("-t \tTarget address [examples]\n");
printf("-n \tUse an anonymous link [off]\n");
printf("-i \tContainer name [SendExample]\n");
printf("-q \tQuiet - turn off stdout\n");
printf("message \tA text string to send.\n");
exit(1);
}
int main(int argc, char** argv)
{
const char *address = "localhost";
const char *msgtext = "Hello World!";
const char *container = "SendExample";
int c;
pn_message_t *message = NULL;
pn_data_t *body = NULL;
pn_reactor_t *reactor = NULL;
pn_url_t *url = NULL;
pn_connection_t *conn = NULL;
/* Create a handler for the connection's events. event_handler() will be
* called for each event and delete_handler will be called when the
* connection is released. The handler will allocate an app_data_t
* instance which can be accessed when the event_handler is called.
*/
pn_handler_t *handler = pn_handler_new(event_handler,
sizeof(app_data_t),
delete_handler);
/* set up the application data with defaults */
app_data_t *app_data = GET_APP_DATA(handler);
memset(app_data, 0, sizeof(app_data_t));
app_data->count = 1;
app_data->target = "examples";
/* Attach the pn_handshaker() handler. This handler deals with endpoint
* events from the peer so we don't have to.
*/
{
pn_handler_t *handshaker = pn_handshaker();
pn_handler_add(handler, handshaker);
pn_decref(handshaker);
}
/* command line options */
opterr = 0;
while((c = getopt(argc, argv, "i:a:c:t:nhq")) != -1) {
switch(c) {
case 'h': usage(); break;
case 'a': address = optarg; break;
case 'c':
app_data->count = atoi(optarg);
if (app_data->count < 1) usage();
break;
case 't': app_data->target = optarg; break;
case 'n': app_data->anon = 1; break;
case 'i': container = optarg; break;
case 'q': quiet = 1; break;
default:
usage();
break;
}
}
if (optind < argc) msgtext = argv[optind];
// create a single message and pre-encode it so we only have to do that
// once. All transmits will use the same pre-encoded message simply for
// speed.
//
message = pn_message();
pn_message_set_address(message, app_data->target);
body = pn_message_body(message);
pn_data_clear(body);
// This message's body contains a single string
if (pn_data_fill(body, "S", msgtext)) {
fprintf(stderr, "Error building message!\n");
exit(1);
}
pn_data_rewind(body);
{
// encode the message, expanding the encode buffer as needed
//
size_t len = 128;
char *buf = (char *)malloc(len);
int rc = 0;
do {
rc = pn_message_encode(message, buf, &len);
if (rc == PN_OVERFLOW) {
free(buf);
len *= 2;
buf = (char *)malloc(len);
}
} while (rc == PN_OVERFLOW);
app_data->msg_len = len;
app_data->msg_data = buf;
}
pn_decref(message); // message no longer needed
reactor = pn_reactor();
url = pn_url_parse(address);
if (url == NULL) {
fprintf(stderr, "Invalid host address %s\n", address);
exit(1);
}
conn = pn_reactor_connection_to_host(reactor,
pn_url_get_host(url),
pn_url_get_port(url),
handler);
pn_decref(url);
pn_decref(handler);
// the container name should be unique for each client
pn_connection_set_container(conn, container);
// wait up to 5 seconds for activity before returning from
// pn_reactor_process()
pn_reactor_set_timeout(reactor, 5000);
pn_reactor_start(reactor);
while (pn_reactor_process(reactor)) {
/* Returns 'true' until the connection is shut down.
* pn_reactor_process() will return true at least once every 5 seconds
* (due to the timeout). If no timeout was configured,
* pn_reactor_process() returns as soon as it finishes processing all
* pending I/O and events. Once the connection has closed,
* pn_reactor_process() will return false.
*/
}
pn_decref(reactor);
return 0;
}