blob: 78099ac86c59a63a3a44122a9a64401b1fc656f5 [file] [log] [blame]
diff -rupN orig/src/rdkafka_broker.h patched/src/rdkafka_broker.h
--- orig/src/rdkafka_broker.h 2021-01-25 23:25:19.000000000 +0100
+++ patched/src/rdkafka_broker.h 2021-02-22 16:44:21.665943600 +0100
@@ -317,8 +317,17 @@ struct rd_kafka_broker_s { /* rd_kafka_b
rd_kafka_resp_err_t err; /**< Last error code */
int cnt; /**< Number of identical errors */
} rkb_last_err;
+
+ /** Recovery actions that need to be performed*/
+ uint32_t rkb_recovery_actions;
};
+/*
+ * Recovery actions bit flag
+ */
+#define RKB_RECOVERY_ACTIONS_NONE 0x0000
+#define RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD 0x0001
+
#define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
#define rd_kafka_broker_keep_fl(FUNC,LINE,RKB) \
rd_refcnt_add_fl(FUNC, LINE, &(RKB)->rkb_refcnt)
@@ -450,6 +459,7 @@ rd_kafka_broker_controller_async (rd_kaf
int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);
+void rd_kafka_broker_set_recovery_action (rd_kafka_broker_t* rkbs, uint32_t action);
void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
int level, rd_kafka_resp_err_t err,
diff -rupN orig/src/rdkafka_broker.c patched/src/rdkafka_broker.c
--- orig/src/rdkafka_broker.c 2021-01-25 23:25:19.000000000 +0100
+++ patched/src/rdkafka_broker.c 2021-02-22 16:43:33.391455200 +0100
@@ -272,6 +272,74 @@ int16_t rd_kafka_broker_ApiVersion_suppo
return maxver;
}
+/**
+ * @brief Setup the wake fd for IO events
+ *
+ * @locality broker creation or reconnection
+ * @locks none
+ */
+static void rd_kafka_broker_setup_queue_wakeup_fd(rd_kafka_broker_t* rkb) {
+ int r;
+
+ /*
+ * Fd-based queue wake-ups using a non-blocking pipe.
+ * Writes are best effort, if the socket queue is full
+ * the write fails (silently) but this has no effect on latency
+ * since the POLLIN flag will already have been raised for fd.
+ */
+ rkb->rkb_wakeup_fd[0] = -1;
+ rkb->rkb_wakeup_fd[1] = -1;
+ if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) {
+ rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD",
+ "Failed to setup broker queue wake-up fds: "
+ "%s: disabling low-latency mode",
+ rd_strerror(r));
+
+ }
+ else if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
+ /* nop: internal broker has no IO transport. */
+
+ }
+ else {
+ char onebyte = 1;
+
+ rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD",
+ "Enabled low-latency ops queue wake-ups");
+ rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1],
+ &onebyte, sizeof(onebyte));
+ }
+}
+
+/**
+ * @brief Set broker recovery action
+ *
+ * @locality any
+ * @locks none
+ */
+void rd_kafka_broker_set_recovery_action(rd_kafka_broker_t* rkbs, uint32_t action) {
+ rkbs->rkb_recovery_actions |= action;
+}
+
+/**
+ * @brief Reinitialize queue wake up fd's
+ *
+ * @locality any
+ * @locks none
+ */
+static void rd_kafka_broker_reinitialize_wake_up_fd(rd_kafka_broker_t* rkb) {
+ if ((rkb->rkb_recovery_actions & RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD) != 0) {
+ rd_rkb_log(rkb, LOG_WARNING, "WAKEUPFD",
+ "Reinitializing the wakeup fd's");
+
+ if (rkb->rkb_wakeup_fd[0] != -1)
+ rd_close(rkb->rkb_wakeup_fd[0]);
+ if (rkb->rkb_wakeup_fd[1] != -1)
+ rd_close(rkb->rkb_wakeup_fd[1]);
+
+ rd_kafka_broker_setup_queue_wakeup_fd(rkb);
+ rkb->rkb_recovery_actions &= ~RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD;
+ }
+}
/**
* @brief Set broker state.
@@ -5174,6 +5242,8 @@ static int rd_kafka_broker_thread_main (
continue;
}
+ rd_kafka_broker_reinitialize_wake_up_fd(rkb);
+
/* Initiate asynchronous connection attempt.
* Only the host lookup is blocking here. */
r = rd_kafka_broker_connect(rkb);
@@ -5412,6 +5482,7 @@ rd_kafka_broker_t *rd_kafka_broker_add (
rkb->rkb_proto = proto;
rkb->rkb_port = port;
rkb->rkb_origname = rd_strdup(name);
+ rkb->rkb_recovery_actions = RKB_RECOVERY_ACTIONS_NONE;
mtx_init(&rkb->rkb_lock, mtx_plain);
mtx_init(&rkb->rkb_logname_lock, mtx_plain);
@@ -5467,33 +5538,7 @@ rd_kafka_broker_t *rd_kafka_broker_add (
pthread_sigmask(SIG_SETMASK, &newset, &oldset);
#endif
- /*
- * Fd-based queue wake-ups using a non-blocking pipe.
- * Writes are best effort, if the socket queue is full
- * the write fails (silently) but this has no effect on latency
- * since the POLLIN flag will already have been raised for fd.
- */
- rkb->rkb_wakeup_fd[0] = -1;
- rkb->rkb_wakeup_fd[1] = -1;
- rkb->rkb_toppar_wakeup_fd = -1;
-
- if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) {
- rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD",
- "Failed to setup broker queue wake-up fds: "
- "%s: disabling low-latency mode",
- rd_strerror(r));
-
- } else if (source == RD_KAFKA_INTERNAL) {
- /* nop: internal broker has no IO transport. */
-
- } else {
- char onebyte = 1;
-
- rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD",
- "Enabled low-latency ops queue wake-ups");
- rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1],
- &onebyte, sizeof(onebyte));
- }
+ rd_kafka_broker_setup_queue_wakeup_fd(rkb);
/* Lock broker's lock here to synchronise state, i.e., hold off
* the broker thread until we've finalized the rkb. */
diff -rupN orig/src/rdkafka_transport.c patched/src/rdkafka_transport.c
--- orig/src/rdkafka_transport.c 2021-01-25 23:25:19.000000000 +0100
+++ patched/src/rdkafka_transport.c 2021-02-22 16:45:02.884392000 +0100
@@ -987,6 +987,22 @@ int rd_kafka_transport_poll(rd_kafka_tra
return 0;
} else if (r == RD_SOCKET_ERROR)
return -1;
+
+ /* In rare cases the local socket used for wake on IO could be
+ * disconnected and this will lead the WSAPoll to return immediately
+ * causing high CPU usage. To fix this set a broker recovery action flag
+ * to reinitialize the local io socket while also rebuildng the transport.
+ * Issue #3139 */
+ if (rktrans->rktrans_pfd[1].revents & POLLERR || rktrans->rktrans_pfd[1].revents & POLLHUP) {
+ char errstr[512];
+ rd_snprintf(errstr, sizeof(errstr),
+ "Internal IO event socket disconnected for broker: %s, revents %d",
+ rktrans->rktrans_rkb->rkb_name, rktrans->rktrans_pfd[1].revents);
+ rd_kafka_broker_set_recovery_action(rktrans->rktrans_rkb,
+ RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD);
+ rd_kafka_transport_connect_done(rktrans, errstr);
+ return -1;
+ }
#endif
rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);