| diff -rupN orig/src/rdkafka_broker.h patched/src/rdkafka_broker.h |
| --- orig/src/rdkafka_broker.h 2021-01-25 23:25:19.000000000 +0100 |
| +++ patched/src/rdkafka_broker.h 2021-02-22 16:44:21.665943600 +0100 |
| @@ -317,8 +317,17 @@ struct rd_kafka_broker_s { /* rd_kafka_b |
| rd_kafka_resp_err_t err; /**< Last error code */ |
| int cnt; /**< Number of identical errors */ |
| } rkb_last_err; |
| + |
| + /** Recovery actions that need to be performed*/ |
| + uint32_t rkb_recovery_actions; |
| }; |
| |
| +/* |
| + * Recovery actions bit flag |
| + */ |
| +#define RKB_RECOVERY_ACTIONS_NONE 0x0000 |
| +#define RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD 0x0001 |
| + |
| #define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt) |
| #define rd_kafka_broker_keep_fl(FUNC,LINE,RKB) \ |
| rd_refcnt_add_fl(FUNC, LINE, &(RKB)->rkb_refcnt) |
| @@ -450,6 +459,7 @@ rd_kafka_broker_controller_async (rd_kaf |
| |
| int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist); |
| void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state); |
| +void rd_kafka_broker_set_recovery_action (rd_kafka_broker_t* rkbs, uint32_t action); |
| |
| void rd_kafka_broker_fail (rd_kafka_broker_t *rkb, |
| int level, rd_kafka_resp_err_t err, |
| diff -rupN orig/src/rdkafka_broker.c patched/src/rdkafka_broker.c |
| --- orig/src/rdkafka_broker.c 2021-01-25 23:25:19.000000000 +0100 |
| +++ patched/src/rdkafka_broker.c 2021-02-22 16:43:33.391455200 +0100 |
| @@ -272,6 +272,74 @@ int16_t rd_kafka_broker_ApiVersion_suppo |
| return maxver; |
| } |
| |
| +/** |
| + * @brief Setup the wake fd for IO events |
| + * |
| + * @locality broker creation or reconnection |
| + * @locks none |
| + */ |
| +static void rd_kafka_broker_setup_queue_wakeup_fd(rd_kafka_broker_t* rkb) { |
| + int r; |
| + |
| + /* |
| + * Fd-based queue wake-ups using a non-blocking pipe. |
| + * Writes are best effort, if the socket queue is full |
| + * the write fails (silently) but this has no effect on latency |
| + * since the POLLIN flag will already have been raised for fd. |
| + */ |
| + rkb->rkb_wakeup_fd[0] = -1; |
| + rkb->rkb_wakeup_fd[1] = -1; |
| + if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) { |
| + rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD", |
| + "Failed to setup broker queue wake-up fds: " |
| + "%s: disabling low-latency mode", |
| + rd_strerror(r)); |
| + |
| + } |
| + else if (rkb->rkb_source == RD_KAFKA_INTERNAL) { |
| + /* nop: internal broker has no IO transport. */ |
| + |
| + } |
| + else { |
| + char onebyte = 1; |
| + |
| + rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD", |
| + "Enabled low-latency ops queue wake-ups"); |
| + rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1], |
| + &onebyte, sizeof(onebyte)); |
| + } |
| +} |
| + |
| +/** |
| + * @brief Set broker recovery action |
| + * |
| + * @locality any |
| + * @locks none |
| + */ |
| +void rd_kafka_broker_set_recovery_action(rd_kafka_broker_t* rkbs, uint32_t action) { |
| + rkbs->rkb_recovery_actions |= action; |
| +} |
| + |
| +/** |
| + * @brief Reinitialize queue wake up fd's |
| + * |
| + * @locality any |
| + * @locks none |
| + */ |
| +static void rd_kafka_broker_reinitialize_wake_up_fd(rd_kafka_broker_t* rkb) { |
| + if ((rkb->rkb_recovery_actions & RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD) != 0) { |
| + rd_rkb_log(rkb, LOG_WARNING, "WAKEUPFD", |
| + "Reinitializing the wakeup fd's"); |
| + |
| + if (rkb->rkb_wakeup_fd[0] != -1) |
| + rd_close(rkb->rkb_wakeup_fd[0]); |
| + if (rkb->rkb_wakeup_fd[1] != -1) |
| + rd_close(rkb->rkb_wakeup_fd[1]); |
| + |
| + rd_kafka_broker_setup_queue_wakeup_fd(rkb); |
| + rkb->rkb_recovery_actions &= ~RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD; |
| + } |
| +} |
| |
| /** |
| * @brief Set broker state. |
| @@ -5174,6 +5242,8 @@ static int rd_kafka_broker_thread_main ( |
| continue; |
| } |
| |
| + rd_kafka_broker_reinitialize_wake_up_fd(rkb); |
| + |
| /* Initiate asynchronous connection attempt. |
| * Only the host lookup is blocking here. */ |
| r = rd_kafka_broker_connect(rkb); |
| @@ -5412,6 +5482,7 @@ rd_kafka_broker_t *rd_kafka_broker_add ( |
| rkb->rkb_proto = proto; |
| rkb->rkb_port = port; |
| rkb->rkb_origname = rd_strdup(name); |
| + rkb->rkb_recovery_actions = RKB_RECOVERY_ACTIONS_NONE; |
| |
| mtx_init(&rkb->rkb_lock, mtx_plain); |
| mtx_init(&rkb->rkb_logname_lock, mtx_plain); |
| @@ -5467,33 +5538,7 @@ rd_kafka_broker_t *rd_kafka_broker_add ( |
| pthread_sigmask(SIG_SETMASK, &newset, &oldset); |
| #endif |
| |
| - /* |
| - * Fd-based queue wake-ups using a non-blocking pipe. |
| - * Writes are best effort, if the socket queue is full |
| - * the write fails (silently) but this has no effect on latency |
| - * since the POLLIN flag will already have been raised for fd. |
| - */ |
| - rkb->rkb_wakeup_fd[0] = -1; |
| - rkb->rkb_wakeup_fd[1] = -1; |
| - rkb->rkb_toppar_wakeup_fd = -1; |
| - |
| - if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) { |
| - rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD", |
| - "Failed to setup broker queue wake-up fds: " |
| - "%s: disabling low-latency mode", |
| - rd_strerror(r)); |
| - |
| - } else if (source == RD_KAFKA_INTERNAL) { |
| - /* nop: internal broker has no IO transport. */ |
| - |
| - } else { |
| - char onebyte = 1; |
| - |
| - rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD", |
| - "Enabled low-latency ops queue wake-ups"); |
| - rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1], |
| - &onebyte, sizeof(onebyte)); |
| - } |
| + rd_kafka_broker_setup_queue_wakeup_fd(rkb); |
| |
| /* Lock broker's lock here to synchronise state, i.e., hold off |
| * the broker thread until we've finalized the rkb. */ |
| diff -rupN orig/src/rdkafka_transport.c patched/src/rdkafka_transport.c |
| --- orig/src/rdkafka_transport.c 2021-01-25 23:25:19.000000000 +0100 |
| +++ patched/src/rdkafka_transport.c 2021-02-22 16:45:02.884392000 +0100 |
| @@ -987,6 +987,22 @@ int rd_kafka_transport_poll(rd_kafka_tra |
| return 0; |
| } else if (r == RD_SOCKET_ERROR) |
| return -1; |
| + |
| + /* In rare cases the local socket used for wake on IO could be |
| + * disconnected and this will lead the WSAPoll to return immediately |
| + * causing high CPU usage. To fix this set a broker recovery action flag |
| + * to reinitialize the local io socket while also rebuildng the transport. |
| + * Issue #3139 */ |
| + if (rktrans->rktrans_pfd[1].revents & POLLERR || rktrans->rktrans_pfd[1].revents & POLLHUP) { |
| + char errstr[512]; |
| + rd_snprintf(errstr, sizeof(errstr), |
| + "Internal IO event socket disconnected for broker: %s, revents %d", |
| + rktrans->rktrans_rkb->rkb_name, rktrans->rktrans_pfd[1].revents); |
| + rd_kafka_broker_set_recovery_action(rktrans->rktrans_rkb, |
| + RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD); |
| + rd_kafka_transport_connect_done(rktrans, errstr); |
| + return -1; |
| + } |
| #endif |
| rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1); |
| |