| /* |
| * librdkafka - Apache Kafka C library |
| * |
| * Copyright (c) 2012-2015, Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| |
| |
| #ifndef _MSC_VER |
| #define _GNU_SOURCE |
| /* |
| * AIX defines this and the value needs to be set correctly. For Solaris, |
| * src/rd.h defines _POSIX_SOURCE to be 200809L, which corresponds to XPG7, |
| * which itself is not compatible with _XOPEN_SOURCE on that platform. |
| */ |
| #if !defined(_AIX) && !defined(__sun) |
| #define _XOPEN_SOURCE |
| #endif |
| #include <signal.h> |
| #endif |
| |
| #include <stdio.h> |
| #include <stdarg.h> |
| #include <string.h> |
| #include <ctype.h> |
| |
| #include "rd.h" |
| #include "rdkafka_int.h" |
| #include "rdkafka_msg.h" |
| #include "rdkafka_msgset.h" |
| #include "rdkafka_topic.h" |
| #include "rdkafka_partition.h" |
| #include "rdkafka_broker.h" |
| #include "rdkafka_offset.h" |
| #include "rdkafka_transport.h" |
| #include "rdkafka_proto.h" |
| #include "rdkafka_buf.h" |
| #include "rdkafka_request.h" |
| #include "rdkafka_sasl.h" |
| #include "rdkafka_interceptor.h" |
| #include "rdtime.h" |
| #include "rdcrc32.h" |
| #include "rdrand.h" |
| #include "rdkafka_lz4.h" |
| #if WITH_SSL |
| #include <openssl/err.h> |
| #endif |
| #include "rdendian.h" |
| |
| |
/* Human-readable broker state names, indexed by broker state value.
 * NOTE: must be kept in sync with the broker state enum
 *       (RD_KAFKA_BROKER_STATE_..) — order and count matter. */
const char *rd_kafka_broker_state_names[] = {
        "INIT",
        "DOWN",
        "CONNECT",
        "AUTH",
        "UP",
        "UPDATE",
        "APIVERSION_QUERY",
        "AUTH_HANDSHAKE"
};
| |
/* Security protocol names, indexed by rd_kafka_secproto_t.
 * The trailing NULL fills the slot following the last designated index
 * and serves as an end-of-list sentinel. */
const char *rd_kafka_secproto_names[] = {
	[RD_KAFKA_PROTO_PLAINTEXT] = "plaintext",
	[RD_KAFKA_PROTO_SSL] = "ssl",
	[RD_KAFKA_PROTO_SASL_PLAINTEXT] = "sasl_plaintext",
	[RD_KAFKA_PROTO_SASL_SSL] = "sasl_ssl",
	NULL
};
| |
| |
| |
| |
| |
| |
| |
/* A broker handle is considered terminating when only one (the final)
 * reference to it remains; used by the termination/decommission path. */
#define rd_kafka_broker_terminating(rkb) \
        (rd_refcnt_get(&(rkb)->rkb_refcnt) <= 1)
| |
| |
/**
 * @brief Construct broker nodename as "name:port" into \p dest
 *        (at most \p dsize bytes, always NUL-terminated by rd_snprintf).
 */
static void rd_kafka_mk_nodename (char *dest, size_t dsize,
                                  const char *name, uint16_t port) {
        rd_snprintf(dest, dsize, "%s:%hu", name, port);
}
| |
| /** |
| * Construct descriptive broker name |
| */ |
| static void rd_kafka_mk_brokername (char *dest, size_t dsize, |
| rd_kafka_secproto_t proto, |
| const char *nodename, int32_t nodeid, |
| rd_kafka_confsource_t source) { |
| |
| /* Prepend protocol name to brokername, unless it is a |
| * standard plaintext broker in which case we omit the protocol part. */ |
| if (proto != RD_KAFKA_PROTO_PLAINTEXT) { |
| int r = rd_snprintf(dest, dsize, "%s://", |
| rd_kafka_secproto_names[proto]); |
| if (r >= (int)dsize) /* Skip proto name if it wont fit.. */ |
| r = 0; |
| |
| dest += r; |
| dsize -= r; |
| } |
| |
| if (nodeid == RD_KAFKA_NODEID_UA) |
| rd_snprintf(dest, dsize, "%s/%s", |
| nodename, |
| source == RD_KAFKA_INTERNAL ? |
| "internal":"bootstrap"); |
| else |
| rd_snprintf(dest, dsize, "%s/%"PRId32, nodename, nodeid); |
| } |
| |
| |
| /** |
| * @brief Enable protocol feature(s) for the current broker. |
| * |
| * Locality: broker thread |
| */ |
| static void rd_kafka_broker_feature_enable (rd_kafka_broker_t *rkb, |
| int features) { |
| if (features & rkb->rkb_features) |
| return; |
| |
| rkb->rkb_features |= features; |
| rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FEATURE, |
| "FEATURE", |
| "Updated enabled protocol features +%s to %s", |
| rd_kafka_features2str(features), |
| rd_kafka_features2str(rkb->rkb_features)); |
| } |
| |
| |
/**
 * @brief Disable protocol feature(s) for the current broker.
 *
 * @param features bitmask of RD_KAFKA_FEATURE_.. flags to disable.
 *
 * Locality: broker thread
 */
static void rd_kafka_broker_feature_disable (rd_kafka_broker_t *rkb,
					    int features) {
        /* No-op if none of the requested features are currently enabled
         * (clearing already-clear bits would change nothing). */
	if (!(features & rkb->rkb_features))
		return;

	rkb->rkb_features &= ~features;
	rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FEATURE,
		   "FEATURE",
		   "Updated enabled protocol features -%s to %s",
		   rd_kafka_features2str(features),
		   rd_kafka_features2str(rkb->rkb_features));
}
| |
| |
/**
 * @brief Set protocol feature(s) for the current broker.
 *
 * @param features full bitmask of RD_KAFKA_FEATURE_.. flags.
 *
 * @remark This replaces the previous feature set.
 *
 * @locality broker thread
 * @locks rd_kafka_broker_lock()
 */
static void rd_kafka_broker_features_set (rd_kafka_broker_t *rkb, int features) {
	if (rkb->rkb_features == features)
		return;

	rkb->rkb_features = features;
	rd_rkb_dbg(rkb, BROKER, "FEATURE",
		   "Updated enabled protocol features to %s",
		   rd_kafka_features2str(rkb->rkb_features));
}
| |
| |
| /** |
| * @brief Check and return supported ApiVersion for \p ApiKey. |
| * |
| * @returns the highest supported ApiVersion in the specified range (inclusive) |
| * or -1 if the ApiKey is not supported or no matching ApiVersion. |
| * The current feature set is also returned in \p featuresp |
| * @locks none |
| * @locality any |
| */ |
| int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb, |
| int16_t ApiKey, |
| int16_t minver, int16_t maxver, |
| int *featuresp) { |
| struct rd_kafka_ApiVersion skel = { .ApiKey = ApiKey }; |
| struct rd_kafka_ApiVersion ret = RD_ZERO_INIT, *retp; |
| |
| rd_kafka_broker_lock(rkb); |
| retp = bsearch(&skel, rkb->rkb_ApiVersions, rkb->rkb_ApiVersions_cnt, |
| sizeof(*rkb->rkb_ApiVersions), |
| rd_kafka_ApiVersion_key_cmp); |
| if (retp) |
| ret = *retp; |
| if (featuresp) |
| *featuresp = rkb->rkb_features; |
| rd_kafka_broker_unlock(rkb); |
| |
| if (!retp) |
| return -1; |
| |
| if (ret.MaxVer < maxver) { |
| if (ret.MaxVer < minver) |
| return -1; |
| else |
| return ret.MaxVer; |
| } else if (ret.MinVer > maxver) |
| return -1; |
| else |
| return maxver; |
| } |
| |
| |
/**
 * @brief Transition the broker to a new state, maintaining the
 *        all-brokers-down accounting and waking state-change waiters.
 *
 * Locks: rd_kafka_broker_lock() MUST be held.
 * Locality: broker thread
 */
void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state) {
	if ((int)rkb->rkb_state == state)
		return;

	rd_kafka_dbg(rkb->rkb_rk, BROKER, "STATE",
		     "%s: Broker changed state %s -> %s",
		     rkb->rkb_name,
		     rd_kafka_broker_state_names[rkb->rkb_state],
		     rd_kafka_broker_state_names[state]);

	if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
		/* no-op: the internal broker never counts towards
		 * the down/up broker statistics. */
	} else if (state == RD_KAFKA_BROKER_STATE_DOWN &&
		   !rkb->rkb_down_reported &&
		   rkb->rkb_state != RD_KAFKA_BROKER_STATE_APIVERSION_QUERY) {
		/* Propagate ALL_BROKERS_DOWN event if all brokers are
		 * now down, unless we're terminating.
		 * Dont do this if we're querying for ApiVersion since it
		 * is bound to fail once on older brokers. */
		if (rd_atomic32_add(&rkb->rkb_rk->rk_broker_down_cnt, 1) ==
		    rd_atomic32_get(&rkb->rkb_rk->rk_broker_cnt) &&
		    !rd_atomic32_get(&rkb->rkb_rk->rk_terminate))
			rd_kafka_op_err(rkb->rkb_rk,
					RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
					"%i/%i brokers are down",
					rd_atomic32_get(&rkb->rkb_rk->
                                                        rk_broker_down_cnt),
					rd_atomic32_get(&rkb->rkb_rk->
                                                        rk_broker_cnt));
		/* Remember that this broker has been counted as down so
		 * the counter is decremented exactly once on recovery. */
		rkb->rkb_down_reported = 1;

	} else if (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP &&
		   rkb->rkb_down_reported) {
		/* Broker had been up after being reported down:
		 * revert the down accounting. */
		rd_atomic32_sub(&rkb->rkb_rk->rk_broker_down_cnt, 1);
		rkb->rkb_down_reported = 0;
	}

	rkb->rkb_state = state;
        rkb->rkb_ts_state = rd_clock();

	/* Wake any threads blocked in ..wait_state_change(). */
	rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);
}
| |
| |
| /** |
| * @brief Locks broker, acquires the states, unlocks, and returns |
| * the state. |
| * @locks !broker_lock |
| * @locality any |
| */ |
| int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb) { |
| int state; |
| rd_kafka_broker_lock(rkb); |
| state = rkb->rkb_state; |
| rd_kafka_broker_unlock(rkb); |
| return state; |
| } |
| |
| |
/**
 * Failure propagation to application.
 * Will tear down connection to broker and trigger a reconnect.
 *
 * If 'fmt' is NULL nothing will be logged or propagated to the application.
 *
 * \p level is the log level, <=LOG_INFO will be logged while =LOG_DEBUG will
 * be debug-logged.
 *
 * Side effects: closes the transport, purges/requeues buffer queues,
 * sets the broker state to DOWN and may trigger a metadata refresh.
 *
 * Locality: Broker thread
 */
void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
			   int level, rd_kafka_resp_err_t err,
			   const char *fmt, ...) {
	va_list ap;
	int errno_save = errno; /* Save errno before later calls clobber it */
	rd_kafka_bufq_t tmpq_waitresp, tmpq;
        int old_state;

	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

	rd_kafka_dbg(rkb->rkb_rk, BROKER | RD_KAFKA_DBG_PROTOCOL, "BROKERFAIL",
		     "%s: failed: err: %s: (errno: %s)",
		     rkb->rkb_name, rd_kafka_err2str(err),
		     rd_strerror(errno_save));

	/* NOTE: rkb_err.err stores the saved errno (not the rd_kafka error);
	 * it is compared against errno elsewhere (e.g. broker_resolve) to
	 * suppress duplicate log messages. */
	rkb->rkb_err.err = errno_save;

	if (rkb->rkb_transport) {
		rd_kafka_transport_close(rkb->rkb_transport);
		rkb->rkb_transport = NULL;
	}

	rkb->rkb_req_timeouts = 0;

	if (rkb->rkb_recv_buf) {
		rd_kafka_buf_destroy(rkb->rkb_recv_buf);
		rkb->rkb_recv_buf = NULL;
	}

	rd_kafka_broker_lock(rkb);

	/* The caller may omit the format if it thinks this is a recurring
	 * failure, in which case the following things are omitted:
	 *  - log message
	 *  - application OP_ERR
	 *  - metadata request
	 *
	 * Dont log anything if this was the termination signal, or if the
	 * socket disconnected while trying ApiVersionRequest.
	 */
	if (fmt &&
	    !(errno_save == EINTR &&
	      rd_atomic32_get(&rkb->rkb_rk->rk_terminate)) &&
	    !(err == RD_KAFKA_RESP_ERR__TRANSPORT &&
	      rkb->rkb_state == RD_KAFKA_BROKER_STATE_APIVERSION_QUERY)) {
		int of;

		/* Insert broker name in log message if it fits. */
		of = rd_snprintf(rkb->rkb_err.msg, sizeof(rkb->rkb_err.msg),
				 "%s: ", rkb->rkb_name);
		if (of >= (int)sizeof(rkb->rkb_err.msg))
			of = 0;
		va_start(ap, fmt);
		rd_vsnprintf(rkb->rkb_err.msg+of,
			     sizeof(rkb->rkb_err.msg)-of, fmt, ap);
		va_end(ap);

		if (level >= LOG_DEBUG)
			rd_kafka_dbg(rkb->rkb_rk, BROKER, "FAIL",
				     "%s", rkb->rkb_err.msg);
		else {
			/* Don't log if an error callback is registered */
			if (!rkb->rkb_rk->rk_conf.error_cb)
				rd_kafka_log(rkb->rkb_rk, level, "FAIL",
					     "%s", rkb->rkb_err.msg);
			/* Send ERR op back to application for processing. */
			rd_kafka_op_err(rkb->rkb_rk, err,
					"%s", rkb->rkb_err.msg);
		}
	}

	/* If we're currently asking for ApiVersion and the connection
	 * went down it probably means the broker does not support that request
	 * and tore down the connection. In this case we disable that feature flag. */
	if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_APIVERSION_QUERY)
		rd_kafka_broker_feature_disable(rkb, RD_KAFKA_FEATURE_APIVERSION);

	/* Set broker state */
        old_state = rkb->rkb_state;
	rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);

	/* Unlock broker since a requeue will try to lock it. */
	rd_kafka_broker_unlock(rkb);

	/*
	 * Purge all buffers
	 * (put bufs on a temporary queue since bufs may be requeued,
	 *  make sure outstanding requests are re-enqueued before
	 *  bufs on outbufs queue.)
	 */
	rd_kafka_bufq_init(&tmpq_waitresp);
	rd_kafka_bufq_init(&tmpq);
	rd_kafka_bufq_concat(&tmpq_waitresp, &rkb->rkb_waitresps);
	rd_kafka_bufq_concat(&tmpq, &rkb->rkb_outbufs);
        rd_atomic32_init(&rkb->rkb_blocking_request_cnt, 0);

	/* Purge the buffers (might get re-enqueued in case of retries) */
	rd_kafka_bufq_purge(rkb, &tmpq_waitresp, err);

	/* Put the outbufs back on queue */
	rd_kafka_bufq_concat(&rkb->rkb_outbufs, &tmpq);

	/* Update bufq for connection reset:
	 *  - Purge connection-setup requests from outbufs since they will be
	 *    reissued on the next connect.
	 *  - Reset any partially sent buffer's offset.
	 */
	rd_kafka_bufq_connection_reset(rkb, &rkb->rkb_outbufs);

	/* Extra debugging for tracking termination-hang issues:
	 * show what is keeping this broker from decommissioning. */
	if (rd_kafka_terminating(rkb->rkb_rk) &&
	    !rd_kafka_broker_terminating(rkb)) {
		rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL, "BRKTERM",
			   "terminating: broker still has %d refcnt(s), "
			   "%"PRId32" buffer(s), %d partition(s)",
			   rd_refcnt_get(&rkb->rkb_refcnt),
			   rd_kafka_bufq_cnt(&rkb->rkb_outbufs),
			   rkb->rkb_toppar_cnt);
		rd_kafka_bufq_dump(rkb, "BRKOUTBUFS", &rkb->rkb_outbufs);
#if ENABLE_SHAREDPTR_DEBUG
		if (rd_refcnt_get(&rkb->rkb_refcnt) > 1) {
			rd_rkb_dbg(rkb, BROKER, "BRKTERM",
				   "Dumping shared pointers: "
				   "this broker is %p", rkb);
			rd_shared_ptrs_dump();
		}
#endif
	}


	/* Query for topic leaders to quickly pick up on failover. */
	if (fmt && err != RD_KAFKA_RESP_ERR__DESTROY &&
	    old_state >= RD_KAFKA_BROKER_STATE_UP)
		rd_kafka_metadata_refresh_known_topics(rkb->rkb_rk, NULL,
                                                       1/*force*/,
                                                       "broker down");
}
| |
| |
| |
| |
| |
/**
 * Scan bufq for buffer timeouts, trigger buffer callback on timeout.
 *
 * If \p partial_cntp is non-NULL any partially sent buffers will increase
 * the provided counter by 1.
 *
 * @param is_waitresp_q  non-zero if \p rkbq is the wait-response queue,
 *                       in which case the blocking-request counter is
 *                       maintained for timed-out blocking requests.
 * @param err            error passed to each timed-out buffer's callback.
 * @param now            current timestamp; 0 force-times-out all buffers.
 *
 * @returns the number of timed out buffers.
 *
 * @locality broker thread
 */
static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb,
					      int is_waitresp_q,
					      rd_kafka_bufq_t *rkbq,
					      int *partial_cntp,
					      rd_kafka_resp_err_t err,
					      rd_ts_t now) {
	rd_kafka_buf_t *rkbuf, *tmp;
	int cnt = 0;

	TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {

		/* Buffer has not timed out yet (now==0 times out all). */
		if (likely(now && rkbuf->rkbuf_ts_timeout > now))
			continue;

		/* A non-zero read offset means part of the buffer was
		 * already transmitted. */
		if (partial_cntp && rd_slice_offset(&rkbuf->rkbuf_reader) > 0)
			(*partial_cntp)++;

		/* Convert rkbuf_ts_sent to elapsed time since request */
		if (rkbuf->rkbuf_ts_sent)
			rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_sent;
		else
			rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_enq;

		rd_kafka_bufq_deq(rkbq, rkbuf);

		/* Wake state-change waiters when the last blocking
		 * request is accounted for. */
		if (is_waitresp_q && rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING
		    && rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 0)
			rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);

		rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
		cnt++;
	}

	return cnt;
}
| |
| |
/**
 * Scan the wait-response and outbuf queues for message timeouts.
 *
 * May fail (disconnect) the broker if an in-flight request timed out
 * or the accumulated timeout count reaches socket.max.fails.
 *
 * Locality: Broker thread
 */
static void rd_kafka_broker_timeout_scan (rd_kafka_broker_t *rkb, rd_ts_t now) {
	int req_cnt, retry_cnt, q_cnt;

	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

	/* Outstanding requests waiting for response */
	req_cnt = rd_kafka_broker_bufq_timeout_scan(
		rkb, 1, &rkb->rkb_waitresps, NULL,
		RD_KAFKA_RESP_ERR__TIMED_OUT, now);
	/* Requests in retry queue */
	retry_cnt = rd_kafka_broker_bufq_timeout_scan(
		rkb, 0, &rkb->rkb_retrybufs, NULL,
		RD_KAFKA_RESP_ERR__TIMED_OUT, now);
	/* Requests in local queue not sent yet.
	 * partial_cnt is included in req_cnt since they signify
	 * a busy socket. */
	q_cnt = rd_kafka_broker_bufq_timeout_scan(
		rkb, 0, &rkb->rkb_outbufs, &req_cnt,
		RD_KAFKA_RESP_ERR__TIMED_OUT, now);

	if (req_cnt + retry_cnt + q_cnt > 0) {
		rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_BROKER,
			   "REQTMOUT", "Timed out %i+%i+%i requests",
			   req_cnt, retry_cnt, q_cnt);

                /* Fail the broker if socket.max.fails is configured and
                 * now exceeded. */
                rkb->rkb_req_timeouts   += req_cnt + q_cnt;
                rd_atomic64_add(&rkb->rkb_c.req_timeouts, req_cnt + q_cnt);

		/* If this was an in-flight request that timed out, or
		 * the other queues has reached the socket.max.fails threshold,
		 * we need to take down the connection. */
                if ((req_cnt > 0 ||
                     (rkb->rkb_rk->rk_conf.socket_max_fails &&
                      rkb->rkb_req_timeouts >=
                      rkb->rkb_rk->rk_conf.socket_max_fails)) &&
                    rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP) {
                        char rttinfo[32];
                        /* Print average RTT (if avail) to help diagnose. */
                        rd_avg_calc(&rkb->rkb_avg_rtt, now);
                        if (rkb->rkb_avg_rtt.ra_v.avg)
                                rd_snprintf(rttinfo, sizeof(rttinfo),
                                            " (average rtt %.3fms)",
                                            (float)(rkb->rkb_avg_rtt.ra_v.avg/
                                                    1000.0f));
                        else
                                rttinfo[0] = 0;
                        /* Set errno so broker_fail logs a meaningful
                         * errno string for the disconnect. */
                        errno = ETIMEDOUT;
                        rd_kafka_broker_fail(rkb, LOG_ERR,
                                             RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
                                             "%i request(s) timed out: "
                                             "disconnect%s",
                                             rkb->rkb_req_timeouts, rttinfo);
                }
        }
}
| |
| |
| |
| static ssize_t |
| rd_kafka_broker_send (rd_kafka_broker_t *rkb, rd_slice_t *slice) { |
| ssize_t r; |
| char errstr[128]; |
| |
| rd_kafka_assert(rkb->rkb_rk, rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP); |
| rd_kafka_assert(rkb->rkb_rk, rkb->rkb_transport); |
| |
| r = rd_kafka_transport_send(rkb->rkb_transport, slice, |
| errstr, sizeof(errstr)); |
| |
| if (r == -1) { |
| rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT, |
| "Send failed: %s", errstr); |
| rd_atomic64_add(&rkb->rkb_c.tx_err, 1); |
| return -1; |
| } |
| |
| rd_atomic64_add(&rkb->rkb_c.tx_bytes, r); |
| rd_atomic64_add(&rkb->rkb_c.tx, 1); |
| return r; |
| } |
| |
| |
| |
| |
/**
 * @brief Ensure the broker's resolved address list (rkb_rsal) is fresh,
 *        (re)resolving the nodename if the list is missing or its TTL
 *        (broker.address.ttl) has expired.
 *
 * @returns 0 on success, or -1 on resolve failure (broker is failed).
 *
 * Locality: broker thread
 */
static int rd_kafka_broker_resolve (rd_kafka_broker_t *rkb) {
	const char *errstr;

	if (rkb->rkb_rsal &&
	    rkb->rkb_t_rsal_last + rkb->rkb_rk->rk_conf.broker_addr_ttl <
	    time(NULL)) { // FIXME: rd_clock()
		/* Address list has expired. */
		rd_sockaddr_list_destroy(rkb->rkb_rsal);
		rkb->rkb_rsal = NULL;
	}

	if (!rkb->rkb_rsal) {
		/* Resolve */

		rkb->rkb_rsal = rd_getaddrinfo(rkb->rkb_nodename,
					       RD_KAFKA_PORT_STR,
					       AI_ADDRCONFIG,
					       rkb->rkb_rk->rk_conf.
                                               broker_addr_family,
                                               SOCK_STREAM,
					       IPPROTO_TCP, &errstr);

		if (!rkb->rkb_rsal) {
			/* rkb_err.err holds the errno saved by the last
			 * broker_fail(); if it matches the current errno
			 * this is a repeat failure and the message is
			 * suppressed by passing a NULL fmt. */
                        rd_kafka_broker_fail(rkb, LOG_ERR,
                                             RD_KAFKA_RESP_ERR__RESOLVE,
                                             /* Avoid duplicate log messages */
                                             rkb->rkb_err.err == errno ?
                                             NULL :
                                             "Failed to resolve '%s': %s",
                                             rkb->rkb_nodename, errstr);
			return -1;
		}
	}

	return 0;
}
| |
| |
/**
 * @brief Enqueue \p rkbuf on the broker's transmit queue, either at the
 *        head (behind any priority/partially-sent buffers) or at the tail.
 *        Sets the enqueue timestamp and a default timeout
 *        (socket.timeout.ms) if none was set by the caller.
 *
 * Locality: broker thread
 */
static void rd_kafka_broker_buf_enq0 (rd_kafka_broker_t *rkb,
				      rd_kafka_buf_t *rkbuf, int at_head) {
	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

        rkbuf->rkbuf_ts_enq = rd_clock();

        /* Set timeout if not already set */
        if (!rkbuf->rkbuf_ts_timeout)
                rkbuf->rkbuf_ts_timeout = rkbuf->rkbuf_ts_enq +
                        rkb->rkb_rk->rk_conf.socket_timeout_ms * 1000;

	if (unlikely(at_head)) {
		/* Insert message at head of queue */
		rd_kafka_buf_t *prev, *after = NULL;

		/* Put us behind any flash messages and partially sent buffers.
		 * We need to check if buf corrid is set rather than
		 * rkbuf_of since SSL_write may return 0 and expect the
		 * exact same arguments the next call. */
		TAILQ_FOREACH(prev, &rkb->rkb_outbufs.rkbq_bufs, rkbuf_link) {
			if (!(prev->rkbuf_flags & RD_KAFKA_OP_F_FLASH) &&
			    prev->rkbuf_corrid == 0)
				break;
			after = prev;
		}

		if (after)
			TAILQ_INSERT_AFTER(&rkb->rkb_outbufs.rkbq_bufs,
					   after, rkbuf, rkbuf_link);
		else
			TAILQ_INSERT_HEAD(&rkb->rkb_outbufs.rkbq_bufs,
					  rkbuf, rkbuf_link);
	} else {
		/* Insert message at tail of queue */
		TAILQ_INSERT_TAIL(&rkb->rkb_outbufs.rkbq_bufs,
				  rkbuf, rkbuf_link);
	}

	(void)rd_atomic32_add(&rkb->rkb_outbufs.rkbq_cnt, 1);
	(void)rd_atomic32_add(&rkb->rkb_outbufs.rkbq_msg_cnt,
                              rd_atomic32_get(&rkbuf->rkbuf_msgq.rkmq_msg_cnt));
}
| |
| |
/**
 * @brief Finalize a stuffed rkbuf for sending to broker:
 *        sets up the send slice and patches the total request length
 *        and ApiVersion fields in the already-written request header.
 */
static void rd_kafka_buf_finalize (rd_kafka_t *rk, rd_kafka_buf_t *rkbuf) {
        size_t totsize;

        /* Calculate total request buffer length.
         * The Size field itself (4 bytes) is excluded from the count. */
        totsize = rd_buf_len(&rkbuf->rkbuf_buf) - 4;
        rd_assert(totsize <= (size_t)rk->rk_conf.max_msg_size);

        /* Set up a buffer reader for sending the buffer. */
        rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);

        /**
         * Update request header fields
         */
        /* Total request length (Size field at offset 0) */
        rd_kafka_buf_update_i32(rkbuf, 0, (int32_t)totsize);

        /* ApiVersion (follows the 4-byte Size and 2-byte ApiKey) */
        rd_kafka_buf_update_i16(rkbuf, 4+2, rkbuf->rkbuf_reqhdr.ApiVersion);
}
| |
| |
| void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_resp_cb_t *resp_cb, |
| void *opaque) { |
| |
| |
| rkbuf->rkbuf_cb = resp_cb; |
| rkbuf->rkbuf_opaque = opaque; |
| |
| rd_kafka_buf_finalize(rkb->rkb_rk, rkbuf); |
| |
| rd_kafka_broker_buf_enq0(rkb, rkbuf, |
| (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLASH)? |
| 1/*head*/: 0/*tail*/); |
| } |
| |
| |
| /** |
| * Enqueue buffer on broker's xmit queue, but fail buffer immediately |
| * if broker is not up. |
| * |
| * Locality: broker thread |
| */ |
| static int rd_kafka_broker_buf_enq2 (rd_kafka_broker_t *rkb, |
| rd_kafka_buf_t *rkbuf) { |
| if (unlikely(rkb->rkb_source == RD_KAFKA_INTERNAL)) { |
| /* Fail request immediately if this is the internal broker. */ |
| rd_kafka_buf_callback(rkb->rkb_rk, rkb, |
| RD_KAFKA_RESP_ERR__TRANSPORT, |
| NULL, rkbuf); |
| return -1; |
| } |
| |
| rd_kafka_broker_buf_enq0(rkb, rkbuf, |
| (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLASH)? |
| 1/*head*/: 0/*tail*/); |
| |
| return 0; |
| } |
| |
| |
| |
/**
 * Enqueue buffer for transmission.
 * Responses are enqueued on 'replyq' (RD_KAFKA_OP_RECV_BUF)
 *
 * If called from a non-broker thread the buffer is forwarded to the
 * broker thread via an RD_KAFKA_OP_XMIT_BUF op.
 *
 * Locality: any thread
 */
void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
                                     rd_kafka_buf_t *rkbuf,
                                     rd_kafka_replyq_t replyq,
                                     rd_kafka_resp_cb_t *resp_cb,
                                     void *opaque) {

        assert(rkbuf->rkbuf_rkb == rkb);
        if (resp_cb) {
                rkbuf->rkbuf_replyq = replyq;
                rkbuf->rkbuf_cb     = resp_cb;
                rkbuf->rkbuf_opaque = opaque;
        } else {
                /* No callback: no reply queue may be given either. */
		rd_dassert(!replyq.q);
	}

        rd_kafka_buf_finalize(rkb->rkb_rk, rkbuf);


	if (thrd_is_current(rkb->rkb_thread)) {
		rd_kafka_broker_buf_enq2(rkb, rkbuf);

	} else {
		/* Not the broker thread: hand the buffer over via
		 * the broker's ops queue. */
		rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_XMIT_BUF);
		rko->rko_u.xbuf.rkbuf = rkbuf;
		rd_kafka_q_enq(rkb->rkb_ops, rko);
	}
}
| |
| |
| |
| |
/**
 * @returns the current broker state change version.
 *          Pass this value to future rd_kafka_brokers_wait_state_change() calls
 *          to avoid the race condition where a state-change happens between
 *          an initial call to some API that fails and the sub-sequent
 *          .._wait_state_change() call.
 *
 * @locality any thread
 */
int rd_kafka_brokers_get_state_version (rd_kafka_t *rk) {
	int version;
	mtx_lock(&rk->rk_broker_state_change_lock);
	version = rk->rk_broker_state_change_version;
	mtx_unlock(&rk->rk_broker_state_change_lock);
	return version;
}
| |
/**
 * @brief Wait at most \p timeout_ms for any state change for any broker.
 *        \p stored_version is the value previously returned by
 *        rd_kafka_brokers_get_state_version() prior to another API call
 *        that failed due to invalid state.
 *
 * Triggers:
 *   - broker state changes
 *   - broker transitioning from blocking to non-blocking
 *   - partition leader changes
 *   - group state changes
 *
 * @remark There is no guarantee that a state change actually took place.
 *
 * @returns 1 if a state change was signaled (maybe), else 0 (timeout)
 *
 * @locality any thread
 */
int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
					int timeout_ms) {
	int r;
	mtx_lock(&rk->rk_broker_state_change_lock);
	/* A version bump since the caller sampled it means a change
	 * already happened: return immediately without waiting. */
	if (stored_version != rk->rk_broker_state_change_version)
		r = 1;
	else
		r = cnd_timedwait_ms(&rk->rk_broker_state_change_cnd,
				     &rk->rk_broker_state_change_lock,
				     timeout_ms) == thrd_success;
	mtx_unlock(&rk->rk_broker_state_change_lock);
	return r;
}
| |
| |
/**
 * @brief Broadcast broker state change to listeners, if any.
 *        Bumps the state-change version and wakes all threads blocked in
 *        rd_kafka_brokers_wait_state_change().
 *
 * @locality any thread
 */
void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk) {
        rd_kafka_dbg(rk, GENERIC, "BROADCAST",
                     "Broadcasting state change");
        mtx_lock(&rk->rk_broker_state_change_lock);
        rk->rk_broker_state_change_version++;
        cnd_broadcast(&rk->rk_broker_state_change_cnd);
        mtx_unlock(&rk->rk_broker_state_change_lock);
}
| |
| |
/**
 * Returns a random broker (with refcnt increased) in state 'state'.
 * Uses Reservoir sampling.
 *
 * 'filter' is an optional callback used to filter out undesired brokers.
 * The filter function should return 1 to filter out a broker, or 0 to keep it
 * in the list of eligible brokers to return.
 * rd_kafka_broker_lock() is held during the filter callback.
 *
 * @returns a broker with refcount increased (caller must ..broker_destroy()),
 *          or NULL if no broker matched.
 *
 * Locks: rd_kafka_rdlock(rk) MUST be held.
 * Locality: any thread
 */
rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
                                        int (*filter) (rd_kafka_broker_t *rkb,
                                                       void *opaque),
                                        void *opaque) {
	rd_kafka_broker_t *rkb, *good = NULL;
        int cnt = 0;

	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
		rd_kafka_broker_lock(rkb);
		if ((int)rkb->rkb_state == state &&
                    (!filter || !filter(rkb, opaque))) {
                        /* Reservoir sampling: the i:th eligible broker
                         * replaces the current pick with probability 1/i,
                         * giving each eligible broker equal probability. */
                        if (cnt < 1 || rd_jitter(0, cnt) < 1) {
                                if (good)
                                        rd_kafka_broker_destroy(good);
                                rd_kafka_broker_keep(rkb);
                                good = rkb;
                        }
                        cnt += 1;
                }
		rd_kafka_broker_unlock(rkb);
	}

        return good;
}
| |
| |
/**
 * @brief Spend at most \p timeout_ms to acquire a usable (Up && non-blocking)
 *        broker.
 *
 * Retries on broker state changes until a broker is found or the
 * timeout expires.
 *
 * @returns A probably usable broker with increased refcount, or NULL on timeout
 * @locks rd_kafka_*lock() if !do_lock
 * @locality any
 */
rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk,
                                                int timeout_ms,
                                                int do_lock) {
	const rd_ts_t ts_end = rd_timeout_init(timeout_ms);

	while (1) {
		rd_kafka_broker_t *rkb;
		int remains;
		/* Sample the version before scanning so a change during
		 * the scan is not missed by the wait below. */
		int version = rd_kafka_brokers_get_state_version(rk);

                /* Try non-blocking (e.g., non-fetching) brokers first. */
                if (do_lock)
                        rd_kafka_rdlock(rk);
                rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP,
                                          rd_kafka_broker_filter_non_blocking,
                                          NULL);
                if (!rkb)
                        rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP,
                                                  NULL, NULL);
                if (do_lock)
                        rd_kafka_rdunlock(rk);

                if (rkb)
                        return rkb;

                remains = rd_timeout_remains(ts_end);
                if (rd_timeout_expired(remains))
                        return NULL;

                rd_kafka_brokers_wait_state_change(rk, version, remains);
	}

	return NULL; /* NOTREACHED: loop above only exits via return. */
}
| |
| |
| |
/**
 * Returns a broker in state `state`, preferring the one with
 * matching `broker_id`.
 * Uses Reservoir sampling among non-matching brokers as fallback.
 *
 * @returns a broker with refcount increased (caller must ..broker_destroy()),
 *          or NULL if no broker is in the wanted state.
 *
 * Locks: rd_kafka_rdlock(rk) MUST be held.
 * Locality: any thread
 */
rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id,
					   int state) {
	rd_kafka_broker_t *rkb, *good = NULL;
        int cnt = 0;

	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
		rd_kafka_broker_lock(rkb);
		if ((int)rkb->rkb_state == state) {
                        /* Exact broker-id match trumps any random pick:
                         * take it and stop scanning. */
                        if (broker_id != -1 && rkb->rkb_nodeid == broker_id) {
                                if (good)
                                        rd_kafka_broker_destroy(good);
                                rd_kafka_broker_keep(rkb);
                                good = rkb;
                                rd_kafka_broker_unlock(rkb);
                                break;
                        }
                        /* Reservoir sampling over eligible brokers. */
                        if (cnt < 1 || rd_jitter(0, cnt) < 1) {
                                if (good)
                                        rd_kafka_broker_destroy(good);
                                rd_kafka_broker_keep(rkb);
                                good = rkb;
                        }
                        cnt += 1;
                }
		rd_kafka_broker_unlock(rkb);
	}

        return good;
}
| |
| |
| |
| |
| |
| |
/**
 * Find a waitresp (rkbuf awaiting response) by the correlation id.
 *
 * On match the buffer is dequeued from the wait-response queue and its
 * rkbuf_ts_sent field is converted from send-timestamp to round-trip time.
 *
 * @returns the matching buffer (ownership passes to caller) or NULL.
 *
 * Locality: broker thread
 */
static rd_kafka_buf_t *rd_kafka_waitresp_find (rd_kafka_broker_t *rkb,
					       int32_t corrid) {
	rd_kafka_buf_t *rkbuf;
	rd_ts_t now = rd_clock();

	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

	TAILQ_FOREACH(rkbuf, &rkb->rkb_waitresps.rkbq_bufs, rkbuf_link)
		if (rkbuf->rkbuf_corrid == corrid) {
			/* Convert ts_sent to RTT */
			rkbuf->rkbuf_ts_sent = now - rkbuf->rkbuf_ts_sent;
			rd_avg_add(&rkb->rkb_avg_rtt, rkbuf->rkbuf_ts_sent);

                        /* NOTE(review): broadcast condition here is
                         * `== 1` while rd_kafka_broker_bufq_timeout_scan()
                         * uses `== 0` for the same counter decrement —
                         * confirm which post-decrement value should
                         * trigger the state-change wakeup. */
                        if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING &&
			    rd_atomic32_sub(&rkb->rkb_blocking_request_cnt,
					    1) == 1)
				rd_kafka_brokers_broadcast_state_change(
					rkb->rkb_rk);

			rd_kafka_bufq_deq(&rkb->rkb_waitresps, rkbuf);
			return rkbuf;
		}
	return NULL;
}
| |
| |
| |
| |
/**
 * Map a response message to a request (by correlation id) and trigger
 * the request's callback.
 *
 * Takes ownership of \p rkbuf (destroyed on unknown CorrId, otherwise
 * passed to the buffer callback).
 *
 * @returns 0 on success or -1 if no matching request was found.
 *
 * Locality: broker thread
 */
static int rd_kafka_req_response (rd_kafka_broker_t *rkb,
				  rd_kafka_buf_t *rkbuf) {
	rd_kafka_buf_t *req;

	rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));


	/* Find corresponding request message by correlation id */
	if (unlikely(!(req =
		       rd_kafka_waitresp_find(rkb,
					      rkbuf->rkbuf_reshdr.CorrId)))) {
		/* unknown response. probably due to request timeout */
                rd_atomic64_add(&rkb->rkb_c.rx_corrid_err, 1);
		rd_rkb_dbg(rkb, BROKER, "RESPONSE",
			   "Response for unknown CorrId %"PRId32" (timed out?)",
			   rkbuf->rkbuf_reshdr.CorrId);
                rd_kafka_buf_destroy(rkbuf);
                return -1;
	}

	rd_rkb_dbg(rkb, PROTOCOL, "RECV",
		   "Received %sResponse (v%hd, %"PRIusz" bytes, CorrId %"PRId32
		   ", rtt %.2fms)",
		   rd_kafka_ApiKey2str(req->rkbuf_reqhdr.ApiKey),
                   req->rkbuf_reqhdr.ApiVersion,
		   rkbuf->rkbuf_totlen, rkbuf->rkbuf_reshdr.CorrId,
		   (float)req->rkbuf_ts_sent / 1000.0f);

        /* Set up response reader slice starting past the response header */
        rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf,
                      RD_KAFKAP_RESHDR_SIZE,
                      rd_buf_len(&rkbuf->rkbuf_buf) - RD_KAFKAP_RESHDR_SIZE);

        /* Pair the response buffer with this broker (a reference is only
         * taken here, once the response is complete). */
        if (!rkbuf->rkbuf_rkb) {
                rkbuf->rkbuf_rkb = rkb;
                rd_kafka_broker_keep(rkbuf->rkbuf_rkb);
        } else
                rd_assert(rkbuf->rkbuf_rkb == rkb);

	/* Call callback. */
        rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, rkbuf, req);

	return 0;
}
| |
| |
| |
| |
/**
 * @brief Receive (part of) a response from the broker socket.
 *
 * Reads are split in two phases: first the fixed-size response header
 * (Size + CorrId) is read so the total message length is known, then the
 * remainder of the message is received into a contiguous buffer.
 * Both phases may require multiple calls (partial reads).
 *
 * @returns 1 if progress was made (possibly dispatching a complete
 *          response), 0 if more data is needed (EAGAIN/partial),
 *          -1 on error (the broker connection is failed).
 *
 * @locality broker thread
 */
int rd_kafka_recv (rd_kafka_broker_t *rkb) {
        rd_kafka_buf_t *rkbuf;
        ssize_t r;
        /* errstr is not set by buf_read errors, so default it here. */
        char errstr[512] = "Protocol parse failure";
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
        /* Used by the rd_kafka_buf_read_*() macros, which jump to
         * err_parse below on parse failure. */
        const int log_decode_errors = LOG_ERR;


        /* It is impossible to estimate the correct size of the response
         * so we split the read up in two parts: first we read the protocol
         * length and correlation id (i.e., the Response header), and then
         * when we know the full length of the response we allocate a new
         * buffer and call receive again.
         * All this in an async fashion (e.g., partial reads).
         */
        if (!(rkbuf = rkb->rkb_recv_buf)) {
                /* No receive in progress: create new buffer */

                rkbuf = rd_kafka_buf_new(2, RD_KAFKAP_RESHDR_SIZE);

                rkb->rkb_recv_buf = rkbuf;

                /* Set up buffer reader for the response header. */
                rd_buf_write_ensure(&rkbuf->rkbuf_buf,
                                    RD_KAFKAP_RESHDR_SIZE,
                                    RD_KAFKAP_RESHDR_SIZE);
        }

        rd_dassert(rd_buf_write_remains(&rkbuf->rkbuf_buf) > 0);

        r = rd_kafka_transport_recv(rkb->rkb_transport, &rkbuf->rkbuf_buf,
                                    errstr, sizeof(errstr));
        if (unlikely(r <= 0)) {
                if (r == 0)
                        return 0; /* EAGAIN */
                err = RD_KAFKA_RESP_ERR__TRANSPORT;
                rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
                goto err;
        }

        if (rkbuf->rkbuf_totlen == 0) {
                /* Packet length not known yet. */

                if (unlikely(rd_buf_write_pos(&rkbuf->rkbuf_buf) <
                             RD_KAFKAP_RESHDR_SIZE)) {
                        /* Need response header for packet length and corrid.
                         * Wait for more data. */
                        return 0;
                }

                rd_assert(!rkbuf->rkbuf_rkb);
                rkbuf->rkbuf_rkb = rkb; /* Protocol parsing code needs
                                         * the rkb for logging, but we dont
                                         * want to keep a reference to the
                                         * broker this early since that extra
                                         * refcount will mess with the broker's
                                         * refcount-based termination code. */

                /* Initialize reader */
                rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0,
                              RD_KAFKAP_RESHDR_SIZE);

                /* Read protocol header */
                rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reshdr.Size);
                rd_kafka_buf_read_i32(rkbuf, &rkbuf->rkbuf_reshdr.CorrId);

                rkbuf->rkbuf_rkb = NULL; /* Reset */

                rkbuf->rkbuf_totlen = rkbuf->rkbuf_reshdr.Size;

                /* Make sure message size is within tolerable limits. */
                if (rkbuf->rkbuf_totlen < 4/*CorrId*/ ||
                    rkbuf->rkbuf_totlen >
                    (size_t)rkb->rkb_rk->rk_conf.recv_max_msg_size) {
                        rd_snprintf(errstr, sizeof(errstr),
                                    "Invalid response size %"PRId32" (0..%i): "
                                    "increase receive.message.max.bytes",
                                    rkbuf->rkbuf_reshdr.Size,
                                    rkb->rkb_rk->rk_conf.recv_max_msg_size);
                        err = RD_KAFKA_RESP_ERR__BAD_MSG;
                        rd_atomic64_add(&rkb->rkb_c.rx_err, 1);
                        goto err;
                }

                rkbuf->rkbuf_totlen -= 4; /*CorrId*/

                if (rkbuf->rkbuf_totlen > 0) {
                        /* Allocate another buffer that fits all data (short of
                         * the common response header). We want all
                         * data to be in contigious memory. */

                        rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf,
                                                   rkbuf->rkbuf_totlen);
                }
        }

        if (rd_buf_write_pos(&rkbuf->rkbuf_buf) - RD_KAFKAP_RESHDR_SIZE ==
            rkbuf->rkbuf_totlen) {
                /* Message is complete, pass it on to the original requester. */
                rkb->rkb_recv_buf = NULL;
                rd_atomic64_add(&rkb->rkb_c.rx, 1);
                rd_atomic64_add(&rkb->rkb_c.rx_bytes,
                                rd_buf_write_pos(&rkbuf->rkbuf_buf));
                rd_kafka_req_response(rkb, rkbuf);
        }

        return 1;

 err_parse:
        err = rkbuf->rkbuf_err;
 err:
        /* Suppress the log for plain disconnects unless the user enabled
         * log.connection.close. */
        rd_kafka_broker_fail(rkb,
                             !rkb->rkb_rk->rk_conf.log_connection_close &&
                             !strcmp(errstr, "Disconnected") ?
                             LOG_DEBUG : LOG_ERR, err,
                             "Receive failed: %s", errstr);
        return -1;
}
| |
| |
| /** |
| * Linux version of socket_cb providing racefree CLOEXEC. |
| */ |
| int rd_kafka_socket_cb_linux (int domain, int type, int protocol, |
| void *opaque) { |
| #ifdef SOCK_CLOEXEC |
| return socket(domain, type | SOCK_CLOEXEC, protocol); |
| #else |
| return rd_kafka_socket_cb_generic(domain, type, protocol, opaque); |
| #endif |
| } |
| |
| /** |
| * Fallback version of socket_cb NOT providing racefree CLOEXEC, |
| * but setting CLOEXEC after socket creation (if FD_CLOEXEC is defined). |
| */ |
| int rd_kafka_socket_cb_generic (int domain, int type, int protocol, |
| void *opaque) { |
| int s; |
| int on = 1; |
| s = (int)socket(domain, type, protocol); |
| if (s == -1) |
| return -1; |
| #ifdef FD_CLOEXEC |
| fcntl(s, F_SETFD, FD_CLOEXEC, &on); |
| #endif |
| return s; |
| } |
| |
| |
| /** |
| * Initiate asynchronous connection attempt to the next address |
| * in the broker's address list. |
| * While the connect is asynchronous and its IO served in the CONNECT state, |
| * the initial name resolve is blocking. |
| * |
| * Returns -1 on error, else 0. |
| */ |
| static int rd_kafka_broker_connect (rd_kafka_broker_t *rkb) { |
| const rd_sockaddr_inx_t *sinx; |
| char errstr[512]; |
| |
| rd_rkb_dbg(rkb, BROKER, "CONNECT", |
| "broker in state %s connecting", |
| rd_kafka_broker_state_names[rkb->rkb_state]); |
| |
| if (rd_kafka_broker_resolve(rkb) == -1) |
| return -1; |
| |
| sinx = rd_sockaddr_list_next(rkb->rkb_rsal); |
| |
| rd_kafka_assert(rkb->rkb_rk, !rkb->rkb_transport); |
| |
| if (!(rkb->rkb_transport = rd_kafka_transport_connect(rkb, sinx, |
| errstr, sizeof(errstr)))) { |
| /* Avoid duplicate log messages */ |
| if (rkb->rkb_err.err == errno) |
| rd_kafka_broker_fail(rkb, LOG_DEBUG, |
| RD_KAFKA_RESP_ERR__FAIL, NULL); |
| else |
| rd_kafka_broker_fail(rkb, LOG_ERR, |
| RD_KAFKA_RESP_ERR__TRANSPORT, |
| "%s", errstr); |
| return -1; |
| } |
| |
| rd_kafka_broker_lock(rkb); |
| rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_CONNECT); |
| rd_kafka_broker_unlock(rkb); |
| |
| return 0; |
| } |
| |
| |
| /** |
| * @brief Call when connection is ready to transition to fully functional |
| * UP state. |
| * |
| * @locality Broker thread |
| */ |
| void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb) { |
| |
| rkb->rkb_max_inflight = rkb->rkb_rk->rk_conf.max_inflight; |
| rkb->rkb_err.err = 0; |
| |
| rd_kafka_broker_lock(rkb); |
| rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP); |
| rd_kafka_broker_unlock(rkb); |
| |
| /* Request metadata (async): |
| * try locally known topics first and if there are none try |
| * getting just the broker list. */ |
| if (rd_kafka_metadata_refresh_known_topics(NULL, rkb, 0/*dont force*/, |
| "connected") == |
| RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) |
| rd_kafka_metadata_refresh_brokers(NULL, rkb, "connected"); |
| } |
| |
| |
| |
| static void rd_kafka_broker_connect_auth (rd_kafka_broker_t *rkb); |
| |
| |
| /** |
| * @brief Parses and handles SaslMechanism response, transitions |
| * the broker state. |
| * |
| */ |
| static void |
| rd_kafka_broker_handle_SaslHandshake (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| const int log_decode_errors = LOG_ERR; |
| int32_t MechCnt; |
| int16_t ErrorCode; |
| int i = 0; |
| char *mechs = "(n/a)"; |
| size_t msz, mof = 0; |
| |
| if (err == RD_KAFKA_RESP_ERR__DESTROY) |
| return; |
| |
| if (err) |
| goto err; |
| |
| rd_kafka_buf_read_i16(rkbuf, &ErrorCode); |
| rd_kafka_buf_read_i32(rkbuf, &MechCnt); |
| |
| /* Build a CSV string of supported mechanisms. */ |
| msz = RD_MIN(511, MechCnt * 32); |
| mechs = rd_alloca(msz); |
| *mechs = '\0'; |
| |
| for (i = 0 ; i < MechCnt ; i++) { |
| rd_kafkap_str_t mech; |
| rd_kafka_buf_read_str(rkbuf, &mech); |
| |
| mof += rd_snprintf(mechs+mof, msz-mof, "%s%.*s", |
| i ? ",":"", RD_KAFKAP_STR_PR(&mech)); |
| |
| if (mof >= msz) |
| break; |
| } |
| |
| rd_rkb_dbg(rkb, |
| PROTOCOL | RD_KAFKA_DBG_SECURITY | RD_KAFKA_DBG_BROKER, |
| "SASLMECHS", "Broker supported SASL mechanisms: %s", |
| mechs); |
| |
| if (ErrorCode) { |
| err = ErrorCode; |
| goto err; |
| } |
| |
| /* Circle back to connect_auth() to start proper AUTH state. */ |
| rd_kafka_broker_connect_auth(rkb); |
| return; |
| |
| err_parse: |
| err = rkbuf->rkbuf_err; |
| err: |
| rd_kafka_broker_fail(rkb, LOG_ERR, |
| RD_KAFKA_RESP_ERR__AUTHENTICATION, |
| "SASL %s mechanism handshake failed: %s: " |
| "broker's supported mechanisms: %s", |
| rkb->rkb_rk->rk_conf.sasl.mechanisms, |
| rd_kafka_err2str(err), mechs); |
| } |
| |
| |
| /** |
| * @brief Transition state to: |
| * - AUTH_HANDSHAKE (if SASL is configured and handshakes supported) |
| * - AUTH (if SASL is configured but no handshake is required or |
| * not supported, or has already taken place.) |
| * - UP (if SASL is not configured) |
| */ |
| static void rd_kafka_broker_connect_auth (rd_kafka_broker_t *rkb) { |
| |
| if ((rkb->rkb_proto == RD_KAFKA_PROTO_SASL_PLAINTEXT || |
| rkb->rkb_proto == RD_KAFKA_PROTO_SASL_SSL)) { |
| |
| rd_rkb_dbg(rkb, SECURITY | RD_KAFKA_DBG_BROKER, "AUTH", |
| "Auth in state %s (handshake %ssupported)", |
| rd_kafka_broker_state_names[rkb->rkb_state], |
| (rkb->rkb_features&RD_KAFKA_FEATURE_SASL_HANDSHAKE) |
| ? "" : "not "); |
| |
| /* Broker >= 0.10.0: send request to select mechanism */ |
| if (rkb->rkb_state != RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE && |
| (rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE)) { |
| |
| rd_kafka_broker_lock(rkb); |
| rd_kafka_broker_set_state( |
| rkb, RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE); |
| rd_kafka_broker_unlock(rkb); |
| |
| rd_kafka_SaslHandshakeRequest( |
| rkb, rkb->rkb_rk->rk_conf.sasl.mechanisms, |
| RD_KAFKA_NO_REPLYQ, |
| rd_kafka_broker_handle_SaslHandshake, |
| NULL, 1 /* flash */); |
| |
| } else { |
| /* Either Handshake succeeded (protocol selected) |
| * or Handshakes were not supported. |
| * In both cases continue with authentication. */ |
| char sasl_errstr[512]; |
| |
| rd_kafka_broker_lock(rkb); |
| rd_kafka_broker_set_state(rkb, |
| RD_KAFKA_BROKER_STATE_AUTH); |
| rd_kafka_broker_unlock(rkb); |
| |
| if (rd_kafka_sasl_client_new( |
| rkb->rkb_transport, sasl_errstr, |
| sizeof(sasl_errstr)) == -1) { |
| errno = EINVAL; |
| rd_kafka_broker_fail( |
| rkb, LOG_ERR, |
| RD_KAFKA_RESP_ERR__AUTHENTICATION, |
| "Failed to initialize " |
| "SASL authentication: %s", |
| sasl_errstr); |
| return; |
| } |
| |
| /* Enter non-Kafka-protocol-framed SASL communication |
| * state handled in rdkafka_sasl.c */ |
| rd_kafka_broker_lock(rkb); |
| rd_kafka_broker_set_state(rkb, |
| RD_KAFKA_BROKER_STATE_AUTH); |
| rd_kafka_broker_unlock(rkb); |
| } |
| |
| return; |
| } |
| |
| /* No authentication required. */ |
| rd_kafka_broker_connect_up(rkb); |
| } |
| |
| |
| /** |
| * @brief Specify API versions to use for this connection. |
| * |
| * @param apis is an allocated list of supported partitions. |
| * If NULL the default set will be used based on the |
| * \p broker.version.fallback property. |
| * @param api_cnt number of elements in \p apis |
| * |
| * @remark \p rkb takes ownership of \p apis. |
| * |
| * @locality Broker thread |
| * @locks none |
| */ |
| static void rd_kafka_broker_set_api_versions (rd_kafka_broker_t *rkb, |
| struct rd_kafka_ApiVersion *apis, |
| size_t api_cnt) { |
| |
| rd_kafka_broker_lock(rkb); |
| |
| if (rkb->rkb_ApiVersions) |
| rd_free(rkb->rkb_ApiVersions); |
| |
| |
| if (!apis) { |
| rd_rkb_dbg(rkb, PROTOCOL | RD_KAFKA_DBG_BROKER, "APIVERSION", |
| "Using (configuration fallback) %s protocol features", |
| rkb->rkb_rk->rk_conf.broker_version_fallback); |
| |
| |
| rd_kafka_get_legacy_ApiVersions(rkb->rkb_rk->rk_conf. |
| broker_version_fallback, |
| &apis, &api_cnt, |
| rkb->rkb_rk->rk_conf. |
| broker_version_fallback); |
| |
| /* Make a copy to store on broker. */ |
| rd_kafka_ApiVersions_copy(apis, api_cnt, &apis, &api_cnt); |
| } |
| |
| rkb->rkb_ApiVersions = apis; |
| rkb->rkb_ApiVersions_cnt = api_cnt; |
| |
| /* Update feature set based on supported broker APIs. */ |
| rd_kafka_broker_features_set(rkb, |
| rd_kafka_features_check(rkb, apis, api_cnt)); |
| |
| rd_kafka_broker_unlock(rkb); |
| } |
| |
| |
| /** |
| * Handler for ApiVersion response. |
| */ |
| static void |
| rd_kafka_broker_handle_ApiVersion (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *rkbuf, |
| rd_kafka_buf_t *request, void *opaque) { |
| struct rd_kafka_ApiVersion *apis; |
| size_t api_cnt; |
| |
| if (err == RD_KAFKA_RESP_ERR__DESTROY) |
| return; |
| |
| err = rd_kafka_handle_ApiVersion(rk, rkb, err, rkbuf, request, |
| &apis, &api_cnt); |
| |
| if (err) { |
| rd_kafka_broker_fail(rkb, LOG_DEBUG, |
| RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED, |
| "ApiVersionRequest failed: %s: " |
| "probably due to old broker version", |
| rd_kafka_err2str(err)); |
| return; |
| } |
| |
| rd_kafka_broker_set_api_versions(rkb, apis, api_cnt); |
| |
| rd_kafka_broker_connect_auth(rkb); |
| } |
| |
| |
| /** |
| * Call when asynchronous connection attempt completes, either succesfully |
| * (if errstr is NULL) or fails. |
| * |
| * Locality: broker thread |
| */ |
| void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr) { |
| |
| if (errstr) { |
| /* Connect failed */ |
| rd_kafka_broker_fail(rkb, |
| errno != 0 && rkb->rkb_err.err == errno ? |
| LOG_DEBUG : LOG_ERR, |
| RD_KAFKA_RESP_ERR__TRANSPORT, |
| "%s", errstr); |
| return; |
| } |
| |
| /* Connect succeeded */ |
| rkb->rkb_connid++; |
| rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL, |
| "CONNECTED", "Connected (#%d)", rkb->rkb_connid); |
| rkb->rkb_err.err = 0; |
| rkb->rkb_max_inflight = 1; /* Hold back other requests until |
| * ApiVersion, SaslHandshake, etc |
| * are done. */ |
| |
| rd_kafka_transport_poll_set(rkb->rkb_transport, POLLIN); |
| |
| if (rkb->rkb_rk->rk_conf.api_version_request && |
| rd_interval_immediate(&rkb->rkb_ApiVersion_fail_intvl, 0, 0) > 0) { |
| /* Use ApiVersion to query broker for supported API versions. */ |
| rd_kafka_broker_feature_enable(rkb, RD_KAFKA_FEATURE_APIVERSION); |
| } |
| |
| |
| if (rkb->rkb_features & RD_KAFKA_FEATURE_APIVERSION) { |
| /* Query broker for supported API versions. |
| * This may fail with a disconnect on non-supporting brokers |
| * so hold off any other requests until we get a response, |
| * and if the connection is torn down we disable this feature. */ |
| rd_kafka_broker_lock(rkb); |
| rd_kafka_broker_set_state(rkb,RD_KAFKA_BROKER_STATE_APIVERSION_QUERY); |
| rd_kafka_broker_unlock(rkb); |
| |
| rd_kafka_ApiVersionRequest( |
| rkb, RD_KAFKA_NO_REPLYQ, |
| rd_kafka_broker_handle_ApiVersion, NULL, |
| 1 /*Flash message: prepend to transmit queue*/); |
| } else { |
| |
| /* Use configured broker.version.fallback to |
| * figure out API versions */ |
| rd_kafka_broker_set_api_versions(rkb, NULL, 0); |
| |
| /* Authenticate if necessary */ |
| rd_kafka_broker_connect_auth(rkb); |
| } |
| |
| } |
| |
| |
| |
| /** |
| * @brief Checks if the given API request+version is supported by the broker. |
| * @returns 1 if supported, else 0. |
| * @locality broker thread |
| * @locks none |
| */ |
| static RD_INLINE int |
| rd_kafka_broker_request_supported (rd_kafka_broker_t *rkb, |
| rd_kafka_buf_t *rkbuf) { |
| struct rd_kafka_ApiVersion skel = { |
| .ApiKey = rkbuf->rkbuf_reqhdr.ApiKey |
| }; |
| struct rd_kafka_ApiVersion *ret; |
| |
| if (unlikely(rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_ApiVersion)) |
| return 1; /* ApiVersion requests are used to detect |
| * the supported API versions, so should always |
| * be allowed through. */ |
| |
| /* First try feature flags, if any, which may cover a larger |
| * set of APIs. */ |
| if (rkbuf->rkbuf_features) |
| return (rkb->rkb_features & rkbuf->rkbuf_features) == |
| rkbuf->rkbuf_features; |
| |
| /* Then try the ApiVersion map. */ |
| ret = bsearch(&skel, rkb->rkb_ApiVersions, rkb->rkb_ApiVersions_cnt, |
| sizeof(*rkb->rkb_ApiVersions), |
| rd_kafka_ApiVersion_key_cmp); |
| if (!ret) |
| return 0; |
| |
| return ret->MinVer <= rkbuf->rkbuf_reqhdr.ApiVersion && |
| rkbuf->rkbuf_reqhdr.ApiVersion <= ret->MaxVer; |
| } |
| |
| |
| /** |
| * Send queued messages to broker |
| * |
| * Locality: io thread |
| */ |
| int rd_kafka_send (rd_kafka_broker_t *rkb) { |
| rd_kafka_buf_t *rkbuf; |
| unsigned int cnt = 0; |
| |
| rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread)); |
| |
| while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP && |
| rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight && |
| (rkbuf = TAILQ_FIRST(&rkb->rkb_outbufs.rkbq_bufs))) { |
| ssize_t r; |
| size_t pre_of = rd_slice_offset(&rkbuf->rkbuf_reader); |
| |
| /* Check for broker support */ |
| if (unlikely(!rd_kafka_broker_request_supported(rkb, rkbuf))) { |
| rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf); |
| rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_PROTOCOL, |
| "UNSUPPORTED", |
| "Failing %sResponse " |
| "(v%hd, %"PRIusz" bytes, CorrId %"PRId32"): " |
| "request not supported by broker " |
| "(missing api.version.request or " |
| "incorrect broker.version.fallback config?)", |
| rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr. |
| ApiKey), |
| rkbuf->rkbuf_reqhdr.ApiVersion, |
| rkbuf->rkbuf_totlen, |
| rkbuf->rkbuf_reshdr.CorrId); |
| rd_kafka_buf_callback( |
| rkb->rkb_rk, rkb, |
| RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, |
| NULL, rkbuf); |
| continue; |
| } |
| |
| /* Set CorrId header field, unless this is the latter part |
| * of a partial send in which case the corrid has already |
| * been set. |
| * Due to how SSL_write() will accept a buffer but still |
| * return 0 in some cases we can't rely on the buffer offset |
| * but need to use corrid to check this. SSL_write() expects |
| * us to send the same buffer again when 0 is returned. |
| */ |
| if (rkbuf->rkbuf_corrid == 0 || |
| rkbuf->rkbuf_connid != rkb->rkb_connid) { |
| rd_assert(rd_slice_offset(&rkbuf->rkbuf_reader) == 0); |
| rkbuf->rkbuf_corrid = ++rkb->rkb_corrid; |
| rd_kafka_buf_update_i32(rkbuf, 4+2+2, |
| rkbuf->rkbuf_corrid); |
| rkbuf->rkbuf_connid = rkb->rkb_connid; |
| } else if (pre_of > RD_KAFKAP_REQHDR_SIZE) { |
| rd_kafka_assert(NULL, |
| rkbuf->rkbuf_connid == rkb->rkb_connid); |
| } |
| |
| if (0) { |
| rd_rkb_dbg(rkb, PROTOCOL, "SEND", |
| "Send %s corrid %"PRId32" at " |
| "offset %"PRIusz"/%"PRIusz, |
| rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr. |
| ApiKey), |
| rkbuf->rkbuf_corrid, |
| pre_of, rd_slice_size(&rkbuf->rkbuf_reader)); |
| } |
| |
| if ((r = rd_kafka_broker_send(rkb, &rkbuf->rkbuf_reader)) == -1) |
| return -1; |
| |
| /* Partial send? Continue next time. */ |
| if (rd_slice_remains(&rkbuf->rkbuf_reader) > 0) { |
| rd_rkb_dbg(rkb, PROTOCOL, "SEND", |
| "Sent partial %sRequest " |
| "(v%hd, " |
| "%"PRIdsz"+%"PRIdsz"/%"PRIusz" bytes, " |
| "CorrId %"PRId32")", |
| rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr. |
| ApiKey), |
| rkbuf->rkbuf_reqhdr.ApiVersion, |
| (ssize_t)pre_of, r, |
| rd_slice_size(&rkbuf->rkbuf_reader), |
| rkbuf->rkbuf_corrid); |
| return 0; |
| } |
| |
| rd_rkb_dbg(rkb, PROTOCOL, "SEND", |
| "Sent %sRequest (v%hd, %"PRIusz" bytes @ %"PRIusz", " |
| "CorrId %"PRId32")", |
| rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey), |
| rkbuf->rkbuf_reqhdr.ApiVersion, |
| rd_slice_size(&rkbuf->rkbuf_reader), |
| pre_of, rkbuf->rkbuf_corrid); |
| |
| /* Entire buffer sent, unlink from outbuf */ |
| rd_kafka_bufq_deq(&rkb->rkb_outbufs, rkbuf); |
| |
| /* Store time for RTT calculation */ |
| rkbuf->rkbuf_ts_sent = rd_clock(); |
| |
| if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_BLOCKING && |
| rd_atomic32_add(&rkb->rkb_blocking_request_cnt, 1) == 1) |
| rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk); |
| |
| /* Put buffer on response wait list unless we are not |
| * expecting a response (required_acks=0). */ |
| if (!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_NO_RESPONSE)) |
| rd_kafka_bufq_enq(&rkb->rkb_waitresps, rkbuf); |
| else { /* Call buffer callback for delivery report. */ |
| rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf); |
| } |
| |
| cnt++; |
| } |
| |
| return cnt; |
| } |
| |
| |
| /** |
| * Add 'rkbuf' to broker 'rkb's retry queue. |
| */ |
| void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) { |
| |
| /* Restore original replyq since replyq.q will have been NULLed |
| * by buf_callback()/replyq_enq(). */ |
| if (!rkbuf->rkbuf_replyq.q && rkbuf->rkbuf_orig_replyq.q) { |
| rkbuf->rkbuf_replyq = rkbuf->rkbuf_orig_replyq; |
| rd_kafka_replyq_clear(&rkbuf->rkbuf_orig_replyq); |
| } |
| |
| /* If called from another thread than rkb's broker thread |
| * enqueue the buffer on the broker's op queue. */ |
| if (!thrd_is_current(rkb->rkb_thread)) { |
| rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_XMIT_RETRY); |
| rko->rko_u.xbuf.rkbuf = rkbuf; |
| rd_kafka_q_enq(rkb->rkb_ops, rko); |
| return; |
| } |
| |
| rd_rkb_dbg(rkb, PROTOCOL, "RETRY", |
| "Retrying %sRequest (v%hd, %"PRIusz" bytes, retry %d/%d)", |
| rd_kafka_ApiKey2str(rkbuf->rkbuf_reqhdr.ApiKey), |
| rkbuf->rkbuf_reqhdr.ApiVersion, |
| rd_slice_size(&rkbuf->rkbuf_reader), |
| rkbuf->rkbuf_retries, rkb->rkb_rk->rk_conf.max_retries); |
| |
| rd_atomic64_add(&rkb->rkb_c.tx_retries, 1); |
| |
| rkbuf->rkbuf_ts_retry = rd_clock() + |
| (rkb->rkb_rk->rk_conf.retry_backoff_ms * 1000); |
| /* Reset send offset */ |
| rd_slice_seek(&rkbuf->rkbuf_reader, 0); |
| rkbuf->rkbuf_corrid = 0; |
| |
| rd_kafka_bufq_enq(&rkb->rkb_retrybufs, rkbuf); |
| } |
| |
| |
| /** |
| * Move buffers that have expired their retry backoff time from the |
| * retry queue to the outbuf. |
| */ |
| static void rd_kafka_broker_retry_bufs_move (rd_kafka_broker_t *rkb) { |
| rd_ts_t now = rd_clock(); |
| rd_kafka_buf_t *rkbuf; |
| |
| while ((rkbuf = TAILQ_FIRST(&rkb->rkb_retrybufs.rkbq_bufs))) { |
| if (rkbuf->rkbuf_ts_retry > now) |
| break; |
| |
| rd_kafka_bufq_deq(&rkb->rkb_retrybufs, rkbuf); |
| |
| rd_kafka_broker_buf_enq0(rkb, rkbuf, 0/*tail*/); |
| } |
| } |
| |
| |
| /** |
| * Propagate delivery report for entire message queue. |
| */ |
| void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt, |
| rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err) { |
| rd_kafka_t *rk = rkt->rkt_rk; |
| |
| if (unlikely(rd_kafka_msgq_len(rkmq) == 0)) |
| return; |
| |
| /* Call on_acknowledgement() interceptors */ |
| rd_kafka_interceptors_on_acknowledgement_queue(rk, rkmq); |
| |
| if ((rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR) && |
| (!rk->rk_conf.dr_err_only || err)) { |
| /* Pass all messages to application thread in one op. */ |
| rd_kafka_op_t *rko; |
| |
| rko = rd_kafka_op_new(RD_KAFKA_OP_DR); |
| rko->rko_err = err; |
| rko->rko_u.dr.s_rkt = rd_kafka_topic_keep(rkt); |
| rd_kafka_msgq_init(&rko->rko_u.dr.msgq); |
| |
| /* Move all messages to op's msgq */ |
| rd_kafka_msgq_move(&rko->rko_u.dr.msgq, rkmq); |
| |
| rd_kafka_q_enq(rk->rk_rep, rko); |
| |
| } else { |
| /* No delivery report callback. */ |
| |
| /* Destroy the messages right away. */ |
| rd_kafka_msgq_purge(rk, rkmq); |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| /** |
| * @brief Map and assign existing partitions to this broker using |
| * the leader-id. |
| * |
| * @locks none |
| * @locality any |
| */ |
| static void rd_kafka_broker_map_partitions (rd_kafka_broker_t *rkb) { |
| rd_kafka_t *rk = rkb->rkb_rk; |
| rd_kafka_itopic_t *rkt; |
| int cnt = 0; |
| |
| if (rkb->rkb_nodeid == -1) |
| return; |
| |
| rd_kafka_rdlock(rk); |
| TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) { |
| int i; |
| |
| rd_kafka_topic_wrlock(rkt); |
| for (i = 0 ; i < rkt->rkt_partition_cnt ; i++) { |
| shptr_rd_kafka_toppar_t *s_rktp = rkt->rkt_p[i]; |
| rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp); |
| |
| /* Only map unassigned partitions matching this broker*/ |
| rd_kafka_toppar_lock(rktp); |
| if (rktp->rktp_leader_id == rkb->rkb_nodeid && |
| !(rktp->rktp_leader && rktp->rktp_next_leader)) { |
| rd_kafka_toppar_leader_update( |
| rktp, rktp->rktp_leader_id, rkb); |
| cnt++; |
| } |
| rd_kafka_toppar_unlock(rktp); |
| } |
| rd_kafka_topic_wrunlock(rkt); |
| } |
| rd_kafka_rdunlock(rk); |
| |
| rd_rkb_dbg(rkb, TOPIC|RD_KAFKA_DBG_BROKER, "LEADER", |
| "Mapped %d partition(s) to broker", cnt); |
| } |
| |
| |
| /** |
| * @brief Broker id comparator |
| */ |
| static int rd_kafka_broker_cmp_by_id (const void *_a, const void *_b) { |
| const rd_kafka_broker_t *a = _a, *b = _b; |
| return a->rkb_nodeid - b->rkb_nodeid; |
| } |
| |
| |
| |
| /** |
| * @brief Serve a broker op (an op posted by another thread to be handled by |
| * this broker's thread). |
| * |
| * @returns 0 if calling op loop should break out, else 1 to continue. |
| * @locality broker thread |
| * @locks none |
| */ |
| static int rd_kafka_broker_op_serve (rd_kafka_broker_t *rkb, |
| rd_kafka_op_t *rko) { |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_toppar_t *rktp; |
| int ret = 1; |
| |
| rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread)); |
| |
| switch (rko->rko_type) |
| { |
| case RD_KAFKA_OP_NODE_UPDATE: |
| { |
| enum { |
| _UPD_NAME = 0x1, |
| _UPD_ID = 0x2 |
| } updated = 0; |
| char brokername[RD_KAFKA_NODENAME_SIZE]; |
| |
| /* Need kafka_wrlock for updating rk_broker_by_id */ |
| rd_kafka_wrlock(rkb->rkb_rk); |
| rd_kafka_broker_lock(rkb); |
| |
| if (strcmp(rkb->rkb_nodename, |
| rko->rko_u.node.nodename)) { |
| rd_rkb_dbg(rkb, BROKER, "UPDATE", |
| "Nodename changed from %s to %s", |
| rkb->rkb_nodename, |
| rko->rko_u.node.nodename); |
| strncpy(rkb->rkb_nodename, |
| rko->rko_u.node.nodename, |
| sizeof(rkb->rkb_nodename)-1); |
| updated |= _UPD_NAME; |
| } |
| |
| if (rko->rko_u.node.nodeid != -1 && |
| rko->rko_u.node.nodeid != rkb->rkb_nodeid) { |
| int32_t old_nodeid = rkb->rkb_nodeid; |
| rd_rkb_dbg(rkb, BROKER, "UPDATE", |
| "NodeId changed from %"PRId32" to %"PRId32, |
| rkb->rkb_nodeid, |
| rko->rko_u.node.nodeid); |
| |
| rkb->rkb_nodeid = rko->rko_u.node.nodeid; |
| |
| /* Update broker_by_id sorted list */ |
| if (old_nodeid == -1) |
| rd_list_add(&rkb->rkb_rk->rk_broker_by_id, rkb); |
| rd_list_sort(&rkb->rkb_rk->rk_broker_by_id, |
| rd_kafka_broker_cmp_by_id); |
| |
| updated |= _UPD_ID; |
| } |
| |
| rd_kafka_mk_brokername(brokername, sizeof(brokername), |
| rkb->rkb_proto, |
| rkb->rkb_nodename, rkb->rkb_nodeid, |
| RD_KAFKA_LEARNED); |
| if (strcmp(rkb->rkb_name, brokername)) { |
| /* Udate the name copy used for logging. */ |
| mtx_lock(&rkb->rkb_logname_lock); |
| rd_free(rkb->rkb_logname); |
| rkb->rkb_logname = rd_strdup(brokername); |
| mtx_unlock(&rkb->rkb_logname_lock); |
| |
| rd_rkb_dbg(rkb, BROKER, "UPDATE", |
| "Name changed from %s to %s", |
| rkb->rkb_name, brokername); |
| strncpy(rkb->rkb_name, brokername, |
| sizeof(rkb->rkb_name)-1); |
| } |
| rd_kafka_broker_unlock(rkb); |
| rd_kafka_wrunlock(rkb->rkb_rk); |
| |
| if (updated & _UPD_NAME) |
| rd_kafka_broker_fail(rkb, LOG_NOTICE, |
| RD_KAFKA_RESP_ERR__NODE_UPDATE, |
| "Broker hostname updated"); |
| else if (updated & _UPD_ID) { |
| /* Map existing partitions to this broker. */ |
| rd_kafka_broker_map_partitions(rkb); |
| |
| /* If broker is currently in state up we need |
| * to trigger a state change so it exits its |
| * state&type based .._serve() loop. */ |
| rd_kafka_broker_lock(rkb); |
| if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) |
| rd_kafka_broker_set_state( |
| rkb, RD_KAFKA_BROKER_STATE_UPDATE); |
| rd_kafka_broker_unlock(rkb); |
| } |
| break; |
| } |
| |
| case RD_KAFKA_OP_XMIT_BUF: |
| rd_kafka_broker_buf_enq2(rkb, rko->rko_u.xbuf.rkbuf); |
| rko->rko_u.xbuf.rkbuf = NULL; /* buffer now owned by broker */ |
| if (rko->rko_replyq.q) { |
| /* Op will be reused for forwarding response. */ |
| rko = NULL; |
| } |
| break; |
| |
| case RD_KAFKA_OP_XMIT_RETRY: |
| rd_kafka_broker_buf_retry(rkb, rko->rko_u.xbuf.rkbuf); |
| rko->rko_u.xbuf.rkbuf = NULL; |
| break; |
| |
| case RD_KAFKA_OP_PARTITION_JOIN: |
| /* |
| * Add partition to broker toppars |
| */ |
| rktp = rd_kafka_toppar_s2i(rko->rko_rktp); |
| rd_kafka_toppar_lock(rktp); |
| |
| /* Abort join if instance is terminating */ |
| if (rd_kafka_terminating(rkb->rkb_rk) || |
| (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE)) { |
| rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK", |
| "Topic %s [%"PRId32"]: not joining broker: " |
| "%s", |
| rktp->rktp_rkt->rkt_topic->str, |
| rktp->rktp_partition, |
| rd_kafka_terminating(rkb->rkb_rk) ? |
| "instance is terminating" : |
| "partition removed"); |
| |
| rd_kafka_broker_destroy(rktp->rktp_next_leader); |
| rktp->rktp_next_leader = NULL; |
| rd_kafka_toppar_unlock(rktp); |
| break; |
| } |
| |
| /* See if we are still the next leader */ |
| if (rktp->rktp_next_leader != rkb) { |
| rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK", |
| "Topic %s [%"PRId32"]: not joining broker " |
| "(next leader %s)", |
| rktp->rktp_rkt->rkt_topic->str, |
| rktp->rktp_partition, |
| rktp->rktp_next_leader ? |
| rd_kafka_broker_name(rktp->rktp_next_leader): |
| "(none)"); |
| |
| /* Need temporary refcount so we can safely unlock |
| * after q_enq(). */ |
| s_rktp = rd_kafka_toppar_keep(rktp); |
| |
| /* No, forward this op to the new next leader. */ |
| rd_kafka_q_enq(rktp->rktp_next_leader->rkb_ops, rko); |
| rko = NULL; |
| |
| rd_kafka_toppar_unlock(rktp); |
| rd_kafka_toppar_destroy(s_rktp); |
| |
| break; |
| } |
| |
| rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK", |
| "Topic %s [%"PRId32"]: joining broker (rktp %p)", |
| rktp->rktp_rkt->rkt_topic->str, |
| rktp->rktp_partition, rktp); |
| |
| rd_kafka_assert(NULL, rktp->rktp_s_for_rkb == NULL); |
| rktp->rktp_s_for_rkb = rd_kafka_toppar_keep(rktp); |
| rd_kafka_broker_lock(rkb); |
| TAILQ_INSERT_TAIL(&rkb->rkb_toppars, rktp, rktp_rkblink); |
| rkb->rkb_toppar_cnt++; |
| rd_kafka_broker_unlock(rkb); |
| rktp->rktp_leader = rkb; |
| rktp->rktp_msgq_wakeup_fd = rkb->rkb_toppar_wakeup_fd; |
| rd_kafka_broker_keep(rkb); |
| |
| rd_kafka_broker_destroy(rktp->rktp_next_leader); |
| rktp->rktp_next_leader = NULL; |
| |
| rd_kafka_toppar_unlock(rktp); |
| |
| rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk); |
| break; |
| |
| case RD_KAFKA_OP_PARTITION_LEAVE: |
| /* |
| * Remove partition from broker toppars |
| */ |
| rktp = rd_kafka_toppar_s2i(rko->rko_rktp); |
| |
| rd_kafka_toppar_lock(rktp); |
| |
| /* Multiple PARTITION_LEAVEs are possible during partition |
| * migration, make sure we're supposed to handle this one. */ |
| if (unlikely(rktp->rktp_leader != rkb)) { |
| rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK", |
| "Topic %s [%"PRId32"]: " |
| "ignoring PARTITION_LEAVE: " |
| "broker is not leader (%s)", |
| rktp->rktp_rkt->rkt_topic->str, |
| rktp->rktp_partition, |
| rktp->rktp_leader ? |
| rd_kafka_broker_name(rktp->rktp_leader) : |
| "none"); |
| rd_kafka_toppar_unlock(rktp); |
| break; |
| } |
| rd_kafka_toppar_unlock(rktp); |
| |
| /* Remove from fetcher list */ |
| rd_kafka_toppar_fetch_decide(rktp, rkb, 1/*force remove*/); |
| |
| rd_kafka_toppar_lock(rktp); |
| |
| rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK", |
| "Topic %s [%"PRId32"]: leaving broker " |
| "(%d messages in xmitq, next leader %s, rktp %p)", |
| rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, |
| rd_kafka_msgq_len(&rktp->rktp_xmit_msgq), |
| rktp->rktp_next_leader ? |
| rd_kafka_broker_name(rktp->rktp_next_leader) : |
| "(none)", rktp); |
| |
| /* Prepend xmitq(broker-local) messages to the msgq(global). |
| * There is no msgq_prepend() so we append msgq to xmitq |
| * and then move the queue altogether back over to msgq. */ |
| rd_kafka_msgq_concat(&rktp->rktp_xmit_msgq, |
| &rktp->rktp_msgq); |
| rd_kafka_msgq_move(&rktp->rktp_msgq, &rktp->rktp_xmit_msgq); |
| |
| rd_kafka_broker_lock(rkb); |
| TAILQ_REMOVE(&rkb->rkb_toppars, rktp, rktp_rkblink); |
| rkb->rkb_toppar_cnt--; |
| rd_kafka_broker_unlock(rkb); |
| rd_kafka_broker_destroy(rktp->rktp_leader); |
| rktp->rktp_msgq_wakeup_fd = -1; |
| rktp->rktp_leader = NULL; |
| |
| /* Need to hold on to a refcount past q_enq() and |
| * unlock() below */ |
| s_rktp = rktp->rktp_s_for_rkb; |
| rktp->rktp_s_for_rkb = NULL; |
| |
| if (rktp->rktp_next_leader) { |
| /* There is a next leader we need to migrate to. */ |
| rko->rko_type = RD_KAFKA_OP_PARTITION_JOIN; |
| rd_kafka_q_enq(rktp->rktp_next_leader->rkb_ops, rko); |
| rko = NULL; |
| } else { |
| rd_rkb_dbg(rkb, BROKER | RD_KAFKA_DBG_TOPIC, "TOPBRK", |
| "Topic %s [%"PRId32"]: no next leader, " |
| "failing %d message(s) in partition queue", |
| rktp->rktp_rkt->rkt_topic->str, |
| rktp->rktp_partition, |
| rd_kafka_msgq_len(&rktp->rktp_msgq)); |
| rd_kafka_assert(NULL, rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0); |
| rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq, |
| rd_kafka_terminating(rkb->rkb_rk) ? |
| RD_KAFKA_RESP_ERR__DESTROY : |
| RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION); |
| |
| } |
| |
| rd_kafka_toppar_unlock(rktp); |
| rd_kafka_toppar_destroy(s_rktp); |
| |
| rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk); |
| break; |
| |
| case RD_KAFKA_OP_TERMINATE: |
| /* nop: just a wake-up. */ |
| if (rkb->rkb_blocking_max_ms > 1) |
| rkb->rkb_blocking_max_ms = 1; /* Speed up termination*/ |
| rd_rkb_dbg(rkb, BROKER, "TERM", |
| "Received TERMINATE op in state %s: " |
| "%d refcnts, %d toppar(s), %d fetch toppar(s), " |
| "%d outbufs, %d waitresps, %d retrybufs", |
| rd_kafka_broker_state_names[rkb->rkb_state], |
| rd_refcnt_get(&rkb->rkb_refcnt), |
| rkb->rkb_toppar_cnt, rkb->rkb_fetch_toppar_cnt, |
| (int)rd_kafka_bufq_cnt(&rkb->rkb_outbufs), |
| (int)rd_kafka_bufq_cnt(&rkb->rkb_waitresps), |
| (int)rd_kafka_bufq_cnt(&rkb->rkb_retrybufs)); |
| ret = 0; |
| break; |
| |
| default: |
| rd_kafka_assert(rkb->rkb_rk, !*"unhandled op type"); |
| break; |
| } |
| |
| if (rko) |
| rd_kafka_op_destroy(rko); |
| |
| return ret; |
| } |
| |
| |
| /** |
| * @brief Serve broker ops and IOs. |
| * |
| * @param abs_timeout Maximum block time (absolute time). |
| * |
| * @locality broker thread |
| * @locks none |
| */ |
| static void rd_kafka_broker_serve (rd_kafka_broker_t *rkb, |
| rd_ts_t abs_timeout) { |
| rd_kafka_op_t *rko; |
| rd_ts_t now; |
| int initial_state = rkb->rkb_state; |
| int remains_ms = rd_timeout_remains(abs_timeout); |
| |
| /* Serve broker ops */ |
| while ((rko = rd_kafka_q_pop(rkb->rkb_ops, |
| !rkb->rkb_transport ? |
| remains_ms : RD_POLL_NOWAIT, |
| 0)) |
| && rd_kafka_broker_op_serve(rkb, rko)) |
| remains_ms = RD_POLL_NOWAIT; |
| |
| /* If the broker state changed in op_serve() we minimize |
| * the IO timeout since our caller might want to exit out of |
| * its loop on state change. */ |
| if (likely(rkb->rkb_transport != NULL)) { |
| int blocking_max_ms; |
| |
| if ((int)rkb->rkb_state != initial_state) |
| blocking_max_ms = 0; |
| else { |
| int remains_ms = rd_timeout_remains(abs_timeout); |
| if (remains_ms == RD_POLL_INFINITE || |
| remains_ms > rkb->rkb_blocking_max_ms) |
| remains_ms = rkb->rkb_blocking_max_ms; |
| blocking_max_ms = remains_ms; |
| } |
| |
| /* Serve IO events */ |
| rd_kafka_transport_io_serve(rkb->rkb_transport, |
| blocking_max_ms); |
| } |
| |
| /* Scan wait-response queue for timeouts. */ |
| now = rd_clock(); |
| if (rd_interval(&rkb->rkb_timeout_scan_intvl, 1000000, now) > 0) |
| rd_kafka_broker_timeout_scan(rkb, now); |
| } |
| |
| |
| /** |
| * @brief Serve the toppar's assigned to this broker. |
| * |
| * @returns the minimum Fetch backoff time (abs timestamp) for the |
| * partitions to fetch. |
| * |
| * @locality broker thread |
| */ |
| static rd_ts_t rd_kafka_broker_toppars_serve (rd_kafka_broker_t *rkb) { |
| rd_kafka_toppar_t *rktp, *rktp_tmp; |
| rd_ts_t min_backoff = RD_TS_MAX; |
| |
| TAILQ_FOREACH_SAFE(rktp, &rkb->rkb_toppars, rktp_rkblink, rktp_tmp) { |
| rd_ts_t backoff; |
| |
| /* Serve toppar to update desired rktp state */ |
| backoff = rd_kafka_broker_consumer_toppar_serve(rkb, rktp); |
| if (backoff < min_backoff) |
| min_backoff = backoff; |
| } |
| |
| return min_backoff; |
| } |
| |
| |
| /** |
| * Idle function for unassigned brokers |
| * If \p timeout_ms is not RD_POLL_INFINITE the serve loop will be exited |
| * regardless of state after this long (approximately). |
| */ |
| static void rd_kafka_broker_ua_idle (rd_kafka_broker_t *rkb, int timeout_ms) { |
| int initial_state = rkb->rkb_state; |
| rd_ts_t abs_timeout; |
| |
| if (rd_kafka_terminating(rkb->rkb_rk)) |
| timeout_ms = 1; |
| else if (timeout_ms == RD_POLL_INFINITE) |
| timeout_ms = rkb->rkb_blocking_max_ms; |
| |
| abs_timeout = rd_timeout_init(timeout_ms); |
| |
| /* Since ua_idle is used during connection setup |
| * in state ..BROKER_STATE_CONNECT we only run this loop |
| * as long as the state remains the same as the initial, on a state |
| * change - most likely to UP, a correct serve() function |
| * should be used instead. */ |
| while (!rd_kafka_broker_terminating(rkb) && |
| (int)rkb->rkb_state == initial_state && |
| !rd_timeout_expired(rd_timeout_remains(abs_timeout))) { |
| |
| rd_kafka_broker_toppars_serve(rkb); |
| rd_kafka_broker_serve(rkb, abs_timeout); |
| } |
| } |
| |
| |
| /** |
| * @brief Serve a toppar for producing. |
| * |
| * @param next_wakeup will be updated to when the next wake-up/attempt is |
| * desired, only lower (sooner) values will be set. |
| * |
| * Locks: toppar_lock(rktp) MUST be held. |
| * Returns the number of messages produced. |
| */ |
| static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb, |
| rd_kafka_toppar_t *rktp, |
| int do_timeout_scan, |
| rd_ts_t now, |
| rd_ts_t *next_wakeup) { |
| int cnt = 0; |
| int r; |
| |
| rd_rkb_dbg(rkb, QUEUE, "TOPPAR", |
| "%.*s [%"PRId32"] %i+%i msgs", |
| RD_KAFKAP_STR_PR(rktp->rktp_rkt-> |
| rkt_topic), |
| rktp->rktp_partition, |
| rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt), |
| rd_atomic32_get(&rktp->rktp_xmit_msgq. |
| rkmq_msg_cnt)); |
| |
| if (rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt) > 0) |
| rd_kafka_msgq_concat(&rktp->rktp_xmit_msgq, &rktp->rktp_msgq); |
| |
| /* Timeout scan */ |
| if (unlikely(do_timeout_scan)) { |
| rd_kafka_msgq_t timedout = RD_KAFKA_MSGQ_INITIALIZER(timedout); |
| |
| if (rd_kafka_msgq_age_scan(&rktp->rktp_xmit_msgq, |
| &timedout, now)) { |
| /* Trigger delivery report for timed out messages */ |
| rd_kafka_dr_msgq(rktp->rktp_rkt, &timedout, |
| RD_KAFKA_RESP_ERR__MSG_TIMED_OUT); |
| } |
| } |
| |
| r = rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt); |
| if (r == 0) |
| return 0; |
| |
| /* Attempt to fill the batch size, but limit |
| * our waiting to queue.buffering.max.ms |
| * and batch.num.messages. */ |
| if (r < rkb->rkb_rk->rk_conf.batch_num_messages) { |
| rd_kafka_msg_t *rkm_oldest; |
| rd_ts_t wait_max; |
| |
| rkm_oldest = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs); |
| if (unlikely(!rkm_oldest)) |
| return 0; |
| |
| /* Calculate maximum wait-time to |
| * honour queue.buffering.max.ms contract. */ |
| wait_max = rd_kafka_msg_enq_time(rkm_oldest) + |
| (rkb->rkb_rk->rk_conf.buffering_max_ms * 1000); |
| if (wait_max > now) { |
| if (wait_max < *next_wakeup) |
| *next_wakeup = wait_max; |
| /* Wait for more messages or queue.buffering.max.ms |
| * to expire. */ |
| return 0; |
| } |
| } |
| |
| /* Send Produce requests for this toppar */ |
| while (1) { |
| r = rd_kafka_ProduceRequest(rkb, rktp); |
| if (likely(r > 0)) |
| cnt += r; |
| else |
| break; |
| } |
| |
| return cnt; |
| } |
| |
| |
| /** |
| * Producer serving |
| */ |
| static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) { |
| rd_interval_t timeout_scan; |
| |
| rd_interval_init(&timeout_scan); |
| |
| rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread)); |
| |
| rd_kafka_broker_lock(rkb); |
| |
| while (!rd_kafka_broker_terminating(rkb) && |
| rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) { |
| rd_kafka_toppar_t *rktp; |
| int cnt; |
| rd_ts_t now; |
| rd_ts_t next_wakeup; |
| int do_timeout_scan = 0; |
| |
| rd_kafka_broker_unlock(rkb); |
| |
| now = rd_clock(); |
| next_wakeup = now + (rkb->rkb_rk->rk_conf. |
| socket_blocking_max_ms * 1000); |
| |
| if (rd_interval(&timeout_scan, 1000*1000, now) >= 0) |
| do_timeout_scan = 1; |
| |
| do { |
| cnt = 0; |
| |
| /* Serve each toppar */ |
| TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) { |
| /* Serve toppar op queue */ |
| rd_kafka_toppar_lock(rktp); |
| if (unlikely(rktp->rktp_leader != rkb)) { |
| /* Currently migrating away from this |
| * broker. */ |
| rd_kafka_toppar_unlock(rktp); |
| continue; |
| } |
| if (unlikely(RD_KAFKA_TOPPAR_IS_PAUSED(rktp))) { |
| /* Partition is paused */ |
| rd_kafka_toppar_unlock(rktp); |
| continue; |
| } |
| /* Try producing toppar */ |
| cnt += rd_kafka_toppar_producer_serve( |
| rkb, rktp, do_timeout_scan, now, |
| &next_wakeup); |
| |
| rd_kafka_toppar_unlock(rktp); |
| } |
| |
| } while (cnt); |
| |
| /* Check and move retry buffers */ |
| if (unlikely(rd_atomic32_get(&rkb->rkb_retrybufs.rkbq_cnt) > 0)) |
| rd_kafka_broker_retry_bufs_move(rkb); |
| |
| rkb->rkb_blocking_max_ms = (int) |
| (next_wakeup > now ? (next_wakeup - now) / 1000 : 0); |
| rd_kafka_broker_serve(rkb, next_wakeup); |
| |
| rd_kafka_broker_lock(rkb); |
| } |
| |
| rd_kafka_broker_unlock(rkb); |
| } |
| |
| |
| |
| |
| |
| |
| |
| /** |
| * Backoff the next Fetch request (due to error). |
| */ |
| static void rd_kafka_broker_fetch_backoff (rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err) { |
| int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms; |
| rkb->rkb_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000); |
| rd_rkb_dbg(rkb, FETCH, "BACKOFF", |
| "Fetch backoff for %dms: %s", |
| backoff_ms, rd_kafka_err2str(err)); |
| } |
| |
| /** |
| * @brief Backoff the next Fetch for specific partition |
| */ |
| static void rd_kafka_toppar_fetch_backoff (rd_kafka_broker_t *rkb, |
| rd_kafka_toppar_t *rktp, |
| rd_kafka_resp_err_t err) { |
| int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms; |
| rktp->rktp_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000); |
| rd_rkb_dbg(rkb, FETCH, "BACKOFF", |
| "%s [%"PRId32"]: Fetch backoff for %dms: %s", |
| rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, |
| backoff_ms, rd_kafka_err2str(err)); |
| } |
| |
| |
| /** |
| * Parses and handles a Fetch reply. |
| * Returns 0 on success or an error code on failure. |
| */ |
| static rd_kafka_resp_err_t |
| rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb, |
| rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request) { |
| int32_t TopicArrayCnt; |
| int i; |
| const int log_decode_errors = LOG_ERR; |
| shptr_rd_kafka_itopic_t *s_rkt = NULL; |
| |
| if (rd_kafka_buf_ApiVersion(request) >= 1) { |
| int32_t Throttle_Time; |
| rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); |
| |
| rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep, |
| Throttle_Time); |
| } |
| |
| rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); |
| /* Verify that TopicArrayCnt seems to be in line with remaining size */ |
| rd_kafka_buf_check_len(rkbuf, |
| TopicArrayCnt * (3/*topic min size*/ + |
| 4/*PartitionArrayCnt*/ + |
| 4+2+8+4/*inner header*/)); |
| |
| for (i = 0 ; i < TopicArrayCnt ; i++) { |
| rd_kafkap_str_t topic; |
| int32_t fetch_version; |
| int32_t PartitionArrayCnt; |
| int j; |
| |
| rd_kafka_buf_read_str(rkbuf, &topic); |
| rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt); |
| |
| s_rkt = rd_kafka_topic_find0(rkb->rkb_rk, &topic); |
| |
| for (j = 0 ; j < PartitionArrayCnt ; j++) { |
| struct rd_kafka_toppar_ver *tver, tver_skel; |
| rd_kafka_toppar_t *rktp; |
| shptr_rd_kafka_toppar_t *s_rktp = NULL; |
| rd_slice_t save_slice; |
| struct { |
| int32_t Partition; |
| int16_t ErrorCode; |
| int64_t HighwaterMarkOffset; |
| int64_t LastStableOffset; /* v4 */ |
| int32_t MessageSetSize; |
| } hdr; |
| rd_kafka_resp_err_t err; |
| |
| rd_kafka_buf_read_i32(rkbuf, &hdr.Partition); |
| rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode); |
| rd_kafka_buf_read_i64(rkbuf, &hdr.HighwaterMarkOffset); |
| |
| if (rd_kafka_buf_ApiVersion(request) == 4) { |
| int32_t AbortedTxCnt; |
| rd_kafka_buf_read_i64(rkbuf, |
| &hdr.LastStableOffset); |
| rd_kafka_buf_read_i32(rkbuf, &AbortedTxCnt); |
| /* Ignore aborted transactions for now */ |
| if (AbortedTxCnt > 0) |
| rd_kafka_buf_skip(rkbuf, |
| AbortedTxCnt * (8+8)); |
| } else |
| hdr.LastStableOffset = -1; |
| |
| rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSetSize); |
| |
| if (unlikely(hdr.MessageSetSize < 0)) |
| rd_kafka_buf_parse_fail( |
| rkbuf, |
| "%.*s [%"PRId32"]: " |
| "invalid MessageSetSize %"PRId32, |
| RD_KAFKAP_STR_PR(&topic), |
| hdr.Partition, |
| hdr.MessageSetSize); |
| |
| /* Look up topic+partition */ |
| if (likely(s_rkt != NULL)) { |
| rd_kafka_itopic_t *rkt; |
| rkt = rd_kafka_topic_s2i(s_rkt); |
| rd_kafka_topic_rdlock(rkt); |
| s_rktp = rd_kafka_toppar_get( |
| rkt, hdr.Partition, 0/*no ua-on-miss*/); |
| rd_kafka_topic_rdunlock(rkt); |
| } |
| |
| if (unlikely(!s_rkt || !s_rktp)) { |
| rd_rkb_dbg(rkb, TOPIC, "UNKTOPIC", |
| "Received Fetch response " |
| "(error %hu) for unknown topic " |
| "%.*s [%"PRId32"]: ignoring", |
| hdr.ErrorCode, |
| RD_KAFKAP_STR_PR(&topic), |
| hdr.Partition); |
| rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); |
| continue; |
| } |
| |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| |
| rd_kafka_toppar_lock(rktp); |
| /* Make sure toppar hasn't moved to another broker |
| * during the lifetime of the request. */ |
| if (unlikely(rktp->rktp_leader != rkb)) { |
| rd_kafka_toppar_unlock(rktp); |
| rd_rkb_dbg(rkb, MSG, "FETCH", |
| "%.*s [%"PRId32"]: " |
| "partition leadership changed: " |
| "discarding fetch response", |
| RD_KAFKAP_STR_PR(&topic), |
| hdr.Partition); |
| rd_kafka_toppar_destroy(s_rktp); /* from get */ |
| rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); |
| continue; |
| } |
| fetch_version = rktp->rktp_fetch_version; |
| rd_kafka_toppar_unlock(rktp); |
| |
| /* Check if this Fetch is for an outdated fetch version, |
| * if so ignore it. */ |
| tver_skel.s_rktp = s_rktp; |
| tver = rd_list_find(request->rkbuf_rktp_vers, |
| &tver_skel, |
| rd_kafka_toppar_ver_cmp); |
| rd_kafka_assert(NULL, tver && |
| rd_kafka_toppar_s2i(tver->s_rktp) == |
| rktp); |
| if (tver->version < fetch_version) { |
| rd_rkb_dbg(rkb, MSG, "DROP", |
| "%s [%"PRId32"]: " |
| "dropping outdated fetch response " |
| "(v%d < %d)", |
| rktp->rktp_rkt->rkt_topic->str, |
| rktp->rktp_partition, |
| tver->version, fetch_version); |
| rd_atomic64_add(&rktp->rktp_c. rx_ver_drops, 1); |
| rd_kafka_toppar_destroy(s_rktp); /* from get */ |
| rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); |
| continue; |
| } |
| |
| rd_rkb_dbg(rkb, MSG, "FETCH", |
| "Topic %.*s [%"PRId32"] MessageSet " |
| "size %"PRId32", error \"%s\", " |
| "MaxOffset %"PRId64", " |
| "Ver %"PRId32"/%"PRId32, |
| RD_KAFKAP_STR_PR(&topic), hdr.Partition, |
| hdr.MessageSetSize, |
| rd_kafka_err2str(hdr.ErrorCode), |
| hdr.HighwaterMarkOffset, |
| tver->version, fetch_version); |
| |
| |
| /* Update hi offset to be able to compute |
| * consumer lag. */ |
| /* FIXME: if IsolationLevel==READ_COMMITTED, |
| * use hdr.LastStableOffset */ |
| rktp->rktp_offsets.hi_offset = hdr.HighwaterMarkOffset; |
| |
| |
| /* High offset for get_watermark_offsets() */ |
| rd_kafka_toppar_lock(rktp); |
| rktp->rktp_hi_offset = hdr.HighwaterMarkOffset; |
| rd_kafka_toppar_unlock(rktp); |
| |
| /* If this is the last message of the queue, |
| * signal EOF back to the application. */ |
| if (hdr.HighwaterMarkOffset == |
| rktp->rktp_offsets.fetch_offset |
| && |
| rktp->rktp_offsets.eof_offset != |
| rktp->rktp_offsets.fetch_offset) { |
| hdr.ErrorCode = |
| RD_KAFKA_RESP_ERR__PARTITION_EOF; |
| rktp->rktp_offsets.eof_offset = |
| rktp->rktp_offsets.fetch_offset; |
| } |
| |
| /* Handle partition-level errors. */ |
| if (unlikely(hdr.ErrorCode != |
| RD_KAFKA_RESP_ERR_NO_ERROR)) { |
| /* Some errors should be passed to the |
| * application while some handled by rdkafka */ |
| switch (hdr.ErrorCode) |
| { |
| /* Errors handled by rdkafka */ |
| case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: |
| case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: |
| case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION: |
| case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: |
| /* Request metadata information update*/ |
| rd_kafka_toppar_leader_unavailable( |
| rktp, "fetch", hdr.ErrorCode); |
| break; |
| |
| /* Application errors */ |
| case RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE: |
| { |
| int64_t err_offset = |
| rktp->rktp_offsets.fetch_offset; |
| rktp->rktp_offsets.fetch_offset = |
| RD_KAFKA_OFFSET_INVALID; |
| rd_kafka_offset_reset( |
| rktp, err_offset, |
| hdr.ErrorCode, |
| rd_kafka_err2str(hdr. |
| ErrorCode)); |
| } |
| break; |
| case RD_KAFKA_RESP_ERR__PARTITION_EOF: |
| if (!rkb->rkb_rk->rk_conf.enable_partition_eof) |
| break; |
| /* FALLTHRU */ |
| case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE: |
| default: /* and all other errors */ |
| rd_dassert(tver->version > 0); |
| rd_kafka_q_op_err( |
| rktp->rktp_fetchq, |
| RD_KAFKA_OP_CONSUMER_ERR, |
| hdr.ErrorCode, tver->version, |
| rktp, |
| rktp->rktp_offsets.fetch_offset, |
| "%s", |
| rd_kafka_err2str(hdr.ErrorCode)); |
| break; |
| } |
| |
| rd_kafka_toppar_fetch_backoff(rkb, rktp, |
| hdr.ErrorCode); |
| |
| rd_kafka_toppar_destroy(s_rktp);/* from get()*/ |
| |
| rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); |
| continue; |
| } |
| |
| if (unlikely(hdr.MessageSetSize <= 0)) { |
| rd_kafka_toppar_destroy(s_rktp); /*from get()*/ |
| continue; |
| } |
| |
| /** |
| * Parse MessageSet |
| */ |
| if (!rd_slice_narrow_relative( |
| &rkbuf->rkbuf_reader, |
| &save_slice, |
| (size_t)hdr.MessageSetSize)) |
| rd_kafka_buf_check_len(rkbuf, |
| hdr.MessageSetSize); |
| |
| /* Parse messages */ |
| err = rd_kafka_msgset_parse(rkbuf, request, rktp, tver); |
| |
| rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice); |
| /* Continue with next partition regardless of |
| * parse errors (which are partition-specific) */ |
| |
| /* On error: back off the fetcher for this partition */ |
| if (unlikely(err)) |
| rd_kafka_toppar_fetch_backoff(rkb, rktp, err); |
| |
| rd_kafka_toppar_destroy(s_rktp); /* from get */ |
| } |
| |
| if (s_rkt) { |
| rd_kafka_topic_destroy0(s_rkt); |
| s_rkt = NULL; |
| } |
| } |
| |
| if (rd_kafka_buf_read_remain(rkbuf) != 0) { |
| rd_kafka_buf_parse_fail(rkbuf, |
| "Remaining data after message set " |
| "parse: %"PRIusz" bytes", |
| rd_kafka_buf_read_remain(rkbuf)); |
| RD_NOTREACHED(); |
| } |
| |
| return 0; |
| |
| err_parse: |
| if (s_rkt) |
| rd_kafka_topic_destroy0(s_rkt); |
| rd_rkb_dbg(rkb, MSG, "BADMSG", "Bad message (Fetch v%d): " |
| "is broker.version.fallback incorrectly set?", |
| (int)request->rkbuf_reqhdr.ApiVersion); |
| return rkbuf->rkbuf_err; |
| } |
| |
| |
| |
| static void rd_kafka_broker_fetch_reply (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *reply, |
| rd_kafka_buf_t *request, |
| void *opaque) { |
| rd_kafka_assert(rkb->rkb_rk, rkb->rkb_fetching > 0); |
| rkb->rkb_fetching = 0; |
| |
| /* Parse and handle the messages (unless the request errored) */ |
| if (!err && reply) |
| err = rd_kafka_fetch_reply_handle(rkb, reply, request); |
| |
| if (unlikely(err)) { |
| char tmp[128]; |
| |
| rd_rkb_dbg(rkb, MSG, "FETCH", "Fetch reply: %s", |
| rd_kafka_err2str(err)); |
| switch (err) |
| { |
| case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: |
| case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: |
| case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION: |
| case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: |
| case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE: |
| /* Request metadata information update */ |
| rd_snprintf(tmp, sizeof(tmp), |
| "FetchRequest failed: %s", |
| rd_kafka_err2str(err)); |
| rd_kafka_metadata_refresh_known_topics(rkb->rkb_rk, |
| NULL, 1/*force*/, |
| tmp); |
| /* FALLTHRU */ |
| |
| case RD_KAFKA_RESP_ERR__TRANSPORT: |
| case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: |
| case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT: |
| /* The fetch is already intervalled from |
| * consumer_serve() so dont retry. */ |
| break; |
| |
| default: |
| break; |
| } |
| |
| rd_kafka_broker_fetch_backoff(rkb, err); |
| /* FALLTHRU */ |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| /** |
| * Build and send a Fetch request message for all underflowed toppars |
| * for a specific broker. |
| */ |
| static int rd_kafka_broker_fetch_toppars (rd_kafka_broker_t *rkb, rd_ts_t now) { |
| rd_kafka_toppar_t *rktp; |
| rd_kafka_buf_t *rkbuf; |
| int cnt = 0; |
| size_t of_TopicArrayCnt = 0; |
| int TopicArrayCnt = 0; |
| size_t of_PartitionArrayCnt = 0; |
| int PartitionArrayCnt = 0; |
| rd_kafka_itopic_t *rkt_last = NULL; |
| |
| /* Create buffer and segments: |
| * 1 x ReplicaId MaxWaitTime MinBytes TopicArrayCnt |
| * N x topic name |
| * N x PartitionArrayCnt Partition FetchOffset MaxBytes |
| * where N = number of toppars. |
| * Since we dont keep track of the number of topics served by |
| * this broker, only the partition count, we do a worst-case calc |
| * when allocating and assume each partition is on its own topic |
| */ |
| |
| if (unlikely(rkb->rkb_fetch_toppar_cnt == 0)) |
| return 0; |
| |
| rkbuf = rd_kafka_buf_new_request( |
| rkb, RD_KAFKAP_Fetch, 1, |
| /* ReplicaId+MaxWaitTime+MinBytes+TopicCnt */ |
| 4+4+4+4+ |
| /* N x PartCnt+Partition+FetchOffset+MaxBytes+?TopicNameLen?*/ |
| (rkb->rkb_fetch_toppar_cnt * (4+4+8+4+40))); |
| |
| if (rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER2) |
| rd_kafka_buf_ApiVersion_set(rkbuf, 4, |
| RD_KAFKA_FEATURE_MSGVER2); |
| else if (rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1) |
| rd_kafka_buf_ApiVersion_set(rkbuf, 2, |
| RD_KAFKA_FEATURE_MSGVER1); |
| else if (rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME) |
| rd_kafka_buf_ApiVersion_set(rkbuf, 1, |
| RD_KAFKA_FEATURE_THROTTLETIME); |
| |
| |
| /* FetchRequest header */ |
| /* ReplicaId */ |
| rd_kafka_buf_write_i32(rkbuf, -1); |
| /* MaxWaitTime */ |
| rd_kafka_buf_write_i32(rkbuf, rkb->rkb_rk->rk_conf.fetch_wait_max_ms); |
| /* MinBytes */ |
| rd_kafka_buf_write_i32(rkbuf, rkb->rkb_rk->rk_conf.fetch_min_bytes); |
| |
| if (rd_kafka_buf_ApiVersion(rkbuf) == 4) { |
| /* MaxBytes */ |
| rd_kafka_buf_write_i32(rkbuf, |
| rkb->rkb_rk->rk_conf.recv_max_msg_size); |
| /* IsolationLevel */ |
| rd_kafka_buf_write_i8(rkbuf, RD_KAFKAP_READ_UNCOMMITTED); |
| } |
| |
| /* Write zero TopicArrayCnt but store pointer for later update */ |
| of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
| |
| /* Prepare map for storing the fetch version for each partition, |
| * this will later be checked in Fetch response to purge outdated |
| * responses (e.g., after a seek). */ |
| rkbuf->rkbuf_rktp_vers = rd_list_new( |
| 0, (void *)rd_kafka_toppar_ver_destroy); |
| rd_list_prealloc_elems(rkbuf->rkbuf_rktp_vers, |
| sizeof(struct rd_kafka_toppar_ver), |
| rkb->rkb_fetch_toppar_cnt); |
| |
| /* Round-robin start of the list. */ |
| rktp = rkb->rkb_fetch_toppar_next; |
| do { |
| struct rd_kafka_toppar_ver *tver; |
| |
| if (rkt_last != rktp->rktp_rkt) { |
| if (rkt_last != NULL) { |
| /* Update PartitionArrayCnt */ |
| rd_kafka_buf_update_i32(rkbuf, |
| of_PartitionArrayCnt, |
| PartitionArrayCnt); |
| } |
| |
| /* Topic name */ |
| rd_kafka_buf_write_kstr(rkbuf, |
| rktp->rktp_rkt->rkt_topic); |
| TopicArrayCnt++; |
| rkt_last = rktp->rktp_rkt; |
| /* Partition count */ |
| of_PartitionArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); |
| PartitionArrayCnt = 0; |
| } |
| |
| PartitionArrayCnt++; |
| /* Partition */ |
| rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition); |
| /* FetchOffset */ |
| rd_kafka_buf_write_i64(rkbuf, rktp->rktp_offsets.fetch_offset); |
| /* MaxBytes */ |
| rd_kafka_buf_write_i32(rkbuf, rktp->rktp_fetch_msg_max_bytes); |
| |
| rd_rkb_dbg(rkb, FETCH, "FETCH", |
| "Fetch topic %.*s [%"PRId32"] at offset %"PRId64 |
| " (v%d)", |
| RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), |
| rktp->rktp_partition, |
| rktp->rktp_offsets.fetch_offset, |
| rktp->rktp_fetch_version); |
| |
| /* Add toppar + op version mapping. */ |
| tver = rd_list_add(rkbuf->rkbuf_rktp_vers, NULL); |
| tver->s_rktp = rd_kafka_toppar_keep(rktp); |
| tver->version = rktp->rktp_fetch_version; |
| |
| cnt++; |
| } while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_fetch_toppars, |
| rktp, rktp_fetchlink)) != |
| rkb->rkb_fetch_toppar_next); |
| |
| /* Update next toppar to fetch in round-robin list. */ |
| rd_kafka_broker_fetch_toppar_next(rkb, |
| rktp ? |
| CIRCLEQ_LOOP_NEXT(&rkb-> |
| rkb_fetch_toppars, |
| rktp, rktp_fetchlink): |
| NULL); |
| |
| rd_rkb_dbg(rkb, FETCH, "FETCH", "Fetch %i/%i/%i toppar(s)", |
| cnt, rkb->rkb_fetch_toppar_cnt, rkb->rkb_toppar_cnt); |
| if (!cnt) { |
| rd_kafka_buf_destroy(rkbuf); |
| return cnt; |
| } |
| |
| if (rkt_last != NULL) { |
| /* Update last topic's PartitionArrayCnt */ |
| rd_kafka_buf_update_i32(rkbuf, |
| of_PartitionArrayCnt, |
| PartitionArrayCnt); |
| } |
| |
| /* Update TopicArrayCnt */ |
| rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, TopicArrayCnt); |
| |
| /* Use configured timeout */ |
| rkbuf->rkbuf_ts_timeout = now + |
| ((rkb->rkb_rk->rk_conf.socket_timeout_ms + |
| rkb->rkb_rk->rk_conf.fetch_wait_max_ms) * 1000); |
| |
| /* Sort toppar versions for quicker lookups in Fetch response. */ |
| rd_list_sort(rkbuf->rkbuf_rktp_vers, rd_kafka_toppar_ver_cmp); |
| |
| rkb->rkb_fetching = 1; |
| rd_kafka_broker_buf_enq1(rkb, rkbuf, rd_kafka_broker_fetch_reply, NULL); |
| |
| return cnt; |
| } |
| |
| |
| |
| |
| /** |
| * Consumer serving |
| */ |
| static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb) { |
| |
| rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread)); |
| |
| rd_kafka_broker_lock(rkb); |
| |
| while (!rd_kafka_broker_terminating(rkb) && |
| rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) { |
| rd_ts_t now; |
| rd_ts_t min_backoff; |
| |
| rd_kafka_broker_unlock(rkb); |
| |
| now = rd_clock(); |
| |
| /* Serve toppars */ |
| min_backoff = rd_kafka_broker_toppars_serve(rkb); |
| if (rkb->rkb_ts_fetch_backoff > now && |
| rkb->rkb_ts_fetch_backoff < min_backoff) |
| min_backoff = rkb->rkb_ts_fetch_backoff; |
| |
| /* Send Fetch request message for all underflowed toppars */ |
| if (!rkb->rkb_fetching) { |
| if (min_backoff < now) { |
| rd_kafka_broker_fetch_toppars(rkb, now); |
| rkb->rkb_blocking_max_ms = |
| rkb->rkb_rk-> |
| rk_conf.socket_blocking_max_ms; |
| } else { |
| if (min_backoff < RD_TS_MAX) |
| rd_rkb_dbg(rkb, FETCH, "FETCH", |
| "Fetch backoff for %"PRId64 |
| "ms", |
| (min_backoff-now)/1000); |
| |
| /* Don't block for more than 1000 ms |
| * or less than 1 ms. */ |
| rkb->rkb_blocking_max_ms = 1 + |
| (int)RD_MIN(1000, |
| (min_backoff - now) / 1000); |
| } |
| } |
| |
| /* Check and move retry buffers */ |
| if (unlikely(rd_atomic32_get(&rkb->rkb_retrybufs.rkbq_cnt) > 0)) |
| rd_kafka_broker_retry_bufs_move(rkb); |
| |
| rd_kafka_broker_serve(rkb, |
| now + (rkb->rkb_blocking_max_ms * 1000)); |
| |
| rd_kafka_broker_lock(rkb); |
| } |
| |
| rd_kafka_broker_unlock(rkb); |
| } |
| |
| |
/**
 * @brief Main broker thread: drives the broker state machine
 *        (INIT/DOWN -> connect -> auth -> UP), dispatches to the
 *        producer or consumer serve loop when UP, and performs final
 *        cleanup (list removal, fail, destroy) on termination.
 *
 * @returns 0 (thread exit code).
 */
static int rd_kafka_broker_thread_main (void *arg) {
        rd_kafka_broker_t *rkb = arg;
        rd_kafka_t *rk = rkb->rkb_rk;

        rd_snprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name),
                    "%s", rkb->rkb_name);

        (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);

        /* Our own refcount was increased just prior to thread creation,
         * when refcount drops to 1 it is just us left and the broker
         * thread should terminate. */

        /* Acquire lock (which was held by thread creator during creation)
         * to synchronise state. */
        rd_kafka_broker_lock(rkb);
        rd_kafka_broker_unlock(rkb);

        rd_rkb_dbg(rkb, BROKER, "BRKMAIN", "Enter main broker thread");

        while (!rd_kafka_broker_terminating(rkb)) {
                rd_ts_t backoff;

                switch (rkb->rkb_state)
                {
                case RD_KAFKA_BROKER_STATE_INIT:
                        /* The INIT state exists so that an initial connection
                         * failure triggers a state transition which might
                         * trigger a ALL_BROKERS_DOWN error. */
                case RD_KAFKA_BROKER_STATE_DOWN:
                        /* The internal broker never connects anywhere:
                         * move it straight to UP. */
                        if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
                                rd_kafka_broker_lock(rkb);
                                rd_kafka_broker_set_state(rkb,
                                                          RD_KAFKA_BROKER_STATE_UP);
                                rd_kafka_broker_unlock(rkb);
                                break;
                        }

                        /* Throttle & jitter reconnects to avoid
                         * thundering herd of reconnecting clients after
                         * a broker / network outage. Issue #403 */
                        if (rkb->rkb_rk->rk_conf.reconnect_jitter_ms &&
                            (backoff =
                             rd_interval_immediate(
                                     &rkb->rkb_connect_intvl,
                                     rd_jitter(rkb->rkb_rk->rk_conf.
                                               reconnect_jitter_ms*500,
                                               rkb->rkb_rk->rk_conf.
                                               reconnect_jitter_ms*1500),
                                     0)) <= 0) {
                                /* rd_interval returns a negative value
                                 * (us until next interval) when throttled:
                                 * convert to a positive ms delay. */
                                backoff = -backoff/1000;
                                rd_rkb_dbg(rkb, BROKER, "RECONNECT",
                                           "Delaying next reconnect by %dms",
                                           (int)backoff);
                                rd_kafka_broker_ua_idle(rkb, (int)backoff);
                                continue;
                        }

                        /* Initiate asynchronous connection attempt.
                         * Only the host lookup is blocking here. */
                        if (rd_kafka_broker_connect(rkb) == -1) {
                                /* Immediate failure, most likely host
                                 * resolving failed.
                                 * Try the next resolve result until we've
                                 * tried them all, in which case we sleep a
                                 * short while to avoid busy looping. */
                                if (!rkb->rkb_rsal ||
                                    rkb->rkb_rsal->rsal_cnt == 0 ||
                                    rkb->rkb_rsal->rsal_curr + 1 ==
                                    rkb->rkb_rsal->rsal_cnt)
                                        rd_kafka_broker_ua_idle(rkb, 1000);
                        }
                        break;

                case RD_KAFKA_BROKER_STATE_CONNECT:
                case RD_KAFKA_BROKER_STATE_AUTH:
                case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:
                case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:
                        /* Asynchronous connect in progress. */
                        rd_kafka_broker_ua_idle(rkb, RD_POLL_INFINITE);

                        if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN) {
                                /* Connect failure.
                                 * Try the next resolve result until we've
                                 * tried them all, in which case we sleep a
                                 * short while to avoid busy looping. */
                                if (!rkb->rkb_rsal ||
                                    rkb->rkb_rsal->rsal_cnt == 0 ||
                                    rkb->rkb_rsal->rsal_curr + 1 ==
                                    rkb->rkb_rsal->rsal_cnt)
                                        rd_kafka_broker_ua_idle(rkb, 1000);
                        }
                        break;

                case RD_KAFKA_BROKER_STATE_UPDATE:
                        /* FALLTHRU */
                case RD_KAFKA_BROKER_STATE_UP:
                        if (rkb->rkb_nodeid == RD_KAFKA_NODEID_UA)
                                rd_kafka_broker_ua_idle(rkb, RD_POLL_INFINITE);
                        else if (rk->rk_type == RD_KAFKA_PRODUCER)
                                rd_kafka_broker_producer_serve(rkb);
                        else if (rk->rk_type == RD_KAFKA_CONSUMER)
                                rd_kafka_broker_consumer_serve(rkb);

                        if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_UPDATE) {
                                rd_kafka_broker_lock(rkb);
                                rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_UP);
                                rd_kafka_broker_unlock(rkb);
                        } else {
                                /* Connection torn down, sleep a short while to
                                 * avoid busy-looping on protocol errors */
                                rd_usleep(100*1000/*100ms*/, &rk->rk_terminate);
                        }
                        break;
                }

                if (rd_kafka_terminating(rkb->rkb_rk)) {
                        /* Handle is terminating: fail the send+retry queue
                         * to speed up termination, otherwise we'll
                         * need to wait for request timeouts. */
                        int r;

                        r = rd_kafka_broker_bufq_timeout_scan(
                                rkb, 0, &rkb->rkb_outbufs, NULL,
                                RD_KAFKA_RESP_ERR__DESTROY, 0);
                        r += rd_kafka_broker_bufq_timeout_scan(
                                rkb, 0, &rkb->rkb_retrybufs, NULL,
                                RD_KAFKA_RESP_ERR__DESTROY, 0);
                        rd_rkb_dbg(rkb, BROKER, "TERMINATE",
                                   "Handle is terminating: "
                                   "failed %d request(s) in "
                                   "retry+outbuf", r);
                }
        }

        /* Unlink this (non-internal) broker from the handle-wide
         * broker lists before destruction. */
        if (rkb->rkb_source != RD_KAFKA_INTERNAL) {
                rd_kafka_wrlock(rkb->rkb_rk);
                TAILQ_REMOVE(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);
                if (rkb->rkb_nodeid != -1)
                        rd_list_remove(&rkb->rkb_rk->rk_broker_by_id, rkb);
                (void)rd_atomic32_sub(&rkb->rkb_rk->rk_broker_cnt, 1);
                rd_kafka_wrunlock(rkb->rkb_rk);
        }

        rd_kafka_broker_fail(rkb, LOG_DEBUG, RD_KAFKA_RESP_ERR__DESTROY, NULL);
        rd_kafka_broker_destroy(rkb);

#if WITH_SSL
        /* Remove OpenSSL per-thread error state to avoid memory leaks */
#if OPENSSL_VERSION_NUMBER >= 0x10100000L && !defined(LIBRESSL_VERSION_NUMBER)
        /*(OpenSSL libraries handle thread init and deinit)
         * https://github.com/openssl/openssl/pull/1048 */
#elif OPENSSL_VERSION_NUMBER >= 0x10000000L
        ERR_remove_thread_state(NULL);
#endif
#endif

        rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);

        return 0;
}
| |
| |
| /** |
| * Final destructor. Refcnt must be 0. |
| */ |
| void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb) { |
| |
| rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread)); |
| rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs)); |
| rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs)); |
| rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_retrybufs.rkbq_bufs)); |
| rd_kafka_assert(rkb->rkb_rk, TAILQ_EMPTY(&rkb->rkb_toppars)); |
| |
| if (rkb->rkb_source != RD_KAFKA_INTERNAL && |
| (rkb->rkb_rk->rk_conf.security_protocol == |
| RD_KAFKA_PROTO_SASL_PLAINTEXT || |
| rkb->rkb_rk->rk_conf.security_protocol == |
| RD_KAFKA_PROTO_SASL_SSL)) |
| rd_kafka_sasl_broker_term(rkb); |
| |
| if (rkb->rkb_wakeup_fd[0] != -1) |
| rd_close(rkb->rkb_wakeup_fd[0]); |
| if (rkb->rkb_wakeup_fd[1] != -1) |
| rd_close(rkb->rkb_wakeup_fd[1]); |
| |
| if (rkb->rkb_recv_buf) |
| rd_kafka_buf_destroy(rkb->rkb_recv_buf); |
| |
| if (rkb->rkb_rsal) |
| rd_sockaddr_list_destroy(rkb->rkb_rsal); |
| |
| if (rkb->rkb_ApiVersions) |
| rd_free(rkb->rkb_ApiVersions); |
| rd_free(rkb->rkb_origname); |
| |
| rd_kafka_q_purge(rkb->rkb_ops); |
| rd_kafka_q_destroy(rkb->rkb_ops); |
| |
| rd_avg_destroy(&rkb->rkb_avg_int_latency); |
| rd_avg_destroy(&rkb->rkb_avg_rtt); |
| rd_avg_destroy(&rkb->rkb_avg_throttle); |
| |
| mtx_lock(&rkb->rkb_logname_lock); |
| rd_free(rkb->rkb_logname); |
| rkb->rkb_logname = NULL; |
| mtx_unlock(&rkb->rkb_logname_lock); |
| mtx_destroy(&rkb->rkb_logname_lock); |
| |
| mtx_destroy(&rkb->rkb_lock); |
| |
| rd_refcnt_destroy(&rkb->rkb_refcnt); |
| |
| rd_free(rkb); |
| } |
| |
| /** |
| * Returns the internal broker with refcnt increased. |
| */ |
| rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk) { |
| rd_kafka_broker_t *rkb; |
| |
| mtx_lock(&rk->rk_internal_rkb_lock); |
| rkb = rk->rk_internal_rkb; |
| if (rkb) |
| rd_kafka_broker_keep(rkb); |
| mtx_unlock(&rk->rk_internal_rkb_lock); |
| |
| return rkb; |
| } |
| |
| |
| /** |
| * Adds a broker with refcount set to 1. |
| * If 'source' is RD_KAFKA_INTERNAL an internal broker is added |
| * that does not actually represent or connect to a real broker, it is used |
| * for serving unassigned toppar's op queues. |
| * |
| * Locks: rd_kafka_wrlock(rk) must be held |
| */ |
| rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk, |
| rd_kafka_confsource_t source, |
| rd_kafka_secproto_t proto, |
| const char *name, uint16_t port, |
| int32_t nodeid) { |
| rd_kafka_broker_t *rkb; |
| #ifndef _MSC_VER |
| int r; |
| sigset_t newset, oldset; |
| #endif |
| |
| rkb = rd_calloc(1, sizeof(*rkb)); |
| |
| rd_kafka_mk_nodename(rkb->rkb_nodename, sizeof(rkb->rkb_nodename), |
| name, port); |
| rd_kafka_mk_brokername(rkb->rkb_name, sizeof(rkb->rkb_name), |
| proto, rkb->rkb_nodename, nodeid, source); |
| |
| rkb->rkb_source = source; |
| rkb->rkb_rk = rk; |
| rkb->rkb_nodeid = nodeid; |
| rkb->rkb_proto = proto; |
| rkb->rkb_port = port; |
| rkb->rkb_origname = rd_strdup(name); |
| |
| mtx_init(&rkb->rkb_lock, mtx_plain); |
| mtx_init(&rkb->rkb_logname_lock, mtx_plain); |
| rkb->rkb_logname = rd_strdup(rkb->rkb_name); |
| TAILQ_INIT(&rkb->rkb_toppars); |
| CIRCLEQ_INIT(&rkb->rkb_fetch_toppars); |
| rd_kafka_bufq_init(&rkb->rkb_outbufs); |
| rd_kafka_bufq_init(&rkb->rkb_waitresps); |
| rd_kafka_bufq_init(&rkb->rkb_retrybufs); |
| rkb->rkb_ops = rd_kafka_q_new(rk); |
| rd_interval_init(&rkb->rkb_connect_intvl); |
| rd_avg_init(&rkb->rkb_avg_int_latency, RD_AVG_GAUGE); |
| rd_avg_init(&rkb->rkb_avg_rtt, RD_AVG_GAUGE); |
| rd_avg_init(&rkb->rkb_avg_throttle, RD_AVG_GAUGE); |
| rd_refcnt_init(&rkb->rkb_refcnt, 0); |
| rd_kafka_broker_keep(rkb); /* rk_broker's refcount */ |
| |
| rkb->rkb_blocking_max_ms = rk->rk_conf.socket_blocking_max_ms; |
| |
| /* ApiVersion fallback interval */ |
| if (rkb->rkb_rk->rk_conf.api_version_request) { |
| rd_interval_init(&rkb->rkb_ApiVersion_fail_intvl); |
| rd_interval_fixed(&rkb->rkb_ApiVersion_fail_intvl, |
| rkb->rkb_rk->rk_conf.api_version_fallback_ms*1000); |
| } |
| |
| /* Set next intervalled metadata refresh, offset by a random |
| * value to avoid all brokers to be queried simultaneously. */ |
| if (rkb->rkb_rk->rk_conf.metadata_refresh_interval_ms >= 0) |
| rkb->rkb_ts_metadata_poll = rd_clock() + |
| (rkb->rkb_rk->rk_conf. |
| metadata_refresh_interval_ms * 1000) + |
| (rd_jitter(500,1500) * 1000); |
| else /* disabled */ |
| rkb->rkb_ts_metadata_poll = UINT64_MAX; |
| |
| #ifndef _MSC_VER |
| /* Block all signals in newly created thread. |
| * To avoid race condition we block all signals in the calling |
| * thread, which the new thread will inherit its sigmask from, |
| * and then restore the original sigmask of the calling thread when |
| * we're done creating the thread. |
| * NOTE: term_sig remains unblocked since we use it on termination |
| * to quickly interrupt system calls. */ |
| sigemptyset(&oldset); |
| sigfillset(&newset); |
| if (rkb->rkb_rk->rk_conf.term_sig) |
| sigdelset(&newset, rkb->rkb_rk->rk_conf.term_sig); |
| pthread_sigmask(SIG_SETMASK, &newset, &oldset); |
| #endif |
| |
| /* |
| * Fd-based queue wake-ups using a non-blocking pipe. |
| * Writes are best effort, if the socket queue is full |
| * the write fails (silently) but this has no effect on latency |
| * since the POLLIN flag will already have been raised for fd. |
| */ |
| rkb->rkb_wakeup_fd[0] = -1; |
| rkb->rkb_wakeup_fd[1] = -1; |
| rkb->rkb_toppar_wakeup_fd = -1; |
| |
| #ifndef _MSC_VER /* pipes cant be mixed with WSAPoll on Win32 */ |
| if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) { |
| rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD", |
| "Failed to setup broker queue wake-up fds: " |
| "%s: disabling low-latency mode", |
| rd_strerror(r)); |
| |
| } else if (source == RD_KAFKA_INTERNAL) { |
| /* nop: internal broker has no IO transport. */ |
| |
| } else { |
| char onebyte = 1; |
| |
| /* Since there is a small syscall penalty, |
| * only enable partition message queue wake-ups |
| * if latency contract demands it. |
| * rkb_ops queue wakeups are always enabled though, |
| * since they are much more infrequent. */ |
| if (rk->rk_conf.buffering_max_ms < |
| rk->rk_conf.socket_blocking_max_ms) { |
| rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD", |
| "Enabled low-latency partition " |
| "queue wake-ups"); |
| rkb->rkb_toppar_wakeup_fd = rkb->rkb_wakeup_fd[1]; |
| } |
| |
| |
| rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD", |
| "Enabled low-latency ops queue wake-ups"); |
| rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1], |
| &onebyte, sizeof(onebyte)); |
| } |
| #endif |
| |
| /* Lock broker's lock here to synchronise state, i.e., hold off |
| * the broker thread until we've finalized the rkb. */ |
| rd_kafka_broker_lock(rkb); |
| rd_kafka_broker_keep(rkb); /* broker thread's refcnt */ |
| if (thrd_create(&rkb->rkb_thread, |
| rd_kafka_broker_thread_main, rkb) != thrd_success) { |
| char tmp[512]; |
| rd_snprintf(tmp, sizeof(tmp), |
| "Unable to create broker thread: %s (%i)", |
| rd_strerror(errno), errno); |
| rd_kafka_log(rk, LOG_CRIT, "THREAD", "%s", tmp); |
| |
| rd_kafka_broker_unlock(rkb); |
| |
| /* Send ERR op back to application for processing. */ |
| rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE, |
| "%s", tmp); |
| |
| rd_free(rkb); |
| |
| #ifndef _MSC_VER |
| /* Restore sigmask of caller */ |
| pthread_sigmask(SIG_SETMASK, &oldset, NULL); |
| #endif |
| |
| return NULL; |
| } |
| |
| if (rkb->rkb_source != RD_KAFKA_INTERNAL) { |
| if (rk->rk_conf.security_protocol == |
| RD_KAFKA_PROTO_SASL_PLAINTEXT || |
| rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) |
| rd_kafka_sasl_broker_init(rkb); |
| |
| TAILQ_INSERT_TAIL(&rkb->rkb_rk->rk_brokers, rkb, rkb_link); |
| (void)rd_atomic32_add(&rkb->rkb_rk->rk_broker_cnt, 1); |
| |
| if (rkb->rkb_nodeid != -1) { |
| rd_list_add(&rkb->rkb_rk->rk_broker_by_id, rkb); |
| rd_list_sort(&rkb->rkb_rk->rk_broker_by_id, |
| rd_kafka_broker_cmp_by_id); |
| } |
| |
| rd_rkb_dbg(rkb, BROKER, "BROKER", |
| "Added new broker with NodeId %"PRId32, |
| rkb->rkb_nodeid); |
| } |
| |
| rd_kafka_broker_unlock(rkb); |
| |
| #ifndef _MSC_VER |
| /* Restore sigmask of caller */ |
| pthread_sigmask(SIG_SETMASK, &oldset, NULL); |
| #endif |
| |
| return rkb; |
| } |
| |
| /** |
| * @brief Find broker by nodeid (not -1) and |
| * possibly filtered by state (unless -1). |
| * |
| * @locks: rd_kafka_*lock() MUST be held |
| * @remark caller must release rkb reference by rd_kafka_broker_destroy() |
| */ |
| rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk, |
| int32_t nodeid, |
| int state) { |
| rd_kafka_broker_t *rkb; |
| rd_kafka_broker_t skel = { .rkb_nodeid = nodeid }; |
| |
| if (rd_kafka_terminating(rk)) |
| return NULL; |
| |
| rkb = rd_list_find(&rk->rk_broker_by_id, &skel, |
| rd_kafka_broker_cmp_by_id); |
| |
| if (!rkb) |
| return NULL; |
| |
| if (state != -1) { |
| int broker_state; |
| rd_kafka_broker_lock(rkb); |
| broker_state = (int)rkb->rkb_state; |
| rd_kafka_broker_unlock(rkb); |
| |
| if (broker_state != state) |
| return NULL; |
| } |
| |
| rd_kafka_broker_keep(rkb); |
| return rkb; |
| } |
| |
| /** |
| * Locks: rd_kafka_rdlock(rk) must be held |
| * NOTE: caller must release rkb reference by rd_kafka_broker_destroy() |
| */ |
| static rd_kafka_broker_t *rd_kafka_broker_find (rd_kafka_t *rk, |
| rd_kafka_secproto_t proto, |
| const char *name, |
| uint16_t port) { |
| rd_kafka_broker_t *rkb; |
| char nodename[RD_KAFKA_NODENAME_SIZE]; |
| |
| rd_kafka_mk_nodename(nodename, sizeof(nodename), name, port); |
| |
| TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
| rd_kafka_broker_lock(rkb); |
| if (!rd_atomic32_get(&rk->rk_terminate) && |
| rkb->rkb_proto == proto && |
| !strcmp(rkb->rkb_nodename, nodename)) { |
| rd_kafka_broker_keep(rkb); |
| rd_kafka_broker_unlock(rkb); |
| return rkb; |
| } |
| rd_kafka_broker_unlock(rkb); |
| } |
| |
| return NULL; |
| } |
| |
| |
| /** |
| * Parse a broker host name. |
| * The string 'name' is modified and null-terminated portions of it |
| * are returned in 'proto', 'host', and 'port'. |
| * |
| * Returns 0 on success or -1 on parse error. |
| */ |
| static int rd_kafka_broker_name_parse (rd_kafka_t *rk, |
| char **name, |
| rd_kafka_secproto_t *proto, |
| const char **host, |
| uint16_t *port) { |
| char *s = *name; |
| char *orig; |
| char *n, *t, *t2; |
| |
| /* Save a temporary copy of the original name for logging purposes */ |
| rd_strdupa(&orig, *name); |
| |
| /* Find end of this name (either by delimiter or end of string */ |
| if ((n = strchr(s, ','))) |
| *n = '\0'; |
| else |
| n = s + strlen(s)-1; |
| |
| |
| /* Check if this looks like an url. */ |
| if ((t = strstr(s, "://"))) { |
| int i; |
| /* "proto://host[:port]" */ |
| |
| if (t == s) { |
| rd_kafka_log(rk, LOG_WARNING, "BROKER", |
| "Broker name \"%s\" parse error: " |
| "empty protocol name", orig); |
| return -1; |
| } |
| |
| /* Make protocol uppercase */ |
| for (t2 = s ; t2 < t ; t2++) |
| *t2 = toupper(*t2); |
| |
| *t = '\0'; |
| |
| /* Find matching protocol by name. */ |
| for (i = 0 ; i < RD_KAFKA_PROTO_NUM ; i++) |
| if (!rd_strcasecmp(s, rd_kafka_secproto_names[i])) |
| break; |
| |
| /* Unsupported protocol */ |
| if (i == RD_KAFKA_PROTO_NUM) { |
| rd_kafka_log(rk, LOG_WARNING, "BROKER", |
| "Broker name \"%s\" parse error: " |
| "unsupported protocol \"%s\"", orig, s); |
| |
| return -1; |
| } |
| |
| *proto = i; |
| |
| /* Enforce protocol */ |
| if (rk->rk_conf.security_protocol != *proto) { |
| rd_kafka_log(rk, LOG_WARNING, "BROKER", |
| "Broker name \"%s\" parse error: " |
| "protocol \"%s\" does not match " |
| "security.protocol setting \"%s\"", |
| orig, s, |
| rd_kafka_secproto_names[ |
| rk->rk_conf.security_protocol]); |
| return -1; |
| } |
| |
| /* Hostname starts here */ |
| s = t+3; |
| |
| /* Ignore anything that looks like the path part of an URL */ |
| if ((t = strchr(s, '/'))) |
| *t = '\0'; |
| |
| } else |
| *proto = rk->rk_conf.security_protocol; /* Default protocol */ |
| |
| |
| *port = RD_KAFKA_PORT; |
| /* Check if port has been specified, but try to identify IPv6 |
| * addresses first: |
| * t = last ':' in string |
| * t2 = first ':' in string |
| * If t and t2 are equal then only one ":" exists in name |
| * and thus an IPv4 address with port specified. |
| * Else if not equal and t is prefixed with "]" then it's an |
| * IPv6 address with port specified. |
| * Else no port specified. */ |
| if ((t = strrchr(s, ':')) && |
| ((t2 = strchr(s, ':')) == t || *(t-1) == ']')) { |
| *t = '\0'; |
| *port = atoi(t+1); |
| } |
| |
| /* Empty host name -> localhost */ |
| if (!*s) |
| s = "localhost"; |
| |
| *host = s; |
| *name = n+1; /* past this name. e.g., next name/delimiter to parse */ |
| |
| return 0; |
| } |
| |
| |
| /** |
| * Adds a (csv list of) broker(s). |
| * Returns the number of brokers succesfully added. |
| * |
| * Locality: any thread |
| * Lock prereqs: none |
| */ |
| int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist) { |
| char *s_copy = rd_strdup(brokerlist); |
| char *s = s_copy; |
| int cnt = 0; |
| rd_kafka_broker_t *rkb; |
| |
| /* Parse comma-separated list of brokers. */ |
| while (*s) { |
| uint16_t port; |
| const char *host; |
| rd_kafka_secproto_t proto; |
| |
| if (*s == ',' || *s == ' ') { |
| s++; |
| continue; |
| } |
| |
| if (rd_kafka_broker_name_parse(rk, &s, &proto, |
| &host, &port) == -1) |
| break; |
| |
| rd_kafka_wrlock(rk); |
| |
| if ((rkb = rd_kafka_broker_find(rk, proto, host, port)) && |
| rkb->rkb_source == RD_KAFKA_CONFIGURED) { |
| cnt++; |
| } else if (rd_kafka_broker_add(rk, RD_KAFKA_CONFIGURED, |
| proto, host, port, |
| RD_KAFKA_NODEID_UA) != NULL) |
| cnt++; |
| |
| /* If rd_kafka_broker_find returned a broker its |
| * reference needs to be released |
| * See issue #193 */ |
| if (rkb) |
| rd_kafka_broker_destroy(rkb); |
| |
| rd_kafka_wrunlock(rk); |
| } |
| |
| rd_free(s_copy); |
| |
| return cnt; |
| } |
| |
| |
/**
 * @brief Public API wrapper: adds a CSV list of brokers.
 *        See rd_kafka_brokers_add0() for details.
 */
int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist) {
        return rd_kafka_brokers_add0(rk, brokerlist);
}
| |
| |
| /** |
| * Adds a new broker or updates an existing one. |
| * |
| */ |
| void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto, |
| const struct rd_kafka_metadata_broker *mdb) { |
| rd_kafka_broker_t *rkb; |
| char nodename[RD_KAFKA_NODENAME_SIZE]; |
| int needs_update = 0; |
| |
| rd_kafka_mk_nodename(nodename, sizeof(nodename), mdb->host, mdb->port); |
| |
| rd_kafka_wrlock(rk); |
| if (unlikely(rd_atomic32_get(&rk->rk_terminate))) { |
| /* Dont update metadata while terminating, do this |
| * after acquiring lock for proper synchronisation */ |
| rd_kafka_wrunlock(rk); |
| return; |
| } |
| |
| if ((rkb = rd_kafka_broker_find_by_nodeid(rk, mdb->id))) { |
| /* Broker matched by nodeid, see if we need to update |
| * the hostname. */ |
| if (strcmp(rkb->rkb_nodename, nodename)) |
| needs_update = 1; |
| } else if ((rkb = rd_kafka_broker_find(rk, proto, |
| mdb->host, mdb->port))) { |
| /* Broker matched by hostname (but not by nodeid), |
| * update the nodeid. */ |
| needs_update = 1; |
| |
| } else { |
| rd_kafka_broker_add(rk, RD_KAFKA_LEARNED, |
| proto, mdb->host, mdb->port, mdb->id); |
| } |
| |
| rd_kafka_wrunlock(rk); |
| |
| if (rkb) { |
| /* Existing broker */ |
| if (needs_update) { |
| rd_kafka_op_t *rko; |
| |
| rko = rd_kafka_op_new(RD_KAFKA_OP_NODE_UPDATE); |
| strncpy(rko->rko_u.node.nodename, nodename, |
| sizeof(rko->rko_u.node.nodename)-1); |
| rko->rko_u.node.nodeid = mdb->id; |
| rd_kafka_q_enq(rkb->rkb_ops, rko); |
| } |
| rd_kafka_broker_destroy(rkb); |
| } |
| } |
| |
| |
| /** |
| * Returns a thread-safe temporary copy of the broker name. |
| * Must not be called more than 4 times from the same expression. |
| * |
| * Locks: none |
| * Locality: any thread |
| */ |
| const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb) { |
| static RD_TLS char ret[4][RD_KAFKA_NODENAME_SIZE]; |
| static RD_TLS int reti = 0; |
| |
| reti = (reti + 1) % 4; |
| mtx_lock(&rkb->rkb_logname_lock); |
| rd_snprintf(ret[reti], sizeof(ret[reti]), "%s", rkb->rkb_logname); |
| mtx_unlock(&rkb->rkb_logname_lock); |
| |
| return ret[reti]; |
| } |
| |
| |
| /** |
| * @brief Send dummy OP to broker thread to wake it up from IO sleep. |
| * |
| * @locality any |
| * @locks none |
| */ |
| void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb) { |
| rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_WAKEUP); |
| rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH); |
| rd_kafka_q_enq(rkb->rkb_ops, rko); |
| rd_rkb_dbg(rkb, QUEUE, "WAKEUP", "Wake-up"); |
| } |
| |
| |
/**
 * @brief Global broker subsystem initialization hook.
 *        Currently a no-op.
 */
void rd_kafka_brokers_init (void) {
}
| |
| |
| |
| |
| |
| |
| |
| |