Fixes #482: replace reactor tests with proactor implementation (#488)
(cherry picked from commit ff073628c99e24eeb1904435666bb28eacfed20c)
diff --git a/tests/clogger.c b/tests/clogger.c
index dfa8e3a..a60a897 100644
--- a/tests/clogger.c
+++ b/tests/clogger.c
@@ -26,10 +26,8 @@
#include "proton/connection.h"
#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
#include "proton/link.h"
-#include "proton/reactor.h"
+#include "proton/proactor.h"
#include "proton/session.h"
#include "proton/transport.h"
@@ -63,14 +61,14 @@
const char *target_address = "test-address";
const char *host_address = "127.0.0.1:5672";
const char *container_name = "Clogger";
+char proactor_address[1024];
-pn_reactor_t *reactor;
+//
+pn_proactor_t *proactor;
pn_connection_t *pn_conn;
pn_session_t *pn_ssn;
pn_link_t *pn_link;
pn_delivery_t *pn_dlv; // current in-flight delivery
-pn_handler_t *_handler;
-
uint32_t bytes_sent; // number of body data bytes written out link
uint32_t remote_max_frame = DEFAULT_MAX_FRAME; // used to limit amount written
@@ -119,7 +117,7 @@
case SIGINT:
case SIGQUIT:
stop = true;
- if (reactor) pn_reactor_wakeup(reactor);
+ if (proactor) pn_proactor_interrupt(proactor);
break;
default:
break;
@@ -193,9 +191,9 @@
pn_delivery_settle(pn_dlv);
if (limit && sent == limit) {
// no need to wait for acks
- debug("stopping...\n");
+ debug("stopping (presettled)...\n");
stop = true;
- pn_reactor_wakeup(reactor);
+ pn_proactor_interrupt(proactor);
}
}
pn_dlv = 0;
@@ -208,10 +206,9 @@
/* Process each event posted by the proactor.
Return true if client has stopped.
*/
-static void event_handler(pn_handler_t *handler,
- pn_event_t *event,
- pn_event_type_t etype)
+static bool event_handler(pn_event_t *event)
{
+ const pn_event_type_t etype = pn_event_type(event);
debug("new event=%s\n", pn_event_type_name(etype));
switch (etype) {
@@ -243,13 +240,12 @@
if (pn_link_credit(pn_link) > 0) {
if (!pn_dlv) {
start_message();
- pn_reactor_schedule(reactor, pause_msec, _handler); // send body after pause
+ pn_proactor_set_timeout(proactor, pause_msec); // send body after pause
}
}
}
} break;
-
case PN_TRANSPORT: {
ssize_t pending = pn_transport_pending(pn_event_transport(event));
debug("PN_TRANSPORT pending=%ld\n", pending);
@@ -288,31 +284,37 @@
// initiate clean shutdown of the endpoints
debug("stopping...\n");
stop = true;
- pn_reactor_wakeup(reactor);
+ pn_proactor_interrupt(proactor);
}
}
} break;
- case PN_TIMER_TASK: {
+ case PN_PROACTOR_TIMEOUT: {
+ if (pn_conn) pn_connection_wake(pn_conn);
+ } break;
+
+ case PN_CONNECTION_WAKE: {
if (!send_message_data()) { // not done sending
- pn_reactor_schedule(reactor, pause_msec, _handler);
+ pn_proactor_set_timeout(proactor, pause_msec);
} else if (limit == 0 || sent < limit) {
if (pn_link_credit(pn_link) > 0) {
// send next message
start_message();
- pn_reactor_schedule(reactor, pause_msec, _handler);
+ pn_proactor_set_timeout(proactor, pause_msec);
}
}
} break;
+ case PN_PROACTOR_INACTIVE: {
+ debug("proactor inactive!\n");
+ return stop;
+ } break;
+
default:
break;
}
-}
-
-static void delete_handler(pn_handler_t *handler)
-{
+ return false;
}
@@ -374,51 +376,54 @@
host_address += strlen("amqp://");
}
- // convert host_address to hostname and port
+ // trim port from hostname
char *hostname = strdup(host_address);
char *port = strchr(hostname, ':');
- if (!port) {
- port = "5672";
- } else {
+ if (port) {
*port++ = 0;
+ } else {
+ port = "5672";
}
- _handler = pn_handler_new(event_handler, 0, delete_handler);
- pn_handler_add(_handler, pn_handshaker());
-
- reactor = pn_reactor();
- pn_conn = pn_reactor_connection_to_host(reactor,
- hostname,
- port,
- _handler);
-
+ pn_conn = pn_connection();
// the container name should be unique for each client
pn_connection_set_container(pn_conn, container_name);
pn_connection_set_hostname(pn_conn, hostname);
+ proactor = pn_proactor();
+ pn_proactor_addr(proactor_address, sizeof(proactor_address), hostname, port);
+ pn_proactor_connect2(proactor, pn_conn, 0, proactor_address);
+ free(hostname);
- // break out of pn_reactor_process once a second to check if done
- pn_reactor_set_timeout(reactor, 1000);
+ bool done = false;
+ while (!done) {
+ debug("Waiting for proactor event...\n");
+ pn_event_batch_t *events = pn_proactor_wait(proactor);
+ debug("Start new proactor batch\n");
+ pn_event_t *event = pn_event_batch_next(events);
+ while (event) {
+ done = event_handler(event);
+ if (done)
+ break;
- pn_reactor_start(reactor);
+ event = pn_event_batch_next(events);
+ }
- while (pn_reactor_process(reactor)) {
- if (stop) {
- // close the endpoints this will cause pn_reactor_process() to
- // eventually break the loop
+ debug("Proactor batch processing done\n");
+ pn_proactor_done(proactor, events);
+
+ if (stop && pn_conn) {
+ debug("Stop detected - closing connection...\n");
if (pn_link) pn_link_close(pn_link);
if (pn_ssn) pn_session_close(pn_ssn);
- if (pn_conn) pn_connection_close(pn_conn);
+ pn_connection_close(pn_conn);
pn_link = 0;
pn_ssn = 0;
pn_conn = 0;
}
}
- if (pn_link) pn_link_free(pn_link);
- if (pn_ssn) pn_session_free(pn_ssn);
- if (pn_conn) pn_connection_close(pn_conn);
-
- pn_reactor_free(reactor);
+ debug("Send complete!\n");
+ pn_proactor_free(proactor);
if (not_accepted) {
printf("Sent: %" PRIu64 " Accepted: %" PRIu64 " Not Accepted: %" PRIu64 "\n", sent, accepted, not_accepted);
diff --git a/tests/test-receiver.c b/tests/test-receiver.c
index 14f8c9b..68fb46f 100644
--- a/tests/test-receiver.c
+++ b/tests/test-receiver.c
@@ -20,12 +20,10 @@
#include "proton/connection.h"
#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
#include "proton/link.h"
#include "proton/message.h"
-#include "proton/reactor.h"
#include "proton/session.h"
+#include "proton/proactor.h"
#include <inttypes.h>
#include <signal.h>
@@ -34,11 +32,13 @@
#include <string.h>
#include <time.h>
#include <unistd.h>
+#include <assert.h>
#define BOOL2STR(b) ((b)?"true":"false")
bool stop = false;
bool verbose = false;
+bool debug_mode = false;
int credit_window = 1000;
char *source_address = "test-address"; // name of the source node to receive from
@@ -46,17 +46,30 @@
char *host_address = _addr;
char *container_name = "TestReceiver";
bool drop_connection = false;
+char proactor_address[1024];
pn_connection_t *pn_conn;
pn_session_t *pn_ssn;
pn_link_t *pn_link;
-pn_reactor_t *reactor;
+pn_proactor_t *proactor;
pn_message_t *in_message; // holds the current received message
uint64_t count = 0;
uint64_t limit = 0; // if > 0 stop after limit messages arrive
+void debug(const char *format, ...)
+{
+ va_list args;
+
+ if (!debug_mode) return;
+
+ va_start(args, format);
+ vprintf(format, args);
+ va_end(args);
+}
+
+
static void signal_handler(int signum)
{
signal(SIGINT, SIG_IGN);
@@ -66,6 +79,7 @@
case SIGINT:
case SIGQUIT:
stop = true;
+ if (proactor) pn_proactor_interrupt(proactor);
break;
default:
break;
@@ -73,23 +87,12 @@
}
-// Called when reactor exits to clean up app_data
-//
-static void delete_handler(pn_handler_t *handler)
-{
- if (in_message) {
- pn_message_free(in_message);
- in_message = NULL;
- }
-}
-
-
-/* Process each event posted by the reactor.
+/* Process each event posted by the proactor
*/
-static void event_handler(pn_handler_t *handler,
- pn_event_t *event,
- pn_event_type_t type)
+static bool event_handler(pn_event_t *event)
{
+ const pn_event_type_t type = pn_event_type(event);
+ debug("new event=%s\n", pn_event_type_name(type));
switch (type) {
case PN_CONNECTION_INIT: {
@@ -140,14 +143,31 @@
if (limit && count == limit) {
stop = true;
- pn_reactor_wakeup(reactor);
}
}
} break;
+ case PN_PROACTOR_TIMEOUT: {
+ if (verbose) {
+ fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
+ fflush(stdout);
+ if (!stop) {
+ pn_proactor_set_timeout(proactor, 10 * 1000);
+ }
+ }
+ } break;
+
+ case PN_PROACTOR_INACTIVE: {
+ assert(stop); // expect: inactive due to stopping
+ debug("proactor inactive!\n");
+ return true;
+ } break;
+
default:
break;
}
+
+ return false;
}
static void usage(void)
@@ -160,21 +180,17 @@
printf("-w \tCredit window [%d]\n", credit_window);
printf("-E \tExit without cleanly closing the connection [off]\n");
printf("-d \tPrint periodic status updates [%s]\n", BOOL2STR(verbose));
+ printf("-D \tPrint debug info [off]\n");
exit(1);
}
int main(int argc, char** argv)
{
- /* create a handler for the connection's events.
- */
- pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
- pn_handler_add(handler, pn_handshaker());
-
/* command line options */
opterr = 0;
int c;
- while((c = getopt(argc, argv, "i:a:s:hdw:c:E")) != -1) {
+ while((c = getopt(argc, argv, "i:a:s:hdDw:c:E")) != -1) {
switch(c) {
case 'h': usage(); break;
case 'a': host_address = optarg; break;
@@ -190,6 +206,7 @@
break;
case 'E': drop_connection = true; break;
case 'd': verbose = true; break;
+ case 'D': debug_mode = true; break;
default:
usage();
@@ -210,50 +227,52 @@
port = "5672";
}
- reactor = pn_reactor();
- pn_conn = pn_reactor_connection_to_host(reactor,
- host,
- port,
- handler);
-
+ pn_conn = pn_connection();
// the container name should be unique for each client
pn_connection_set_container(pn_conn, container_name);
pn_connection_set_hostname(pn_conn, host);
+ proactor = pn_proactor();
+ pn_proactor_addr(proactor_address, sizeof(proactor_address), host, port);
+ pn_proactor_connect2(proactor, pn_conn, 0, proactor_address);
- // periodic wakeup
- pn_reactor_set_timeout(reactor, 1000);
+ if (verbose) {
+ // print status every 10 seconds..
+ pn_proactor_set_timeout(proactor, 10 * 1000);
+ }
- pn_reactor_start(reactor);
+ bool done = false;
+ while (!done) {
+ debug("Waiting for proactor event...\n");
+ pn_event_batch_t *events = pn_proactor_wait(proactor);
+ debug("Start new proactor batch\n");
- time_t last_log = time(NULL);
- while (pn_reactor_process(reactor)) {
+ pn_event_t *event = pn_event_batch_next(events);
+ while (!done && event) {
+ done = event_handler(event);
+ event = pn_event_batch_next(events);
+ }
+
+ debug("Proactor batch processing done\n");
+ pn_proactor_done(proactor, events);
+
if (stop) {
- if (drop_connection) // hard exit
+ pn_proactor_cancel_timeout(proactor);
+ if (drop_connection) { // hard stop
exit(0);
- // close the endpoints this will cause pn_reactor_process() to
- // eventually break the loop
- if (pn_link) pn_link_close(pn_link);
- if (pn_ssn) pn_session_close(pn_ssn);
- if (pn_conn) pn_connection_close(pn_conn);
-
- } else if (verbose) {
-
- // periodically give status for test output logs
-
- time_t now = time(NULL);
- if ((now - last_log) >= 10) {
- fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
- fflush(stdout);
- last_log = now;
+ }
+ if (pn_conn) {
+ debug("Stop detected - closing connection...\n");
+ if (pn_link) pn_link_close(pn_link);
+ if (pn_ssn) pn_session_close(pn_ssn);
+ pn_connection_close(pn_conn);
+ pn_link = 0;
+ pn_ssn = 0;
+ pn_conn = 0;
}
}
}
- if (pn_link) pn_link_free(pn_link);
- if (pn_ssn) pn_session_free(pn_ssn);
- if (pn_conn) pn_connection_close(pn_conn);
-
- pn_reactor_free(reactor);
+ pn_proactor_free(proactor);
if (verbose) {
fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
diff --git a/tests/test-sender.c b/tests/test-sender.c
index 57867b4..d0bd448 100644
--- a/tests/test-sender.c
+++ b/tests/test-sender.c
@@ -25,12 +25,10 @@
#include "proton/connection.h"
#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
#include "proton/link.h"
#include "proton/message.h"
-#include "proton/reactor.h"
#include "proton/session.h"
+#include "proton/proactor.h"
#include <errno.h>
#include <inttypes.h>
@@ -41,6 +39,7 @@
#include <string.h>
#include <time.h>
#include <unistd.h>
+#include <assert.h>
#define BOOL2STR(b) ((b)?"true":"false")
@@ -56,6 +55,7 @@
bool stop = false;
bool verbose = false;
+bool debug_mode = false;
uint64_t limit = 1; // # messages to send
uint64_t count = 0; // # sent
@@ -86,11 +86,12 @@
char _addr[] = "127.0.0.1:5672";
char *host_address = _addr;
char *container_name = "TestSender";
+char proactor_address[1024];
pn_connection_t *pn_conn;
pn_session_t *pn_ssn;
pn_link_t *pn_link;
-pn_reactor_t *reactor;
+pn_proactor_t *proactor;
pn_message_t *out_message;
@@ -109,6 +110,18 @@
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
+void debug(const char *format, ...)
+{
+ va_list args;
+
+ if (!debug_mode) return;
+
+ va_start(args, format);
+ vprintf(format, args);
+ va_end(args);
+}
+
+
static void add_message_annotations(pn_message_t *out_message)
{
// just a bunch of dummy MA
@@ -228,6 +241,7 @@
case SIGINT:
case SIGQUIT:
stop = true;
+ if (proactor) pn_proactor_interrupt(proactor);
break;
default:
break;
@@ -235,20 +249,12 @@
}
-static void delete_handler(pn_handler_t *handler)
-{
- free(encode_buffer);
- pn_message_free(out_message);
- free((void *)body_data.start);
-}
-
-
-/* Process each event posted by the reactor.
+/* Process each event posted by the proactor.
*/
-static void event_handler(pn_handler_t *handler,
- pn_event_t *event,
- pn_event_type_t type)
+static bool event_handler(pn_event_t *event)
{
+ const pn_event_type_t type = pn_event_type(event);
+ debug("new event=%s\n", pn_event_type_name(type));
switch (type) {
case PN_CONNECTION_INIT: {
@@ -289,8 +295,8 @@
++accepted;
if (limit && count == limit) {
// no need to wait for acks
+ debug("stopping (presettled)...\n");
stop = true;
- pn_reactor_wakeup(reactor);
}
}
}
@@ -332,17 +338,39 @@
if (limit && acked == limit) {
// initiate clean shutdown of the endpoints
+ debug("stopping...\n");
stop = true;
- pn_reactor_wakeup(reactor);
}
}
} break;
+ case PN_PROACTOR_TIMEOUT: {
+ if (verbose) {
+ fprintf(stdout,
+ "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+ " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n",
+ count, accepted, rejected, released, modified, limit);
+ fflush(stdout);
+ if (!stop) {
+ pn_proactor_set_timeout(proactor, 10 * 1000);
+ }
+ }
+ } break;
+
+ case PN_PROACTOR_INACTIVE: {
+ assert(stop); // expect: inactive due to stopping
+ debug("proactor inactive!\n");
+ return true;
+ } break;
+
default:
break;
}
+
+ return false;
}
+
static void usage(void)
{
printf("Usage: sender <options>\n");
@@ -359,6 +387,7 @@
printf("-p \tMessage priority [%d]\n", priority);
printf("-X \tMessage body data pattern [%c]\n", (char)body_data_pattern);
printf("-d \tPrint periodic status updates [%s]\n", BOOL2STR(verbose));
+ printf("-D \tPrint debug info [off]\n");
exit(1);
}
@@ -367,7 +396,7 @@
/* command line options */
opterr = 0;
int c;
- while ((c = getopt(argc, argv, "ha:c:i:ns:t:udMEp:X:")) != -1) {
+ while ((c = getopt(argc, argv, "ha:c:i:ns:t:udMEDp:X:")) != -1) {
switch(c) {
case 'h': usage(); break;
case 'a': host_address = optarg; break;
@@ -393,6 +422,7 @@
case 'M': add_annotations = true; break;
case 'E': drop_connection = true; break;
case 'X': body_data_pattern = optarg[0]; break;
+ case 'D': debug_mode = true; break;
case 'p':
if (sscanf(optarg, "%u", &priority) != 1)
usage();
@@ -417,59 +447,57 @@
port = "5672";
}
- pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
- pn_handler_add(handler, pn_handshaker());
-
- reactor = pn_reactor();
- pn_conn = pn_reactor_connection_to_host(reactor,
- host,
- port,
- handler);
-
+ pn_conn = pn_connection();
// the container name should be unique for each client
pn_connection_set_container(pn_conn, container_name);
pn_connection_set_hostname(pn_conn, host);
+ proactor = pn_proactor();
+ pn_proactor_addr(proactor_address, sizeof(proactor_address), host, port);
+ pn_proactor_connect2(proactor, pn_conn, 0, proactor_address);
- // break out of pn_reactor_process once a second to check if done
- pn_reactor_set_timeout(reactor, 1000);
+ if (verbose) {
+ // print status every 10 seconds..
+ pn_proactor_set_timeout(proactor, 10 * 1000);
+ }
- pn_reactor_start(reactor);
+ bool done = false;
+ while (!done) {
+ debug("Waiting for proactor event...\n");
+ pn_event_batch_t *events = pn_proactor_wait(proactor);
+ debug("Start new proactor batch\n");
- time_t last_log = time(NULL);
- while (pn_reactor_process(reactor)) {
+ pn_event_t *event = pn_event_batch_next(events);
+ while (!done && event) {
+ done = event_handler(event);
+ event = pn_event_batch_next(events);
+ }
+
+ debug("Proactor batch processing done\n");
+ pn_proactor_done(proactor, events);
+
if (stop) {
+ pn_proactor_cancel_timeout(proactor);
if (drop_connection) { // hard stop
fprintf(stdout,
"Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
" Released:%"PRIu64" Modified:%"PRIu64"\n",
count, accepted, rejected, released, modified);
+ fflush(stdout);
exit(0);
}
- if (pn_link) pn_link_close(pn_link);
- if (pn_ssn) pn_session_close(pn_ssn);
- if (pn_conn) pn_connection_close(pn_conn);
-
- } else if (verbose) {
-
- // periodically give status for test output logs
-
- time_t now = time(NULL);
- if ((now - last_log) >= 10) {
- fprintf(stdout,
- "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
- " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n",
- count, accepted, rejected, released, modified, limit);
- fflush(stdout);
- last_log = now;
+ if (pn_conn) {
+ debug("Stop detected - closing connection...\n");
+ if (pn_link) pn_link_close(pn_link);
+ if (pn_ssn) pn_session_close(pn_ssn);
+ pn_connection_close(pn_conn);
+ pn_link = 0;
+ pn_ssn = 0;
+ pn_conn = 0;
}
}
}
- if (pn_link) pn_link_free(pn_link);
- if (pn_ssn) pn_session_free(pn_ssn);
- if (pn_conn) pn_connection_close(pn_conn);
-
- pn_reactor_free(reactor);
+ pn_proactor_free(proactor);
if (verbose) {
fprintf(stdout,