blob: f4518bf8620c82a6fb48ee3d7a666b369c1dd77b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <proton/condition.h>
#include <proton/connection.h>
#include <proton/delivery.h>
#include <proton/link.h>
#include <proton/message.h>
#include <proton/proactor.h>
#include <proton/sasl.h>
#include <proton/session.h>
#include <proton/ssl.h>
#include <proton/transport.h>

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
/* State shared between main(), the event loop and the event handler. */
typedef struct app_data_t {
  /* Where to connect: used to build the proactor address; host is also set as the connection hostname */
  const char *host;
  const char *port;

  /* Address set on the sender link's target terminus */
  const char *amqp_address;

  /* Container id set on the connection */
  const char *container_id;

  /* Optional credentials set on the connection (NULL when not given) */
  const char *user;
  const char *pass;

  /* Total number of messages to send */
  int message_count;

  pn_proactor_t *proactor;

  /* Encode buffer, allocated on first use and reused for every message */
  pn_rwbytes_t message_buffer;

  /* Progress counters: messages sent so far / messages accepted by the peer */
  int sent;
  int acknowledged;
} app_data_t;
/* Process exit status returned by main(): stays 0 unless an error condition or an unexpected delivery state sets it to 1. */
static int exit_code = 0;
/*
 * Report an error condition associated with an event.
 *
 * If cond is set, print "<event type>: <condition name>: <description>" to
 * stderr, close the event's connection and set exit_code to 1.
 * Does nothing when cond is not set.
 */
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
  const char *name;
  const char *description;

  if (!pn_condition_is_set(cond)) {
    return;
  }

  /* A condition can carry a name without a description; never hand NULL to
     a %s conversion, which is undefined behavior. */
  name = pn_condition_get_name(cond);
  description = pn_condition_get_description(cond);
  fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
          name ? name : "(no name)",
          description ? description : "(no description)");

  pn_connection_close(pn_event_connection(e));
  exit_code = 1;
}
/*
 * Create a message with the body map { "sequence" : app->sent } and
 * message-id app->sent, encode it and return the encoded bytes.
 *
 * The message is encoded into app->message_buffer, which is allocated on
 * first use and doubled in size until the encoded message fits; the buffer is
 * kept in app so later calls reuse it.
 *
 * The returned bytes point into app->message_buffer, so they are overwritten
 * (and may move) on the next call.
 *
 * On allocation or encoding failure an error is printed to stderr and the
 * process exits with status 1.
 */
static pn_bytes_t encode_message(app_data_t* app) {
  pn_message_t* message = pn_message();
  pn_data_t* body;
  size_t encoded_size;
  int status;

  if (message == NULL) {
    fprintf(stderr, "out of memory creating message\n");
    exit(1);
  }

  /* Construct a message with the map { "sequence": app.sent } */
  body = pn_message_body(message);
  pn_data_put_ulong(pn_message_id(message), app->sent); /* Set the message_id also */
  pn_data_put_map(body);
  pn_data_enter(body);
  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
  pn_data_put_int(body, app->sent); /* The sequence number */
  pn_data_exit(body);

  /* Allocate the encode buffer the first time through */
  if (app->message_buffer.start == NULL) {
    static const size_t initial_size = 128;
    char *initial = (char*)malloc(initial_size);
    if (initial == NULL) {
      fprintf(stderr, "out of memory allocating message buffer\n");
      exit(1);
    }
    app->message_buffer = pn_rwbytes(initial_size, initial);
  }

  /* app->message_buffer is the total buffer space available. */
  /* encoded_size is passed in as the space available and tracks just the */
  /* portion used by the encoded message once encoding succeeds. */
  encoded_size = app->message_buffer.size;
  while ((status = pn_message_encode(message, app->message_buffer.start, &encoded_size)) == PN_OVERFLOW) {
    /* Grow through a temporary so the old buffer is not lost if realloc fails */
    size_t grown_size = app->message_buffer.size * 2;
    char *grown = (char*)realloc(app->message_buffer.start, grown_size);
    if (grown == NULL) {
      fprintf(stderr, "out of memory growing message buffer to %lu bytes\n", (unsigned long)grown_size);
      exit(1);
    }
    app->message_buffer = pn_rwbytes(grown_size, grown);
    encoded_size = grown_size;
  }
  if (status != 0) {
    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
    exit(1);
  }

  pn_message_free(message);
  return pn_bytes(encoded_size, app->message_buffer.start);
}
/*
 * Handle one event from the proactor.
 *
 * - PN_CONNECTION_INIT: configure and open the connection, a session and a
 *   sender link whose target is app->amqp_address.
 * - PN_CONNECTION_REMOTE_OPEN: print the SSL protocol in use and, if there is
 *   one, the remote certificate subject.
 * - PN_LINK_FLOW: send messages while the link has credit and fewer than
 *   app->message_count have been sent.
 * - PN_DELIVERY: count accepted deliveries and close the connection once all
 *   are acknowledged; any other remote state closes the connection and sets a
 *   failing exit code.
 * - PN_TRANSPORT_CLOSED and the remote close/detach events: report any error
 *   condition through check_condition().
 *
 * Returns true to continue, false if finished (PN_PROACTOR_INACTIVE).
 */
static bool handle(app_data_t* app, pn_event_t* event) {
  switch (pn_event_type(event)) {

    case PN_CONNECTION_INIT: {
      pn_connection_t* c = pn_event_connection(event);
      pn_session_t* s = pn_session(c);
      pn_link_t* l;
      pn_connection_set_container(c, app->container_id);
      pn_connection_set_hostname(c, app->host);
      pn_connection_set_user(c, app->user);
      pn_connection_set_password(c, app->pass);
      pn_connection_open(c);
      pn_session_open(s);
      l = pn_sender(s, "my_sender");
      pn_terminus_set_address(pn_link_target(l), app->amqp_address);
      pn_link_open(l);
      break;
    }

    case PN_CONNECTION_REMOTE_OPEN: {
      pn_ssl_t *ssl = pn_ssl(pn_event_transport(event));
      if (ssl) {
        char name[1024];
        const char *subject;
        /* The call reports failure through its return value; fall back to an
           empty name so uninitialized bytes are never printed. */
        if (!pn_ssl_get_protocol_name(ssl, name, sizeof(name))) {
          name[0] = '\0';
        }
        subject = pn_ssl_get_remote_subject(ssl);
        if (subject) {
          printf("secure connection: to %s using %s\n", subject, name);
        } else {
          printf("anonymous connection: using %s\n", name);
        }
        fflush(stdout);
      }
      break;
    }

    case PN_LINK_FLOW: {
      /* The peer has given us some credit, now we can send messages */
      pn_link_t *sender = pn_event_link(event);
      while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
        pn_bytes_t msgbuf;
        ++app->sent;
        /* Use sent counter as unique delivery tag. */
        pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
        msgbuf = encode_message(app);
        pn_link_send(sender, msgbuf.start, msgbuf.size);
        pn_link_advance(sender);
      }
      break;
    }

    case PN_DELIVERY: {
      /* We received acknowledgement from the peer that a message was delivered. */
      pn_delivery_t* d = pn_event_delivery(event);
      if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
        if (++app->acknowledged == app->message_count) {
          printf("%d messages sent and acknowledged\n", app->acknowledged);
          fflush(stdout);
          pn_connection_close(pn_event_connection(event));
          /* Continue handling events till we receive TRANSPORT_CLOSED */
        }
      } else {
        fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d));
        pn_connection_close(pn_event_connection(event));
        exit_code=1;
      }
      break;
    }

    case PN_TRANSPORT_CLOSED:
      check_condition(event, pn_transport_condition(pn_event_transport(event)));
      break;

    case PN_CONNECTION_REMOTE_CLOSE:
      check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
      pn_connection_close(pn_event_connection(event));
      break;

    case PN_SESSION_REMOTE_CLOSE:
      check_condition(event, pn_session_remote_condition(pn_event_session(event)));
      pn_connection_close(pn_event_connection(event));
      break;

    case PN_LINK_REMOTE_CLOSE:
    case PN_LINK_REMOTE_DETACH:
      check_condition(event, pn_link_remote_condition(pn_event_link(event)));
      pn_connection_close(pn_event_connection(event));
      break;

    case PN_PROACTOR_INACTIVE:
      /* Tell the caller to stop the event loop */
      return false;

    default: break;
  }
  return true;
}
/*
 * Run the event loop: wait for batches of events from app->proactor and pass
 * each event to handle() until handle() returns false.
 *
 * Every batch obtained from pn_proactor_wait() is handed back with
 * pn_proactor_done(), including the batch in which handle() asked to stop.
 * Events left in that final batch are not handled.
 */
void run(app_data_t *app) {
  bool finished = false;
  while (!finished) {
    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
    pn_event_t *e;
    /* Test `finished` first so no further event is taken once handle() says stop */
    while (!finished && (e = pn_event_batch_next(events)) != NULL) {
      finished = !handle(app, e);
    }
    pn_proactor_done(app->proactor, events);
  }
}
/*
 * Parse text as a base-10 integer in the range [1, INT_MAX].
 * Returns true and stores the value in *out on success; returns false and
 * leaves *out unchanged if text is empty, has trailing characters, or is out
 * of range.
 */
static bool parse_positive_int(const char *text, int *out) {
  char *end = NULL;
  long value;

  errno = 0;
  value = strtol(text, &end, 10);
  if (end == text || *end != '\0' || errno == ERANGE || value <= 0 || value > INT_MAX) {
    return false;
  }
  *out = (int)value;
  return true;
}

/*
 * Usage: <program> [host [port [amqp_address [message_count [user [pass]]]]]]
 *
 * Connects over SSL, sends message_count messages to amqp_address and waits
 * for them to be acknowledged. Defaults: host "", port "amqp", address
 * "examples", 10 messages, no credentials.
 *
 * Returns 0 on success, 1 on invalid arguments, SSL initialization failure,
 * or if an error was recorded in exit_code while running.
 */
int main(int argc, char **argv) {
  struct app_data_t app = {0};
  char addr[PN_MAX_ADDR];
  pn_transport_t *t = NULL;
  int err;

  app.container_id = argv[0]; /* Should be unique */
  app.host = (argc > 1) ? argv[1] : "";
  app.port = (argc > 2) ? argv[2] : "amqp";
  app.amqp_address = (argc > 3) ? argv[3] : "examples";
  app.message_count = 10;
  /* A count of 0 or less would leave the program waiting forever: nothing is
     sent, so the "all acknowledged" condition that closes the connection is
     never reached. Reject it (and non-numeric input) up front. */
  if (argc > 4 && !parse_positive_int(argv[4], &app.message_count)) {
    fprintf(stderr, "invalid message count '%s': expected a positive integer\n", argv[4]);
    return 1;
  }
  app.user = (argc > 5) ? argv[5] : 0 ;
  app.pass = (argc > 6) ? argv[6] : 0 ;

  app.proactor = pn_proactor();
  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);

  /* Configure a transport for SSL. The transport will be freed by the proactor. */
  t = pn_transport();
  /* If we got a username/password on the command line set up for authentication, else allow anonymous ssl */
  if (app.user && app.pass) {
    err = pn_ssl_init(pn_ssl(t), NULL, NULL);
    pn_sasl_allowed_mechs(pn_sasl(t), "PLAIN");
  } else {
    pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
    err = pn_ssl_domain_set_peer_authentication(domain, PN_SSL_ANONYMOUS_PEER, NULL);
    if (!err) {
      err = pn_ssl_init(pn_ssl(t), domain, NULL);
    }
    pn_ssl_domain_free(domain);
  }
  if (err) {
    fprintf(stderr, "error initializing SSL: %s\n", pn_code(err));
    /* The transport has not been handed to the proactor yet, so release both here */
    pn_transport_free(t);
    pn_proactor_free(app.proactor);
    return 1;
  }

  pn_proactor_connect2(app.proactor, NULL, t, addr);
  run(&app);
  pn_proactor_free(app.proactor);
  free(app.message_buffer.start);
  return exit_code;
}