blob: e2fdad16efeff689e88b921125bea4bea986a421 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <wait.h>
#include <benchmark/benchmark.h>
#include "proton/connection_driver.h"
#include "proton/engine.h"
#include "proton/log.h"
#include "proton/message.h"
#include "proton/transport.h"
#include <proton/listener.h>
#include <proton/netaddr.h>
#include <proton/proactor.h>
#include <proton/sasl.h>
// variant of the receive.c proactor example
// borrowed things from the quiver arrow, too
#define MAX_SIZE 1024
typedef char str[MAX_SIZE];
typedef struct app_data_t {
str container_id;
pn_rwbytes_t message_buffer{};
int message_count;
int received = 0;
pn_message_t *message;
int sent = 0;
pn_rwbytes_t msgout;
int credit_window = 5000;
int acknowledged = 0;
int closed = 0;
} app_data_t;
static void decode_message(pn_delivery_t *dlv);
static void check_condition(pn_event_t *e, pn_condition_t *cond);
static void shovel(pn_connection_driver_t &sender, pn_connection_driver_t &receiver);
static void handle_receiver(app_data_t *app, pn_event_t *event);
static void handle_sender(app_data_t *app, pn_event_t *event);
const bool VERBOSE = false;
const bool ERRORS = true;
// useful for debugging
static void turn_on_transport_logging(pn_transport_t& transport) {
pn_log_enable(true);
pn_transport_trace(&transport,
PN_TRACE_FRM | PN_TRACE_EVT);
}
static void BM_InitCloseSender(benchmark::State &state) {
if (VERBOSE)
printf("BEGIN BM_InitCloseSender\n");
for (auto _ : state) {
pn_connection_driver_t sender;
if (pn_connection_driver_init(&sender, NULL, NULL) != 0) {
printf("sender: pn_connection_driver_init failed\n");
exit(1);
}
pn_connection_driver_close(&sender);
pn_connection_driver_destroy(&sender);
}
if (VERBOSE)
printf("END BM_InitCloseSender\n");
}
BENCHMARK(BM_InitCloseSender)->Unit(benchmark::kMicrosecond);
static void BM_InitCloseReceiver(benchmark::State &state) {
if (VERBOSE)
printf("BEGIN BM_InitCloseReceiver\n");
for (auto _ : state) {
pn_connection_driver_t receiver;
if (pn_connection_driver_init(&receiver, NULL, NULL) != 0) {
printf("receiver: pn_connection_driver_init failed\n");
exit(1);
}
pn_connection_driver_close(&receiver);
pn_connection_driver_destroy(&receiver);
}
if (VERBOSE)
printf("END BM_InitCloseReceiver\n");
}
BENCHMARK(BM_InitCloseReceiver)->Unit(benchmark::kMicrosecond);
// sends single message and closes the connection
static void BM_EstablishConnection(benchmark::State &state) {
if (VERBOSE)
printf("BEGIN BM_EstablishConnection\n");
for (auto _ : state) {
app_data_t app = {};
app.message_count = 1;
app.message = pn_message();
sprintf(app.container_id, "%s:%06x", "BM_EstablishConnection",
rand() & 0xffffff);
pn_connection_driver_t receiver;
if (pn_connection_driver_init(&receiver, NULL, NULL) != 0) {
printf("receiver: pn_connection_driver_init failed\n");
exit(1);
}
pn_connection_driver_t sender;
if (pn_connection_driver_init(&sender, NULL, NULL) != 0) {
printf("sender: pn_connection_driver_init failed\n");
exit(1);
}
do {
pn_event_t *event;
while ((event = pn_connection_driver_next_event(&sender)) != NULL) {
handle_sender(&app, event);
}
shovel(sender, receiver);
while ((event = pn_connection_driver_next_event(&receiver)) != NULL) {
handle_receiver(&app, event);
}
shovel(receiver, sender);
} while (app.closed < 2); // until both sender and receiver are closed
pn_message_free(app.message);
pn_connection_driver_close(&receiver);
pn_connection_driver_close(&sender);
shovel(receiver, sender);
shovel(sender, receiver);
// this can take long time, up to 500 ms
pn_connection_driver_destroy(&receiver);
pn_connection_driver_destroy(&sender);
}
state.SetLabel("connections");
state.SetItemsProcessed(state.iterations());
if (VERBOSE)
printf("END BM_EstablishConnection\n");
}
BENCHMARK(BM_EstablishConnection)->Unit(benchmark::kMicrosecond);
static void BM_SendReceiveMessages(benchmark::State &state) {
if (VERBOSE)
printf("BEGIN BM_SendReceiveMessages\n");
app_data_t app = {};
app.message_count = -1; // unlimited
app.message = pn_message();
app.credit_window = state.range(0);
sprintf(app.container_id, "%s:%06x", "BM_SendReceiveMessages",
rand() & 0xffffff);
pn_connection_driver_t receiver;
if (pn_connection_driver_init(&receiver, NULL, NULL) != 0) {
printf("receiver: pn_connection_driver_init failed\n");
exit(1);
}
pn_connection_driver_t sender;
if (pn_connection_driver_init(&sender, NULL, NULL) != 0) {
printf("sender: pn_connection_driver_init failed\n");
exit(1);
}
for (auto _ : state) {
pn_event_t *event;
while ((event = pn_connection_driver_next_event(&sender)) != NULL) {
handle_sender(&app, event);
}
shovel(sender, receiver);
while ((event = pn_connection_driver_next_event(&receiver)) != NULL) {
handle_receiver(&app, event);
}
shovel(receiver, sender);
}
pn_connection_driver_close(&receiver);
pn_connection_driver_close(&sender);
shovel(receiver, sender);
shovel(sender, receiver);
// this can take long time, up to 500 ms
pn_connection_driver_destroy(&receiver);
pn_connection_driver_destroy(&sender);
state.SetLabel("messages");
state.SetItemsProcessed(app.acknowledged);
if (VERBOSE)
printf("END BM_SendReceiveMessages\n");
}
BENCHMARK(BM_SendReceiveMessages)
->RangeMultiplier(3)
->Range(1, 200000)
->ArgName("creditWindow")
->Arg(1000)
->Unit(benchmark::kMillisecond);
///* Create a message with a map { "sequence" : number } encode it and return
/// the encoded buffer. */
static void send_message(app_data_t *app, pn_link_t *sender) {
/* Construct a message with the map { "sequence": 42 } */
pn_data_t *body;
pn_message_clear(app->message);
body = pn_message_body(app->message);
pn_data_put_int(pn_message_id(app->message),
app->sent); /* Set the message_id also */
pn_data_put_map(body);
pn_data_enter(body);
pn_data_put_string(body, pn_bytes(sizeof("sequence") - 1, "sequence"));
pn_data_put_int(body, 42); /* The sequence number */
pn_data_exit(body);
if (pn_message_send(app->message, sender, &app->message_buffer) < 0) {
fprintf(stderr, "error sending message: %s\n",
pn_error_text(pn_message_error(app->message)));
exit(1);
}
}
static void handle_sender(app_data_t *app, pn_event_t *event) {
switch (pn_event_type(event)) {
case PN_CONNECTION_INIT: {
pn_connection_t *c = pn_event_connection(event);
pn_connection_set_container(c, "sendercid");
pn_connection_open(c);
pn_session_t *s = pn_session(c);
pn_session_open(s);
pn_link_t *l = pn_sender(s, "my_sender");
pn_terminus_set_address(pn_link_target(l), "example");
pn_link_set_snd_settle_mode(l, PN_SND_UNSETTLED);
pn_link_set_rcv_settle_mode(l, PN_RCV_FIRST);
pn_link_open(l);
} break;
case PN_LINK_FLOW: {
if (VERBOSE)
printf("BEGIN handle_sender: PN_LINK_FLOW\n");
/* The peer has given us some credit, now we can send messages */
pn_link_t *sender = pn_event_link(event);
while (pn_link_credit(sender) > 0 && app->sent != app->message_count) {
++app->sent;
/* Use sent counter as unique delivery tag. */
pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
send_message(app, sender);
}
break;
}
case PN_DELIVERY: {
/* We received acknowledgement from the peer that a message was delivered. */
pn_delivery_t *d = pn_event_delivery(event);
if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
if (VERBOSE)
printf("got PN_ACCEPTED\n");
if (++app->acknowledged == app->message_count) {
if (VERBOSE)
printf("%d messages sent and acknowledged\n", app->acknowledged);
pn_connection_close(pn_event_connection(event));
/* Continue handling events till we receive TRANSPORT_CLOSED */
}
} else {
fprintf(stderr, "unexpected delivery state %d\n", (int) pn_delivery_remote_state(d));
pn_connection_close(pn_event_connection(event));
exit(EXIT_FAILURE);
}
break;
}
case PN_CONNECTION_REMOTE_OPEN:
pn_connection_open(pn_event_connection(event)); /* Complete the open */
break;
case PN_SESSION_REMOTE_OPEN:
pn_session_open(pn_event_session(event));
break;
case PN_TRANSPORT_ERROR:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
pn_connection_close(pn_event_connection(event));
break;
case PN_CONNECTION_REMOTE_CLOSE:
check_condition(event,
pn_connection_remote_condition(pn_event_connection(event)));
pn_connection_close(pn_event_connection(event));
break;
case PN_SESSION_REMOTE_CLOSE:
check_condition(event,
pn_session_remote_condition(pn_event_session(event)));
pn_connection_close(pn_event_connection(event));
break;
case PN_LINK_REMOTE_CLOSE:
case PN_LINK_REMOTE_DETACH:
check_condition(event, pn_link_remote_condition(pn_event_link(event)));
pn_connection_close(pn_event_connection(event));
break;
case PN_TRANSPORT_CLOSED:
app->closed++;
break;
default:
break;
}
}
static void handle_receiver(app_data_t *app, pn_event_t *event) {
switch (pn_event_type(event)) {
case PN_CONNECTION_INIT: {
pn_connection_t *c = pn_event_connection(event);
pn_connection_set_container(c, app->container_id);
// pn_connection_open(c);
pn_session_t *s = pn_session(c);
pn_session_open(s);
pn_link_t *l = pn_receiver(s, "my_receiver");
pn_terminus_set_address(pn_link_source(l), "example");
pn_link_open(l);
} break;
case PN_LINK_REMOTE_OPEN: {
pn_link_t *l = pn_event_link(event);
pn_terminus_t *t = pn_link_target(l);
pn_terminus_t *rt = pn_link_remote_target(l);
pn_terminus_set_address(t, pn_terminus_get_address(rt));
pn_link_open(l);
if (pn_link_is_receiver(l)) {
pn_link_flow(l, app->credit_window);
}
break;
}
case PN_DELIVERY: {
/* A message has been received */
pn_link_t *link = NULL;
pn_delivery_t *dlv = pn_event_delivery(event);
if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
link = pn_delivery_link(dlv);
decode_message(dlv);
/* Accept the delivery */
pn_delivery_update(dlv, PN_ACCEPTED);
/* done with the delivery, move to the next and free it */
pn_link_advance(link);
pn_delivery_settle(dlv); /* dlv is now freed */
}
pn_link_flow(link, app->credit_window - pn_link_credit(link));
} break;
case PN_CONNECTION_REMOTE_OPEN:
pn_connection_open(pn_event_connection(event)); /* Complete the open */
break;
case PN_SESSION_REMOTE_OPEN:
pn_session_open(pn_event_session(event));
break;
case PN_TRANSPORT_ERROR:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
pn_connection_close(pn_event_connection(event));
break;
case PN_CONNECTION_REMOTE_CLOSE:
check_condition(event,
pn_connection_remote_condition(pn_event_connection(event)));
pn_connection_close(pn_event_connection(event));
break;
case PN_SESSION_REMOTE_CLOSE:
check_condition(event,
pn_session_remote_condition(pn_event_session(event)));
pn_connection_close(pn_event_connection(event));
break;
case PN_LINK_REMOTE_CLOSE:
case PN_LINK_REMOTE_DETACH:
check_condition(event, pn_link_remote_condition(pn_event_link(event)));
pn_connection_close(pn_event_connection(event));
break;
case PN_TRANSPORT_CLOSED:
app->closed++;
break;
default:
break;
}
}
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (VERBOSE)
printf("beginning check_condition\n");
if (pn_condition_is_set(cond)) {
if (VERBOSE || ERRORS)
fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
pn_condition_get_name(cond), pn_condition_get_description(cond));
}
}
///* Copies output from first connection driver into input of the second. */
static void shovel(pn_connection_driver_t &sender, pn_connection_driver_t &receiver) {
pn_bytes_t wbuf = pn_connection_driver_write_buffer(&receiver);
if (wbuf.size == 0) {
pn_connection_driver_write_done(&receiver, 0);
return;
}
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&sender);
if (rbuf.start == NULL) {
printf("shovel: rbuf.start is null\n");
fflush(stdout);
fflush(stderr);
exit(1);
}
size_t s = rbuf.size < wbuf.size ? rbuf.size : wbuf.size;
memcpy(rbuf.start, wbuf.start, s);
pn_connection_driver_read_done(&sender, s);
pn_connection_driver_write_done(&receiver, s);
}
static void decode_message(pn_delivery_t *dlv) {
static char buffer[MAX_SIZE];
ssize_t len;
// try to decode the message body
if (pn_delivery_pending(dlv) < MAX_SIZE) {
// read in the raw data
len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
if (len > 0) {
// decode it into a proton message
pn_message_t *m = pn_message();
if (PN_OK == pn_message_decode(m, buffer, len)) {
pn_string_t *s = pn_string(NULL);
pn_inspect(pn_message_body(m), s);
if (VERBOSE)
printf("%s\n", pn_string_get(s));
pn_free(s);
}
pn_message_free(m);
}
}
}