PROTON-2931: epoll proactor thread races when using the async c-ares name resolver library
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 7ae86f1..967464e 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h
@@ -271,6 +271,7 @@ pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/ bool io_doublecheck; /* callbacks made and new IO may have arrived */ uint64_t expected_timeout; + bool name_lookup_pending; char addr_buf[1]; } pconnection_t;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index df1bde8..0479c57 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c
@@ -836,7 +836,7 @@ // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), no pending timer. // Return true when all possible outstanding epoll events associated with this pconnection have been processed. static inline bool pconnection_is_final(pconnection_t *pc) { - return !pc->current_arm && !pc->task.ready && !pc->tick_pending; + return !pc->current_arm && !pc->task.ready && !pc->tick_pending && !pc->name_lookup_pending; } static void pconnection_final_free(pconnection_t *pc) { @@ -1400,6 +1400,7 @@ } /* Called on initial connect, and if connection fails to try another address */ +/* May be called within the pconnection task or from an external name_lookup task */ static void pconnection_maybe_connect_lh(pconnection_t *pc) { errno = 0; if (!pc->connected) { /* Not yet connected */ @@ -1445,8 +1446,7 @@ /* Called when connection name lookup completes (from name_lookup done_cb). Call with task lock held. */ static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, int gai_error) { - pn_proactor_t *p = pc->task.proactor; - bool notify = false; + pc->name_lookup_pending = false; if (gai_error) { psocket_gai_error(&pc->psocket, gai_error, "connect to "); } else if (ai) { @@ -1457,8 +1457,8 @@ return; } } - notify = schedule(&pc->task); - if (notify) notify_poller(p); + bool notify = schedule(&pc->task); + if (notify) notify_poller(pc->task.proactor); } static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) { @@ -1472,10 +1472,28 @@ // Return true if the socket is connecting and there are no Proton events to deliver. 
static bool pconnection_first_connect_lh(pconnection_t *pc) { pn_proactor_t *p = pc->task.proactor; + pn_transport_t *tp = pc->driver.transport; + pc->name_lookup_pending = true; + unlock(&pc->task.mutex); bool rc = pni_name_lookup_start(&p->name_lookup, pc->host, pc->port, pc, connection_done_cb); lock(&pc->task.mutex); - return rc; + + if (!rc) { + // Either the callback was synchronous or no callback was possible + if (pc->name_lookup_pending) { + // Clean up since there will be no callback. + pc->name_lookup_pending = false; + psocket_error(&pc->psocket, EAI_FAIL, "internal error on connect"); + } + return false; + } + // Name lookup started. Callback may have already completed and failed. + if (!pc->name_lookup_pending) { + if (pn_condition_is_set(pn_transport_condition(tp))) + return false; + } + return true; } void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 1f056c8..b073bee 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c
@@ -56,6 +56,7 @@ bool hup_detected; bool read_check; bool first_schedule; + bool name_lookup_pending; char *taddr; }; @@ -110,8 +111,14 @@ } /* Called on initial connect, and if connection fails to try another address */ +/* May be called within the praw_connection task or from an external name_lookup task */ static void praw_connection_maybe_connect_lh(praw_connection_t *prc) { + if (prc->task.closing) { + return; + } + int err = 0; /* Initialized in case while loop has zero iterations */ while (prc->ai) { /* Have an address */ + err = 0; struct addrinfo *ai = prc->ai; prc->ai = prc->ai->ai_next; /* Move to next address in case this fails */ int fd = socket(ai->ai_family, SOCK_STREAM, 0); @@ -125,14 +132,19 @@ praw_connection_start(prc, fd); return; /* Async connection started */ } else { + err = errno; close(fd); } + } else { + err = errno; } /* connect failed immediately, go round the loop to try the next addr */ } - int err; - socklen_t errlen = sizeof(err); - getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen); + + if (err == 0 && prc->psocket.epoll_io.fd >= 0) { + socklen_t errlen = sizeof(err); + getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen); + } psocket_error(prc, err, "on connect"); freeaddrinfo(prc->addrinfo); @@ -144,6 +156,8 @@ static void raw_connection_lookup_done_lh(praw_connection_t *prc, struct addrinfo *ai, int gai_error) { pn_proactor_t *p = prc->task.proactor; bool notify = false; + + prc->name_lookup_pending = false; if (gai_error) { psocket_gai_error(prc, gai_error, "connect to ", prc->taddr); } else if (ai) { @@ -224,21 +238,29 @@ return &conn->raw_connection; } -// Call from pconnection_process with task lock held. -// Return true if the socket is connecting and there are no Proton events to deliver. -static bool praw_connection_first_connect_lh(praw_connection_t *prc) { - const char *host; - const char *port; +// Call from pconnection_process with no locks. 
+// Callback may complete before pni_name_lookup_start returns. +static void praw_connection_first_connect(praw_connection_t *prc) { pn_proactor_t *p = prc->task.proactor; - - unlock(&prc->task.mutex); size_t addrlen = strlen(prc->taddr); char *addr_buf = (char*) alloca(addrlen+1); + const char *host; + const char *port; pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port); bool rc = pni_name_lookup_start(&p->name_lookup, host, port, prc, raw_connection_done_cb); - lock(&prc->task.mutex); - - return rc; + if (!rc) { + // Either the callback was synchronous or no callback was possible + bool notify = false; + lock(&prc->task.mutex); + if (prc->name_lookup_pending) { + // Clean up since there will be no callback. + prc->name_lookup_pending = false; + psocket_error(prc, EAI_FAIL, "internal error on connect"); + notify = schedule(&prc->task); + } + unlock(&prc->task.mutex); + if (notify) notify_poller(p); + } } void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) { @@ -438,11 +460,17 @@ rc->armed = false; rc->current_arm = 0; } - if (pni_raw_finished(&rc->raw_connection)) { + if (pni_raw_finished(&rc->raw_connection) && !rc->name_lookup_pending) { + t->working = false; unlock(&rc->task.mutex); praw_initiate_cleanup(rc); return NULL; } + if (rc->task.closing) { + // rclosed and wclosed. Allow final events to be processed. + unlock(&rc->task.mutex); + return &rc->batch; + } int events = io_events; int fd = rc->psocket.epoll_io.fd; @@ -450,10 +478,19 @@ rc->first_schedule = false; assert(!events); // No socket yet. 
assert(!rc->connected); - if (praw_connection_first_connect_lh(rc)) { + bool wake_event = pni_task_wake_pending(&rc->task); + + t->working = false; + rc->name_lookup_pending = true; + unlock(&rc->task.mutex); + praw_connection_first_connect(rc); + if (wake_event) { + lock(&rc->task.mutex); + t->working = true; unlock(&rc->task.mutex); - return NULL; + return &rc->batch; } + return NULL; } if (!rc->connected) { if (events & (EPOLLHUP | EPOLLERR)) { @@ -525,9 +562,10 @@ // wakes outstanding because we dealt with it already in pni_raw_batch_next() notify = (wake_pending || have_event) && schedule(&rc->task); ready = rc->task.ready; // No need to poll. Already scheduled. + bool praw_finished = pni_raw_finished(&rc->raw_connection) && !rc->name_lookup_pending; unlock(&rc->task.mutex); - if (pni_raw_finished(raw) && !ready) { + if (praw_finished && !ready) { // If raw connection has no more work to do and safe to free resources, do so. praw_initiate_cleanup(rc); } else if (ready) {