PROTON-2931: epoll proactor thread races using async c-ares name resolver library
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 7ae86f1..967464e 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -271,6 +271,7 @@
   pmutex rearm_mutex;                /* protects pconnection_rearm from out of order arming*/
   bool io_doublecheck;               /* callbacks made and new IO may have arrived */
   uint64_t expected_timeout;
+  bool name_lookup_pending;
   char addr_buf[1];
 } pconnection_t;
 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index df1bde8..0479c57 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -836,7 +836,7 @@
 // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), no pending timer.
 // Return true when all possible outstanding epoll events associated with this pconnection have been processed.
 static inline bool pconnection_is_final(pconnection_t *pc) {
-  return !pc->current_arm && !pc->task.ready && !pc->tick_pending;
+  return !pc->current_arm && !pc->task.ready && !pc->tick_pending && !pc->name_lookup_pending;
 }
 
 static void pconnection_final_free(pconnection_t *pc) {
@@ -1400,6 +1400,7 @@
 }
 
 /* Called on initial connect, and if connection fails to try another address */
+/* May be called within the pconnection task or from an external name_lookup task */
 static void pconnection_maybe_connect_lh(pconnection_t *pc) {
   errno = 0;
   if (!pc->connected) {         /* Not yet connected */
@@ -1445,8 +1446,7 @@
 
 /* Called when connection name lookup completes (from name_lookup done_cb). Call with task lock held. */
 static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, int gai_error) {
-  pn_proactor_t *p = pc->task.proactor;
-  bool notify = false;
+  pc->name_lookup_pending = false;
   if (gai_error) {
     psocket_gai_error(&pc->psocket, gai_error, "connect to ");
   } else if (ai) {
@@ -1457,8 +1457,8 @@
       return;
     }
   }
-  notify = schedule(&pc->task);
-  if (notify) notify_poller(p);
+  bool notify = schedule(&pc->task);
+  if (notify) notify_poller(pc->task.proactor);
 }
 
 static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) {
@@ -1472,10 +1472,28 @@
 // Return true if the socket is connecting and there are no Proton events to deliver.
 static bool pconnection_first_connect_lh(pconnection_t *pc) {
   pn_proactor_t *p = pc->task.proactor;
+  pn_transport_t *tp = pc->driver.transport;
+  pc->name_lookup_pending = true;
+
   unlock(&pc->task.mutex);
   bool rc = pni_name_lookup_start(&p->name_lookup, pc->host, pc->port, pc, connection_done_cb);
   lock(&pc->task.mutex);
-  return rc;
+
+  if (!rc) {
+    // Either the callback was synchronous or no callback was possible
+    if (pc->name_lookup_pending) {
+      // Clean up since there will be no callback.
+      pc->name_lookup_pending = false;
+      psocket_error(&pc->psocket, EAI_FAIL, "internal error on connect");
+    }
+    return false;
+  }
+  // Name lookup started.  Callback may have already completed and failed.
+  if (!pc->name_lookup_pending) {
+    if (pn_condition_is_set(pn_transport_condition(tp)))
+      return false;
+  }
+  return true;
 }
 
 void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 1f056c8..b073bee 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -56,6 +56,7 @@
   bool hup_detected;
   bool read_check;
   bool first_schedule;
+  bool name_lookup_pending;
   char *taddr;
 };
 
@@ -110,8 +111,14 @@
 }
 
 /* Called on initial connect, and if connection fails to try another address */
+/* May be called within the praw_connection task or from an external name_lookup task */
 static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
+  if (prc->task.closing) {
+    return;
+  }
+  int err = 0;                 /* Initialized in case while loop has zero iterations */
   while (prc->ai) {            /* Have an address */
+    err = 0;
     struct addrinfo *ai = prc->ai;
     prc->ai = prc->ai->ai_next; /* Move to next address in case this fails */
     int fd = socket(ai->ai_family, SOCK_STREAM, 0);
@@ -125,14 +132,19 @@
         praw_connection_start(prc, fd);
         return;               /* Async connection started */
       } else {
+        err = errno;
         close(fd);
       }
+    } else {
+      err = errno;
     }
     /* connect failed immediately, go round the loop to try the next addr */
   }
-  int err;
-  socklen_t errlen = sizeof(err);
-  getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
+
+  if (err == 0 && prc->psocket.epoll_io.fd >= 0) {
+    socklen_t errlen = sizeof(err);
+    getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
+  }
   psocket_error(prc, err, "on connect");
 
   freeaddrinfo(prc->addrinfo);
@@ -144,6 +156,8 @@
 static void raw_connection_lookup_done_lh(praw_connection_t *prc, struct addrinfo *ai, int gai_error) {
   pn_proactor_t *p = prc->task.proactor;
   bool notify = false;
+
+  prc->name_lookup_pending = false;
   if (gai_error) {
     psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
   } else if (ai) {
@@ -224,21 +238,29 @@
   return &conn->raw_connection;
 }
 
-// Call from pconnection_process with task lock held.
-// Return true if the socket is connecting and there are no Proton events to deliver.
-static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
-  const char *host;
-  const char *port;
+// Call from pconnection_process with no locks.
+// Callback may complete before pni_name_lookup_start returns.
+static void praw_connection_first_connect(praw_connection_t *prc) {
   pn_proactor_t *p = prc->task.proactor;
-
-  unlock(&prc->task.mutex);
   size_t addrlen = strlen(prc->taddr);
   char *addr_buf = (char*) alloca(addrlen+1);
+  const char *host;
+  const char *port;
   pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
   bool rc = pni_name_lookup_start(&p->name_lookup, host, port, prc, raw_connection_done_cb);
-  lock(&prc->task.mutex);
-
-  return rc;
+  if (!rc) {
+    // Either the callback was synchronous or no callback was possible
+    bool notify = false;
+    lock(&prc->task.mutex);
+    if (prc->name_lookup_pending) {
+      // Clean up since there will be no callback.
+      prc->name_lookup_pending = false;
+      psocket_error(prc, EAI_FAIL, "internal error on connect");
+      notify = schedule(&prc->task);
+    }
+    unlock(&prc->task.mutex);
+    if (notify) notify_poller(p);
+  }
 }
 
 void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
@@ -438,11 +460,17 @@
     rc->armed = false;
     rc->current_arm = 0;
   }
-  if (pni_raw_finished(&rc->raw_connection)) {
+  if (pni_raw_finished(&rc->raw_connection) && !rc->name_lookup_pending) {
+    t->working = false;
     unlock(&rc->task.mutex);
     praw_initiate_cleanup(rc);
     return NULL;
   }
+  if (rc->task.closing) {
+    // rclosed and wclosed.  Allow final events to be processed.
+    unlock(&rc->task.mutex);
+    return &rc->batch;
+  }
   int events = io_events;
   int fd = rc->psocket.epoll_io.fd;
 
@@ -450,10 +478,19 @@
     rc->first_schedule = false;
     assert(!events); // No socket yet.
     assert(!rc->connected);
-    if (praw_connection_first_connect_lh(rc)) {
+    bool wake_event = pni_task_wake_pending(&rc->task);
+
+    t->working = false;
+    rc->name_lookup_pending = true;
+    unlock(&rc->task.mutex);
+    praw_connection_first_connect(rc);
+    if (wake_event) {
+      lock(&rc->task.mutex);
+      t->working = true;
       unlock(&rc->task.mutex);
-      return NULL;
+      return &rc->batch;
     }
+    return NULL;
   }
   if (!rc->connected) {
     if (events & (EPOLLHUP | EPOLLERR)) {
@@ -525,9 +562,10 @@
   // wakes outstanding because we dealt with it already in pni_raw_batch_next()
   notify = (wake_pending || have_event) && schedule(&rc->task);
   ready = rc->task.ready;  // No need to poll.  Already scheduled.
+  bool praw_finished = pni_raw_finished(&rc->raw_connection) && !rc->name_lookup_pending;
   unlock(&rc->task.mutex);
 
-  if (pni_raw_finished(raw) && !ready) {
+  if (praw_finished && !ready) {
     // If raw connection has no more work to do and safe to free resources, do so.
     praw_initiate_cleanup(rc);
   } else if (ready) {