Fixes #482: replace reactor tests with proactor implementation (#488)

(cherry picked from commit ff073628c99e24eeb1904435666bb28eacfed20c)
diff --git a/tests/clogger.c b/tests/clogger.c
index dfa8e3a..a60a897 100644
--- a/tests/clogger.c
+++ b/tests/clogger.c
@@ -26,10 +26,8 @@
 
 #include "proton/connection.h"
 #include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
 #include "proton/link.h"
-#include "proton/reactor.h"
+#include "proton/proactor.h"
 #include "proton/session.h"
 #include "proton/transport.h"
 
@@ -63,14 +61,14 @@
 const char *target_address = "test-address";
 const char *host_address = "127.0.0.1:5672";
 const char *container_name = "Clogger";
+char proactor_address[1024];
 
-pn_reactor_t    *reactor;
+//
+pn_proactor_t   *proactor;
 pn_connection_t *pn_conn;
 pn_session_t    *pn_ssn;
 pn_link_t       *pn_link;
 pn_delivery_t   *pn_dlv;       // current in-flight delivery
-pn_handler_t    *_handler;
-
 uint32_t         bytes_sent;   // number of body data bytes written out link
 uint32_t         remote_max_frame = DEFAULT_MAX_FRAME;  // used to limit amount written
 
@@ -119,7 +117,7 @@
     case SIGINT:
     case SIGQUIT:
         stop = true;
-        if (reactor) pn_reactor_wakeup(reactor);
+        if (proactor) pn_proactor_interrupt(proactor);
         break;
     default:
         break;
@@ -193,9 +191,9 @@
             pn_delivery_settle(pn_dlv);
             if (limit && sent == limit) {
                 // no need to wait for acks
-                debug("stopping...\n");
+                debug("stopping (presettled)...\n");
                 stop = true;
-                pn_reactor_wakeup(reactor);
+                pn_proactor_interrupt(proactor);
             }
         }
         pn_dlv = 0;
@@ -208,10 +206,9 @@
 /* Process each event posted by the proactor.
    Return true if client has stopped.
  */
-static void event_handler(pn_handler_t *handler,
-                          pn_event_t *event,
-                          pn_event_type_t etype)
+static bool event_handler(pn_event_t *event)
 {
+    const pn_event_type_t etype = pn_event_type(event);
     debug("new event=%s\n", pn_event_type_name(etype));
 
     switch (etype) {
@@ -243,13 +240,12 @@
             if (pn_link_credit(pn_link) > 0) {
                 if (!pn_dlv) {
                     start_message();
-                    pn_reactor_schedule(reactor, pause_msec, _handler);   // send body after pause
+                    pn_proactor_set_timeout(proactor, pause_msec);   // send body after pause
                 }
             }
         }
     } break;
 
-
     case PN_TRANSPORT: {
         ssize_t pending = pn_transport_pending(pn_event_transport(event));
         debug("PN_TRANSPORT pending=%ld\n", pending);
@@ -288,31 +284,37 @@
                 // initiate clean shutdown of the endpoints
                 debug("stopping...\n");
                 stop = true;
-                pn_reactor_wakeup(reactor);
+                pn_proactor_interrupt(proactor);
             }
         }
     } break;
 
-    case PN_TIMER_TASK: {
+    case PN_PROACTOR_TIMEOUT: {
+        if (pn_conn) pn_connection_wake(pn_conn);
+    } break;
+
+    case PN_CONNECTION_WAKE: {
         if (!send_message_data()) {   // not done sending
-            pn_reactor_schedule(reactor, pause_msec, _handler);
+            pn_proactor_set_timeout(proactor, pause_msec);
         } else if (limit == 0 || sent < limit) {
             if (pn_link_credit(pn_link) > 0) {
                 // send next message
                 start_message();
-                pn_reactor_schedule(reactor, pause_msec, _handler);
+                pn_proactor_set_timeout(proactor, pause_msec);
             }
         }
     } break;
 
+    case PN_PROACTOR_INACTIVE: {
+        debug("proactor inactive!\n");
+        return stop;
+    } break;
+
     default:
         break;
     }
-}
 
-
-static void delete_handler(pn_handler_t *handler)
-{
+    return false;
 }
 
 
@@ -374,51 +376,54 @@
         host_address += strlen("amqp://");
     }
 
-    // convert host_address to hostname and port
+    // trim port from hostname
     char *hostname = strdup(host_address);
     char *port = strchr(hostname, ':');
-    if (!port) {
-        port = "5672";
-    } else {
+    if (port) {
         *port++ = 0;
+    } else {
+        port = "5672";
     }
 
-    _handler = pn_handler_new(event_handler, 0, delete_handler);
-    pn_handler_add(_handler, pn_handshaker());
-
-    reactor = pn_reactor();
-    pn_conn = pn_reactor_connection_to_host(reactor,
-                                            hostname,
-                                            port,
-                                            _handler);
-
+    pn_conn = pn_connection();
     // the container name should be unique for each client
     pn_connection_set_container(pn_conn, container_name);
     pn_connection_set_hostname(pn_conn, hostname);
+    proactor = pn_proactor();
+    pn_proactor_addr(proactor_address, sizeof(proactor_address), hostname, port);
+    pn_proactor_connect2(proactor, pn_conn, 0, proactor_address);
+    free(hostname);
 
-    // break out of pn_reactor_process once a second to check if done
-    pn_reactor_set_timeout(reactor, 1000);
+    bool done = false;
+    while (!done) {
+        debug("Waiting for proactor event...\n");
+        pn_event_batch_t *events = pn_proactor_wait(proactor);
+        debug("Start new proactor batch\n");
+        pn_event_t *event = pn_event_batch_next(events);
+        while (event) {
+            done = event_handler(event);
+            if (done)
+                break;
 
-    pn_reactor_start(reactor);
+            event = pn_event_batch_next(events);
+        }
 
-    while (pn_reactor_process(reactor)) {
-        if (stop) {
-            // close the endpoints this will cause pn_reactor_process() to
-            // eventually break the loop
+        debug("Proactor batch processing done\n");
+        pn_proactor_done(proactor, events);
+
+        if (stop && pn_conn) {
+            debug("Stop detected - closing connection...\n");
             if (pn_link) pn_link_close(pn_link);
             if (pn_ssn) pn_session_close(pn_ssn);
-            if (pn_conn) pn_connection_close(pn_conn);
+            pn_connection_close(pn_conn);
             pn_link = 0;
             pn_ssn = 0;
             pn_conn = 0;
         }
     }
 
-    if (pn_link) pn_link_free(pn_link);
-    if (pn_ssn) pn_session_free(pn_ssn);
-    if (pn_conn) pn_connection_close(pn_conn);
-
-    pn_reactor_free(reactor);
+    debug("Send complete!\n");
+    pn_proactor_free(proactor);
 
     if (not_accepted) {
         printf("Sent: %" PRIu64 "  Accepted: %" PRIu64 " Not Accepted: %" PRIu64 "\n", sent, accepted, not_accepted);
diff --git a/tests/test-receiver.c b/tests/test-receiver.c
index 14f8c9b..68fb46f 100644
--- a/tests/test-receiver.c
+++ b/tests/test-receiver.c
@@ -20,12 +20,10 @@
 
 #include "proton/connection.h"
 #include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
 #include "proton/link.h"
 #include "proton/message.h"
-#include "proton/reactor.h"
 #include "proton/session.h"
+#include "proton/proactor.h"
 
 #include <inttypes.h>
 #include <signal.h>
@@ -34,11 +32,13 @@
 #include <string.h>
 #include <time.h>
 #include <unistd.h>
+#include <assert.h>
 
 #define BOOL2STR(b) ((b)?"true":"false")
 
 bool stop = false;
 bool verbose = false;
+bool debug_mode = false;
 
 int  credit_window = 1000;
 char *source_address = "test-address";  // name of the source node to receive from
@@ -46,17 +46,30 @@
 char *host_address = _addr;
 char *container_name = "TestReceiver";
 bool drop_connection = false;
+char proactor_address[1024];
 
 pn_connection_t *pn_conn;
 pn_session_t *pn_ssn;
 pn_link_t *pn_link;
-pn_reactor_t *reactor;
+pn_proactor_t *proactor;
 pn_message_t *in_message;       // holds the current received message
 
 uint64_t count = 0;
 uint64_t limit = 0;   // if > 0 stop after limit messages arrive
 
 
+void debug(const char *format, ...)
+{
+    va_list args;
+
+    if (!debug_mode) return;
+
+    va_start(args, format);
+    vprintf(format, args);
+    va_end(args);
+}
+
+
 static void signal_handler(int signum)
 {
     signal(SIGINT,  SIG_IGN);
@@ -66,6 +79,7 @@
     case SIGINT:
     case SIGQUIT:
         stop = true;
+        if (proactor) pn_proactor_interrupt(proactor);
         break;
     default:
         break;
@@ -73,23 +87,12 @@
 }
 
 
-// Called when reactor exits to clean up app_data
-//
-static void delete_handler(pn_handler_t *handler)
-{
-    if (in_message) {
-        pn_message_free(in_message);
-        in_message = NULL;
-    }
-}
-
-
-/* Process each event posted by the reactor.
+/* Process each event posted by the proactor
  */
-static void event_handler(pn_handler_t *handler,
-                          pn_event_t *event,
-                          pn_event_type_t type)
+static bool event_handler(pn_event_t *event)
 {
+    const pn_event_type_t type = pn_event_type(event);
+    debug("new event=%s\n", pn_event_type_name(type));
     switch (type) {
 
     case PN_CONNECTION_INIT: {
@@ -140,14 +143,31 @@
 
             if (limit && count == limit) {
                 stop = true;
-                pn_reactor_wakeup(reactor);
             }
         }
     } break;
 
+    case PN_PROACTOR_TIMEOUT: {
+        if (verbose) {
+            fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
+            fflush(stdout);
+            if (!stop) {
+                pn_proactor_set_timeout(proactor, 10 * 1000);
+            }
+        }
+    } break;
+
+    case PN_PROACTOR_INACTIVE: {
+        assert(stop);  // expect: inactive due to stopping
+        debug("proactor inactive!\n");
+        return true;
+    } break;
+
     default:
         break;
     }
+
+    return false;
 }
 
 static void usage(void)
@@ -160,21 +180,17 @@
     printf("-w      \tCredit window [%d]\n", credit_window);
     printf("-E      \tExit without cleanly closing the connection [off]\n");
     printf("-d      \tPrint periodic status updates [%s]\n", BOOL2STR(verbose));
+    printf("-D      \tPrint debug info [off]\n");
     exit(1);
 }
 
 
 int main(int argc, char** argv)
 {
-    /* create a handler for the connection's events.
-     */
-    pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
-    pn_handler_add(handler, pn_handshaker());
-
     /* command line options */
     opterr = 0;
     int c;
-    while((c = getopt(argc, argv, "i:a:s:hdw:c:E")) != -1) {
+    while((c = getopt(argc, argv, "i:a:s:hdDw:c:E")) != -1) {
         switch(c) {
         case 'h': usage(); break;
         case 'a': host_address = optarg; break;
@@ -190,6 +206,7 @@
             break;
         case 'E': drop_connection = true;  break;
         case 'd': verbose = true;          break;
+        case 'D': debug_mode = true;       break;
 
         default:
             usage();
@@ -210,50 +227,52 @@
         port = "5672";
     }
 
-    reactor = pn_reactor();
-    pn_conn = pn_reactor_connection_to_host(reactor,
-                                            host,
-                                            port,
-                                            handler);
-
+    pn_conn = pn_connection();
     // the container name should be unique for each client
     pn_connection_set_container(pn_conn, container_name);
     pn_connection_set_hostname(pn_conn, host);
+    proactor = pn_proactor();
+    pn_proactor_addr(proactor_address, sizeof(proactor_address), host, port);
+    pn_proactor_connect2(proactor, pn_conn, 0, proactor_address);
 
-    // periodic wakeup
-    pn_reactor_set_timeout(reactor, 1000);
+    if (verbose) {
+        // print status every 10 seconds..
+        pn_proactor_set_timeout(proactor, 10 * 1000);
+    }
 
-    pn_reactor_start(reactor);
+    bool done = false;
+    while (!done) {
+        debug("Waiting for proactor event...\n");
+        pn_event_batch_t *events = pn_proactor_wait(proactor);
+        debug("Start new proactor batch\n");
 
-    time_t last_log = time(NULL);
-    while (pn_reactor_process(reactor)) {
+        pn_event_t *event = pn_event_batch_next(events);
+        while (!done && event) {
+            done = event_handler(event);
+            event = pn_event_batch_next(events);
+        }
+
+        debug("Proactor batch processing done\n");
+        pn_proactor_done(proactor, events);
+
         if (stop) {
-            if (drop_connection)  // hard exit
+            pn_proactor_cancel_timeout(proactor);
+            if (drop_connection) {  // hard stop
                 exit(0);
-            // close the endpoints this will cause pn_reactor_process() to
-            // eventually break the loop
-            if (pn_link) pn_link_close(pn_link);
-            if (pn_ssn) pn_session_close(pn_ssn);
-            if (pn_conn) pn_connection_close(pn_conn);
-
-        } else if (verbose) {
-
-            // periodically give status for test output logs
-
-            time_t now = time(NULL);
-            if ((now - last_log) >= 10) {
-                fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
-                fflush(stdout);
-                last_log = now;
+            }
+            if (pn_conn) {
+                debug("Stop detected - closing connection...\n");
+                if (pn_link) pn_link_close(pn_link);
+                if (pn_ssn) pn_session_close(pn_ssn);
+                pn_connection_close(pn_conn);
+                pn_link = 0;
+                pn_ssn = 0;
+                pn_conn = 0;
             }
         }
     }
 
-    if (pn_link) pn_link_free(pn_link);
-    if (pn_ssn) pn_session_free(pn_ssn);
-    if (pn_conn) pn_connection_close(pn_conn);
-
-    pn_reactor_free(reactor);
+    pn_proactor_free(proactor);
 
     if (verbose) {
         fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
diff --git a/tests/test-sender.c b/tests/test-sender.c
index 57867b4..d0bd448 100644
--- a/tests/test-sender.c
+++ b/tests/test-sender.c
@@ -25,12 +25,10 @@
 
 #include "proton/connection.h"
 #include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
 #include "proton/link.h"
 #include "proton/message.h"
-#include "proton/reactor.h"
 #include "proton/session.h"
+#include "proton/proactor.h"
 
 #include <errno.h>
 #include <inttypes.h>
@@ -41,6 +39,7 @@
 #include <string.h>
 #include <time.h>
 #include <unistd.h>
+#include <assert.h>
 
 #define BOOL2STR(b) ((b)?"true":"false")
 
@@ -56,6 +55,7 @@
 
 bool stop = false;
 bool verbose = false;
+bool debug_mode = false;
 
 uint64_t limit = 1;               // # messages to send
 uint64_t count = 0;               // # sent
@@ -86,11 +86,12 @@
 char _addr[] = "127.0.0.1:5672";
 char *host_address = _addr;
 char *container_name = "TestSender";
+char proactor_address[1024];
 
 pn_connection_t *pn_conn;
 pn_session_t *pn_ssn;
 pn_link_t *pn_link;
-pn_reactor_t *reactor;
+pn_proactor_t *proactor;
 pn_message_t *out_message;
 
 
@@ -109,6 +110,18 @@
     "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
 
 
+void debug(const char *format, ...)
+{
+    va_list args;
+
+    if (!debug_mode) return;
+
+    va_start(args, format);
+    vprintf(format, args);
+    va_end(args);
+}
+
+
 static void add_message_annotations(pn_message_t *out_message)
 {
     // just a bunch of dummy MA
@@ -228,6 +241,7 @@
     case SIGINT:
     case SIGQUIT:
         stop = true;
+        if (proactor) pn_proactor_interrupt(proactor);
         break;
     default:
         break;
@@ -235,20 +249,12 @@
 }
 
 
-static void delete_handler(pn_handler_t *handler)
-{
-    free(encode_buffer);
-    pn_message_free(out_message);
-    free((void *)body_data.start);
-}
-
-
-/* Process each event posted by the reactor.
+/* Process each event posted by the proactor.
  */
-static void event_handler(pn_handler_t *handler,
-                          pn_event_t *event,
-                          pn_event_type_t type)
+static bool event_handler(pn_event_t *event)
 {
+    const pn_event_type_t type = pn_event_type(event);
+    debug("new event=%s\n", pn_event_type_name(type));
     switch (type) {
 
     case PN_CONNECTION_INIT: {
@@ -289,8 +295,8 @@
                 ++accepted;
                 if (limit && count == limit) {
                     // no need to wait for acks
+                    debug("stopping (presettled)...\n");
                     stop = true;
-                    pn_reactor_wakeup(reactor);
                 }
             }
         }
@@ -332,17 +338,39 @@
 
             if (limit && acked == limit) {
                 // initiate clean shutdown of the endpoints
+                debug("stopping...\n");
                 stop = true;
-                pn_reactor_wakeup(reactor);
             }
         }
     } break;
 
+    case PN_PROACTOR_TIMEOUT: {
+        if (verbose) {
+            fprintf(stdout,
+                    "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+                    " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n",
+                    count, accepted, rejected, released, modified, limit);
+            fflush(stdout);
+            if (!stop) {
+                pn_proactor_set_timeout(proactor, 10 * 1000);
+            }
+        }
+    } break;
+
+    case PN_PROACTOR_INACTIVE: {
+        assert(stop);  // expect: inactive due to stopping
+        debug("proactor inactive!\n");
+        return true;
+    } break;
+
     default:
         break;
     }
+
+    return false;
 }
 
+
 static void usage(void)
 {
   printf("Usage: sender <options>\n");
@@ -359,6 +387,7 @@
   printf("-p      \tMessage priority [%d]\n", priority);
   printf("-X      \tMessage body data pattern [%c]\n", (char)body_data_pattern);
   printf("-d      \tPrint periodic status updates [%s]\n", BOOL2STR(verbose));
+  printf("-D      \tPrint debug info [off]\n");
   exit(1);
 }
 
@@ -367,7 +396,7 @@
     /* command line options */
     opterr = 0;
     int c;
-    while ((c = getopt(argc, argv, "ha:c:i:ns:t:udMEp:X:")) != -1) {
+    while ((c = getopt(argc, argv, "ha:c:i:ns:t:udMEDp:X:")) != -1) {
         switch(c) {
         case 'h': usage(); break;
         case 'a': host_address = optarg; break;
@@ -393,6 +422,7 @@
         case 'M': add_annotations = true;  break;
         case 'E': drop_connection = true;  break;
         case 'X': body_data_pattern = optarg[0];  break;
+        case 'D': debug_mode = true; break;
         case 'p':
             if (sscanf(optarg, "%u", &priority) != 1)
                 usage();
@@ -417,59 +447,57 @@
         port = "5672";
     }
 
-    pn_handler_t *handler = pn_handler_new(event_handler, 0, delete_handler);
-    pn_handler_add(handler, pn_handshaker());
-
-    reactor = pn_reactor();
-    pn_conn = pn_reactor_connection_to_host(reactor,
-                                            host,
-                                            port,
-                                            handler);
-
+    pn_conn = pn_connection();
     // the container name should be unique for each client
     pn_connection_set_container(pn_conn, container_name);
     pn_connection_set_hostname(pn_conn, host);
+    proactor = pn_proactor();
+    pn_proactor_addr(proactor_address, sizeof(proactor_address), host, port);
+    pn_proactor_connect2(proactor, pn_conn, 0, proactor_address);
 
-    // break out of pn_reactor_process once a second to check if done
-    pn_reactor_set_timeout(reactor, 1000);
+    if (verbose) {
+        // print status every 10 seconds..
+        pn_proactor_set_timeout(proactor, 10 * 1000);
+    }
 
-    pn_reactor_start(reactor);
+    bool done = false;
+    while (!done) {
+        debug("Waiting for proactor event...\n");
+        pn_event_batch_t *events = pn_proactor_wait(proactor);
+        debug("Start new proactor batch\n");
 
-    time_t last_log = time(NULL);
-    while (pn_reactor_process(reactor)) {
+        pn_event_t *event = pn_event_batch_next(events);
+        while (!done && event) {
+            done = event_handler(event);
+            event = pn_event_batch_next(events);
+        }
+
+        debug("Proactor batch processing done\n");
+        pn_proactor_done(proactor, events);
+
         if (stop) {
+            pn_proactor_cancel_timeout(proactor);
             if (drop_connection) {  // hard stop
                 fprintf(stdout,
                         "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
                         " Released:%"PRIu64" Modified:%"PRIu64"\n",
                         count, accepted, rejected, released, modified);
+                fflush(stdout);
                 exit(0);
             }
-            if (pn_link) pn_link_close(pn_link);
-            if (pn_ssn) pn_session_close(pn_ssn);
-            if (pn_conn) pn_connection_close(pn_conn);
-
-        } else if (verbose) {
-
-            // periodically give status for test output logs
-
-            time_t now = time(NULL);
-            if ((now - last_log) >= 10) {
-                fprintf(stdout,
-                        "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
-                        " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n",
-                        count, accepted, rejected, released, modified, limit);
-                fflush(stdout);
-                last_log = now;
+            if (pn_conn) {
+                debug("Stop detected - closing connection...\n");
+                if (pn_link) pn_link_close(pn_link);
+                if (pn_ssn) pn_session_close(pn_ssn);
+                pn_connection_close(pn_conn);
+                pn_link = 0;
+                pn_ssn = 0;
+                pn_conn = 0;
             }
         }
     }
 
-    if (pn_link) pn_link_free(pn_link);
-    if (pn_ssn) pn_session_free(pn_ssn);
-    if (pn_conn) pn_connection_close(pn_conn);
-
-    pn_reactor_free(reactor);
+    pn_proactor_free(proactor);
 
     if (verbose) {
         fprintf(stdout,