| /* |
| * librdkafka - Apache Kafka C library |
| * |
| * Copyright (c) 2015, Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| #ifdef _MSC_VER |
| #pragma comment(lib, "ws2_32.lib") |
| #endif |
| |
| #define __need_IOV_MAX |
| |
| #define _DARWIN_C_SOURCE /* MSG_DONTWAIT */ |
| |
| #include "rdkafka_int.h" |
| #include "rdaddr.h" |
| #include "rdkafka_transport.h" |
| #include "rdkafka_transport_int.h" |
| #include "rdkafka_broker.h" |
| |
| #include <errno.h> |
| |
| #if WITH_VALGRIND |
| /* OpenSSL relies on uninitialized memory, which Valgrind will whine about. |
| * We use in-code Valgrind macros to suppress those warnings. */ |
| #include <valgrind/memcheck.h> |
| #else |
| #define VALGRIND_MAKE_MEM_DEFINED(A,B) |
| #endif |
| |
| |
| #ifdef _MSC_VER |
| #define socket_errno WSAGetLastError() |
| #else |
| #include <sys/socket.h> |
| #define socket_errno errno |
| #define SOCKET_ERROR -1 |
| #endif |
| |
| /* AIX doesn't have MSG_DONTWAIT */ |
| #ifndef MSG_DONTWAIT |
| # define MSG_DONTWAIT MSG_NONBLOCK |
| #endif |
| |
| |
| #if WITH_SSL |
| static mtx_t *rd_kafka_ssl_locks; |
| static int rd_kafka_ssl_locks_cnt; |
| #endif |
| |
| |
| |
| /** |
| * Low-level socket close |
| */ |
| static void rd_kafka_transport_close0 (rd_kafka_t *rk, int s) { |
| if (rk->rk_conf.closesocket_cb) |
| rk->rk_conf.closesocket_cb(s, rk->rk_conf.opaque); |
| else { |
| #ifndef _MSC_VER |
| close(s); |
| #else |
| closesocket(s); |
| #endif |
| } |
| |
| } |
| |
| /** |
| * Close and destroy a transport handle |
| */ |
| void rd_kafka_transport_close (rd_kafka_transport_t *rktrans) { |
| #if WITH_SSL |
| if (rktrans->rktrans_ssl) { |
| SSL_shutdown(rktrans->rktrans_ssl); |
| SSL_free(rktrans->rktrans_ssl); |
| } |
| #endif |
| |
| rd_kafka_sasl_close(rktrans); |
| |
| if (rktrans->rktrans_recv_buf) |
| rd_kafka_buf_destroy(rktrans->rktrans_recv_buf); |
| |
| if (rktrans->rktrans_s != -1) |
| rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk, |
| rktrans->rktrans_s); |
| |
| rd_free(rktrans); |
| } |
| |
| |
/**
 * @returns a human readable string for the socket error code \p err.
 *
 * On MSVC builds the string is formatted by rd_strerror_w32() into a
 * static RD_TLS buffer which is overwritten by the next call;
 * other builds defer to rd_strerror().
 */
static const char *socket_strerror(int err) {
#ifndef _MSC_VER
        return rd_strerror(err);
#else
        static RD_TLS char buf[256];

        rd_strerror_w32(err, buf, sizeof(buf));
        return buf;
#endif
}
| |
| |
| |
| |
| #ifndef _MSC_VER |
| /** |
| * @brief sendmsg() abstraction, converting a list of segments to iovecs. |
| * @remark should only be called if the number of segments is > 1. |
| */ |
| ssize_t rd_kafka_transport_socket_sendmsg (rd_kafka_transport_t *rktrans, |
| rd_slice_t *slice, |
| char *errstr, size_t errstr_size) { |
| struct iovec iov[IOV_MAX]; |
| struct msghdr msg = { .msg_iov = iov }; |
| size_t iovlen; |
| ssize_t r; |
| |
| rd_slice_get_iov(slice, msg.msg_iov, &iovlen, IOV_MAX, |
| /* FIXME: Measure the effects of this */ |
| rktrans->rktrans_sndbuf_size); |
| msg.msg_iovlen = (typeof(msg.msg_iovlen))iovlen; |
| |
| #ifdef sun |
| /* See recvmsg() comment. Setting it here to be safe. */ |
| socket_errno = EAGAIN; |
| #endif |
| |
| r = sendmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT |
| #ifdef MSG_NOSIGNAL |
| | MSG_NOSIGNAL |
| #endif |
| ); |
| |
| if (r == -1) { |
| if (socket_errno == EAGAIN) |
| return 0; |
| rd_snprintf(errstr, errstr_size, "%s", rd_strerror(errno)); |
| } |
| |
| /* Update buffer read position */ |
| rd_slice_read(slice, NULL, (size_t)r); |
| |
| return r; |
| } |
| #endif |
| |
| |
| /** |
| * @brief Plain send() abstraction |
| */ |
| static ssize_t |
| rd_kafka_transport_socket_send0 (rd_kafka_transport_t *rktrans, |
| rd_slice_t *slice, |
| char *errstr, size_t errstr_size) { |
| ssize_t sum = 0; |
| const void *p; |
| size_t rlen; |
| |
| while ((rlen = rd_slice_peeker(slice, &p))) { |
| ssize_t r; |
| |
| r = send(rktrans->rktrans_s, p, |
| #ifdef _MSC_VER |
| (int)rlen, (int)0 |
| #else |
| rlen, 0 |
| #endif |
| ); |
| |
| #ifdef _MSC_VER |
| if (unlikely(r == SOCKET_ERROR)) { |
| if (sum > 0 || WSAGetLastError() == WSAEWOULDBLOCK) |
| return sum; |
| else { |
| rd_snprintf(errstr, errstr_size, "%s", |
| socket_strerror(WSAGetLastError())); |
| return -1; |
| } |
| } |
| #else |
| if (unlikely(r <= 0)) { |
| if (r == 0 || errno == EAGAIN) |
| return 0; |
| rd_snprintf(errstr, errstr_size, "%s", |
| socket_strerror(socket_errno)); |
| return -1; |
| } |
| #endif |
| |
| /* Update buffer read position */ |
| rd_slice_read(slice, NULL, (size_t)r); |
| |
| sum += r; |
| |
| /* FIXME: remove this and try again immediately and let |
| * the next write() call fail instead? */ |
| if ((size_t)r < rlen) |
| break; |
| } |
| |
| return sum; |
| } |
| |
| |
| static ssize_t |
| rd_kafka_transport_socket_send (rd_kafka_transport_t *rktrans, |
| rd_slice_t *slice, |
| char *errstr, size_t errstr_size) { |
| #ifndef _MSC_VER |
| /* FIXME: Use sendmsg() with iovecs if there's more than one segment |
| * remaining, otherwise (or if platform does not have sendmsg) |
| * use plain send(). */ |
| return rd_kafka_transport_socket_sendmsg(rktrans, slice, |
| errstr, errstr_size); |
| #endif |
| return rd_kafka_transport_socket_send0(rktrans, slice, |
| errstr, errstr_size); |
| } |
| |
| |
| |
| #ifndef _MSC_VER |
| /** |
| * @brief recvmsg() abstraction, converting a list of segments to iovecs. |
| * @remark should only be called if the number of segments is > 1. |
| */ |
| static ssize_t |
| rd_kafka_transport_socket_recvmsg (rd_kafka_transport_t *rktrans, |
| rd_buf_t *rbuf, |
| char *errstr, size_t errstr_size) { |
| ssize_t r; |
| struct iovec iov[IOV_MAX]; |
| struct msghdr msg = { .msg_iov = iov }; |
| size_t iovlen; |
| |
| rd_buf_get_write_iov(rbuf, msg.msg_iov, &iovlen, IOV_MAX, |
| /* FIXME: Measure the effects of this */ |
| rktrans->rktrans_rcvbuf_size); |
| msg.msg_iovlen = (typeof(msg.msg_iovlen))iovlen; |
| |
| #ifdef sun |
| /* SunOS doesn't seem to set errno when recvmsg() fails |
| * due to no data and MSG_DONTWAIT is set. */ |
| socket_errno = EAGAIN; |
| #endif |
| r = recvmsg(rktrans->rktrans_s, &msg, MSG_DONTWAIT); |
| if (unlikely(r <= 0)) { |
| if (r == -1 && socket_errno == EAGAIN) |
| return 0; |
| else if (r == 0) { |
| /* Receive 0 after POLLIN event means |
| * connection closed. */ |
| rd_snprintf(errstr, errstr_size, "Disconnected"); |
| return -1; |
| } else if (r == -1) { |
| rd_snprintf(errstr, errstr_size, "%s", |
| rd_strerror(errno)); |
| return -1; |
| } |
| } |
| |
| /* Update buffer write position */ |
| rd_buf_write(rbuf, NULL, (size_t)r); |
| |
| return r; |
| } |
| #endif |
| |
| |
| /** |
| * @brief Plain recv() |
| */ |
| static ssize_t |
| rd_kafka_transport_socket_recv0 (rd_kafka_transport_t *rktrans, |
| rd_buf_t *rbuf, |
| char *errstr, size_t errstr_size) { |
| ssize_t sum = 0; |
| void *p; |
| size_t len; |
| |
| while ((len = rd_buf_get_writable(rbuf, &p))) { |
| ssize_t r; |
| |
| r = recv(rktrans->rktrans_s, p, |
| #ifdef _MSC_VER |
| (int) |
| #endif |
| len, |
| 0); |
| |
| #ifdef _MSC_VER |
| if (unlikely(r == SOCKET_ERROR)) { |
| if (WSAGetLastError() == WSAEWOULDBLOCK) |
| return sum; |
| rd_snprintf(errstr, errstr_size, "%s", |
| socket_strerror(WSAGetLastError())); |
| return -1; |
| } |
| #else |
| if (unlikely(r <= 0)) { |
| if (r == -1 && socket_errno == EAGAIN) |
| return 0; |
| else if (r == 0) { |
| /* Receive 0 after POLLIN event means |
| * connection closed. */ |
| rd_snprintf(errstr, errstr_size, |
| "Disconnected"); |
| return -1; |
| } else if (r == -1) { |
| rd_snprintf(errstr, errstr_size, "%s", |
| rd_strerror(errno)); |
| return -1; |
| } |
| } |
| #endif |
| |
| /* Update buffer write position */ |
| rd_buf_write(rbuf, NULL, (size_t)r); |
| |
| sum += r; |
| |
| /* FIXME: remove this and try again immediately and let |
| * the next recv() call fail instead? */ |
| if ((size_t)r < len) |
| break; |
| } |
| return sum; |
| } |
| |
| |
| static ssize_t |
| rd_kafka_transport_socket_recv (rd_kafka_transport_t *rktrans, |
| rd_buf_t *buf, |
| char *errstr, size_t errstr_size) { |
| #ifndef _MSC_VER |
| /* FIXME: Use recvmsg() with iovecs if there's more than one segment |
| * remaining, otherwise (or if platform does not have sendmsg) |
| * use plain send(). */ |
| return rd_kafka_transport_socket_recvmsg(rktrans, buf, |
| errstr, errstr_size); |
| #endif |
| return rd_kafka_transport_socket_recv0(rktrans, buf, |
| errstr, errstr_size); |
| } |
| |
| |
| |
| |
| |
| /** |
| * CONNECT state is failed (errstr!=NULL) or done (TCP is up, SSL is working..). |
| * From this state we either hand control back to the broker code, |
| * or if authentication is configured we ente the AUTH state. |
| */ |
| void rd_kafka_transport_connect_done (rd_kafka_transport_t *rktrans, |
| char *errstr) { |
| rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
| |
| rd_kafka_broker_connect_done(rkb, errstr); |
| } |
| |
| |
| |
| #if WITH_SSL |
| |
| |
| /** |
| * Serves the entire OpenSSL error queue and logs each error. |
| * The last error is not logged but returned in 'errstr'. |
| * |
| * If 'rkb' is non-NULL broker-specific logging will be used, |
| * else it will fall back on global 'rk' debugging. |
| */ |
| static char *rd_kafka_ssl_error (rd_kafka_t *rk, rd_kafka_broker_t *rkb, |
| char *errstr, size_t errstr_size) { |
| unsigned long l; |
| const char *file, *data; |
| int line, flags; |
| int cnt = 0; |
| |
| while ((l = ERR_get_error_line_data(&file, &line, &data, &flags)) != 0) { |
| char buf[256]; |
| |
| if (cnt++ > 0) { |
| /* Log last message */ |
| if (rkb) |
| rd_rkb_log(rkb, LOG_ERR, "SSL", "%s", errstr); |
| else |
| rd_kafka_log(rk, LOG_ERR, "SSL", "%s", errstr); |
| } |
| |
| ERR_error_string_n(l, buf, sizeof(buf)); |
| |
| rd_snprintf(errstr, errstr_size, "%s:%d: %s: %s", |
| file, line, buf, (flags & ERR_TXT_STRING) ? data : ""); |
| |
| } |
| |
| if (cnt == 0) |
| rd_snprintf(errstr, errstr_size, "No error"); |
| |
| return errstr; |
| } |
| |
| |
| static void rd_kafka_transport_ssl_lock_cb (int mode, int i, |
| const char *file, int line) { |
| if (mode & CRYPTO_LOCK) |
| mtx_lock(&rd_kafka_ssl_locks[i]); |
| else |
| mtx_unlock(&rd_kafka_ssl_locks[i]); |
| } |
| |
/**
 * OpenSSL thread id callback: returns an identifier for the calling thread.
 */
static unsigned long rd_kafka_transport_ssl_threadid_cb (void) {
#ifndef _MSC_VER
        return (unsigned long)(intptr_t)thrd_current();
#else
        /* Windows makes a distinction between thread handle
         * and thread id, which means we can't use the
         * thrd_current() API that returns the handle. */
        return (unsigned long)GetCurrentThreadId();
#endif
}
| |
| |
| /** |
| * Global OpenSSL cleanup. |
| */ |
| void rd_kafka_transport_ssl_term (void) { |
| int i; |
| |
| CRYPTO_set_id_callback(NULL); |
| CRYPTO_set_locking_callback(NULL); |
| CRYPTO_cleanup_all_ex_data(); |
| |
| for (i = 0 ; i < rd_kafka_ssl_locks_cnt ; i++) |
| mtx_destroy(&rd_kafka_ssl_locks[i]); |
| |
| rd_free(rd_kafka_ssl_locks); |
| |
| } |
| |
| |
| /** |
| * Global OpenSSL init. |
| */ |
| void rd_kafka_transport_ssl_init (void) { |
| int i; |
| |
| rd_kafka_ssl_locks_cnt = CRYPTO_num_locks(); |
| rd_kafka_ssl_locks = rd_malloc(rd_kafka_ssl_locks_cnt * |
| sizeof(*rd_kafka_ssl_locks)); |
| for (i = 0 ; i < rd_kafka_ssl_locks_cnt ; i++) |
| mtx_init(&rd_kafka_ssl_locks[i], mtx_plain); |
| |
| CRYPTO_set_id_callback(rd_kafka_transport_ssl_threadid_cb); |
| CRYPTO_set_locking_callback(rd_kafka_transport_ssl_lock_cb); |
| |
| SSL_load_error_strings(); |
| SSL_library_init(); |
| OpenSSL_add_all_algorithms(); |
| } |
| |
| |
| /** |
| * Set transport IO event polling based on SSL error. |
| * |
| * Returns -1 on permanent errors. |
| * |
| * Locality: broker thread |
| */ |
| static RD_INLINE int |
| rd_kafka_transport_ssl_io_update (rd_kafka_transport_t *rktrans, int ret, |
| char *errstr, size_t errstr_size) { |
| int serr = SSL_get_error(rktrans->rktrans_ssl, ret); |
| int serr2; |
| |
| switch (serr) |
| { |
| case SSL_ERROR_WANT_READ: |
| rd_kafka_transport_poll_set(rktrans, POLLIN); |
| break; |
| |
| case SSL_ERROR_WANT_WRITE: |
| case SSL_ERROR_WANT_CONNECT: |
| rd_kafka_transport_poll_set(rktrans, POLLOUT); |
| break; |
| |
| case SSL_ERROR_SYSCALL: |
| if (!(serr2 = SSL_get_error(rktrans->rktrans_ssl, ret))) { |
| if (ret == 0) |
| errno = ECONNRESET; |
| rd_snprintf(errstr, errstr_size, |
| "SSL syscall error: %s", rd_strerror(errno)); |
| } else |
| rd_snprintf(errstr, errstr_size, |
| "SSL syscall error number: %d: %s", serr2, |
| rd_strerror(errno)); |
| return -1; |
| |
| case SSL_ERROR_ZERO_RETURN: |
| rd_snprintf(errstr, errstr_size, "Disconnected"); |
| return -1; |
| |
| default: |
| rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb, |
| errstr, errstr_size); |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| static ssize_t |
| rd_kafka_transport_ssl_send (rd_kafka_transport_t *rktrans, |
| rd_slice_t *slice, |
| char *errstr, size_t errstr_size) { |
| ssize_t sum = 0; |
| const void *p; |
| size_t rlen; |
| |
| while ((rlen = rd_slice_peeker(slice, &p))) { |
| int r; |
| |
| r = SSL_write(rktrans->rktrans_ssl, p, (int)rlen); |
| |
| if (unlikely(r <= 0)) { |
| if (rd_kafka_transport_ssl_io_update(rktrans, r, |
| errstr, |
| errstr_size) == -1) |
| return -1; |
| else |
| return sum; |
| } |
| |
| /* Update buffer read position */ |
| rd_slice_read(slice, NULL, (size_t)r); |
| |
| sum += r; |
| /* FIXME: remove this and try again immediately and let |
| * the next SSL_write() call fail instead? */ |
| if ((size_t)r < rlen) |
| break; |
| |
| } |
| return sum; |
| } |
| |
| static ssize_t |
| rd_kafka_transport_ssl_recv (rd_kafka_transport_t *rktrans, |
| rd_buf_t *rbuf, char *errstr, size_t errstr_size) { |
| ssize_t sum = 0; |
| void *p; |
| size_t len; |
| |
| while ((len = rd_buf_get_writable(rbuf, &p))) { |
| int r; |
| |
| r = SSL_read(rktrans->rktrans_ssl, p, (int)len); |
| |
| if (unlikely(r <= 0)) { |
| if (rd_kafka_transport_ssl_io_update(rktrans, r, |
| errstr, |
| errstr_size) == -1) |
| return -1; |
| else |
| return sum; |
| } |
| |
| VALGRIND_MAKE_MEM_DEFINED(p, r); |
| |
| /* Update buffer write position */ |
| rd_buf_write(rbuf, NULL, (size_t)r); |
| |
| sum += r; |
| |
| /* FIXME: remove this and try again immediately and let |
| * the next SSL_read() call fail instead? */ |
| if ((size_t)r < len) |
| break; |
| |
| } |
| return sum; |
| |
| } |
| |
| |
| /** |
| * OpenSSL password query callback |
| * |
| * Locality: application thread |
| */ |
| static int rd_kafka_transport_ssl_passwd_cb (char *buf, int size, int rwflag, |
| void *userdata) { |
| rd_kafka_t *rk = userdata; |
| int pwlen; |
| |
| rd_kafka_dbg(rk, SECURITY, "SSLPASSWD", |
| "Private key file \"%s\" requires password", |
| rk->rk_conf.ssl.key_location); |
| |
| if (!rk->rk_conf.ssl.key_password) { |
| rd_kafka_log(rk, LOG_WARNING, "SSLPASSWD", |
| "Private key file \"%s\" requires password but " |
| "no password configured (ssl.key.password)", |
| rk->rk_conf.ssl.key_location); |
| return -1; |
| } |
| |
| |
| pwlen = (int) strlen(rk->rk_conf.ssl.key_password); |
| memcpy(buf, rk->rk_conf.ssl.key_password, RD_MIN(pwlen, size)); |
| |
| return pwlen; |
| } |
| |
| /** |
| * Set up SSL for a newly connected connection |
| * |
| * Returns -1 on failure, else 0. |
| */ |
| static int rd_kafka_transport_ssl_connect (rd_kafka_broker_t *rkb, |
| rd_kafka_transport_t *rktrans, |
| char *errstr, size_t errstr_size) { |
| int r; |
| char name[RD_KAFKA_NODENAME_SIZE]; |
| char *t; |
| |
| rktrans->rktrans_ssl = SSL_new(rkb->rkb_rk->rk_conf.ssl.ctx); |
| if (!rktrans->rktrans_ssl) |
| goto fail; |
| |
| if (!SSL_set_fd(rktrans->rktrans_ssl, rktrans->rktrans_s)) |
| goto fail; |
| |
| #if (OPENSSL_VERSION_NUMBER >= 0x0090806fL) && !defined(OPENSSL_NO_TLSEXT) |
| /* If non-numerical hostname, send it for SNI */ |
| rd_snprintf(name, sizeof(name), "%s", rkb->rkb_nodename); |
| if ((t = strrchr(name, ':'))) |
| *t = '\0'; |
| if (!(/*ipv6*/(strchr(name, ':') && |
| strspn(name, "0123456789abcdefABCDEF:.[]%") == strlen(name)) || |
| /*ipv4*/strspn(name, "0123456789.") == strlen(name)) && |
| !SSL_set_tlsext_host_name(rktrans->rktrans_ssl, name)) |
| goto fail; |
| #endif |
| |
| r = SSL_connect(rktrans->rktrans_ssl); |
| if (r == 1) { |
| /* Connected, highly unlikely since this is a |
| * non-blocking operation. */ |
| rd_kafka_transport_connect_done(rktrans, NULL); |
| return 0; |
| } |
| |
| |
| if (rd_kafka_transport_ssl_io_update(rktrans, r, |
| errstr, errstr_size) == -1) |
| return -1; |
| |
| return 0; |
| |
| fail: |
| rd_kafka_ssl_error(NULL, rkb, errstr, errstr_size); |
| return -1; |
| } |
| |
| |
| static RD_UNUSED int |
| rd_kafka_transport_ssl_io_event (rd_kafka_transport_t *rktrans, int events) { |
| int r; |
| char errstr[512]; |
| |
| if (events & POLLOUT) { |
| r = SSL_write(rktrans->rktrans_ssl, NULL, 0); |
| if (rd_kafka_transport_ssl_io_update(rktrans, r, |
| errstr, |
| sizeof(errstr)) == -1) |
| goto fail; |
| } |
| |
| return 0; |
| |
| fail: |
| /* Permanent error */ |
| rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR, |
| RD_KAFKA_RESP_ERR__TRANSPORT, |
| "%s", errstr); |
| return -1; |
| } |
| |
| |
| /** |
| * Verify SSL handshake was valid. |
| */ |
| static int rd_kafka_transport_ssl_verify (rd_kafka_transport_t *rktrans) { |
| long int rl; |
| X509 *cert; |
| |
| cert = SSL_get_peer_certificate(rktrans->rktrans_ssl); |
| X509_free(cert); |
| if (!cert) { |
| rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR, |
| RD_KAFKA_RESP_ERR__SSL, |
| "Broker did not provide a certificate"); |
| return -1; |
| } |
| |
| if ((rl = SSL_get_verify_result(rktrans->rktrans_ssl)) != X509_V_OK) { |
| rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR, |
| RD_KAFKA_RESP_ERR__SSL, |
| "Failed to verify broker certificate: %s", |
| X509_verify_cert_error_string(rl)); |
| return -1; |
| } |
| |
| rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY", |
| "Broker SSL certificate verified"); |
| return 0; |
| } |
| |
| /** |
| * SSL handshake handling. |
| * Call repeatedly (based on IO events) until handshake is done. |
| * |
| * Returns -1 on error, 0 if handshake is still in progress, or 1 on completion. |
| */ |
| static int rd_kafka_transport_ssl_handshake (rd_kafka_transport_t *rktrans) { |
| rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
| char errstr[512]; |
| int r; |
| |
| r = SSL_do_handshake(rktrans->rktrans_ssl); |
| if (r == 1) { |
| /* SSL handshake done. Verify. */ |
| if (rd_kafka_transport_ssl_verify(rktrans) == -1) |
| return -1; |
| |
| rd_kafka_transport_connect_done(rktrans, NULL); |
| return 1; |
| |
| } else if (rd_kafka_transport_ssl_io_update(rktrans, r, |
| errstr, |
| sizeof(errstr)) == -1) { |
| rd_kafka_broker_fail(rkb, LOG_ERR, RD_KAFKA_RESP_ERR__SSL, |
| "SSL handshake failed: %s%s", errstr, |
| strstr(errstr, "unexpected message") ? |
| ": client authentication might be " |
| "required (see broker log)" : ""); |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| |
| /** |
| * Once per rd_kafka_t handle cleanup of OpenSSL |
| * |
| * Locality: any thread |
| * |
| * NOTE: rd_kafka_wrlock() MUST be held |
| */ |
| void rd_kafka_transport_ssl_ctx_term (rd_kafka_t *rk) { |
| SSL_CTX_free(rk->rk_conf.ssl.ctx); |
| rk->rk_conf.ssl.ctx = NULL; |
| } |
| |
| /** |
| * Once per rd_kafka_t handle initialization of OpenSSL |
| * |
| * Locality: application thread |
| * |
| * NOTE: rd_kafka_wrlock() MUST be held |
| */ |
| int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk, |
| char *errstr, size_t errstr_size) { |
| int r; |
| SSL_CTX *ctx; |
| |
| if (errstr_size > 0) |
| errstr[0] = '\0'; |
| |
| ctx = SSL_CTX_new(SSLv23_client_method()); |
| if (!ctx) { |
| rd_snprintf(errstr, errstr_size, |
| "SSLv23_client_method() failed: "); |
| goto fail; |
| } |
| |
| #ifdef SSL_OP_NO_SSLv3 |
| /* Disable SSLv3 (unsafe) */ |
| SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv3); |
| #endif |
| |
| /* Key file password callback */ |
| SSL_CTX_set_default_passwd_cb(ctx, rd_kafka_transport_ssl_passwd_cb); |
| SSL_CTX_set_default_passwd_cb_userdata(ctx, rk); |
| |
| /* Ciphers */ |
| if (rk->rk_conf.ssl.cipher_suites) { |
| rd_kafka_dbg(rk, SECURITY, "SSL", |
| "Setting cipher list: %s", |
| rk->rk_conf.ssl.cipher_suites); |
| if (!SSL_CTX_set_cipher_list(ctx, |
| rk->rk_conf.ssl.cipher_suites)) { |
| /* Set a string that will prefix the |
| * the OpenSSL error message (which is lousy) |
| * to make it more meaningful. */ |
| rd_snprintf(errstr, errstr_size, |
| "ssl.cipher.suites failed: "); |
| goto fail; |
| } |
| } |
| |
| |
| if (rk->rk_conf.ssl.ca_location) { |
| /* CA certificate location, either file or directory. */ |
| int is_dir = rd_kafka_path_is_dir(rk->rk_conf.ssl.ca_location); |
| |
| rd_kafka_dbg(rk, SECURITY, "SSL", |
| "Loading CA certificate(s) from %s %s", |
| is_dir ? "directory":"file", |
| rk->rk_conf.ssl.ca_location); |
| |
| r = SSL_CTX_load_verify_locations(ctx, |
| !is_dir ? |
| rk->rk_conf.ssl. |
| ca_location : NULL, |
| is_dir ? |
| rk->rk_conf.ssl. |
| ca_location : NULL); |
| |
| if (r != 1) { |
| rd_snprintf(errstr, errstr_size, |
| "ssl.ca.location failed: "); |
| goto fail; |
| } |
| } else { |
| /* Use default CA certificate paths: ignore failures. */ |
| r = SSL_CTX_set_default_verify_paths(ctx); |
| if (r != 1) |
| rd_kafka_dbg(rk, SECURITY, "SSL", |
| "SSL_CTX_set_default_verify_paths() " |
| "failed: ignoring"); |
| } |
| |
| if (rk->rk_conf.ssl.crl_location) { |
| rd_kafka_dbg(rk, SECURITY, "SSL", |
| "Loading CRL from file %s", |
| rk->rk_conf.ssl.crl_location); |
| |
| r = SSL_CTX_load_verify_locations(ctx, |
| rk->rk_conf.ssl.crl_location, |
| NULL); |
| |
| if (r != 1) { |
| rd_snprintf(errstr, errstr_size, |
| "ssl.crl.location failed: "); |
| goto fail; |
| } |
| |
| |
| rd_kafka_dbg(rk, SECURITY, "SSL", |
| "Enabling CRL checks"); |
| |
| X509_STORE_set_flags(SSL_CTX_get_cert_store(ctx), |
| X509_V_FLAG_CRL_CHECK); |
| } |
| |
| if (rk->rk_conf.ssl.cert_location) { |
| rd_kafka_dbg(rk, SECURITY, "SSL", |
| "Loading certificate from file %s", |
| rk->rk_conf.ssl.cert_location); |
| |
| r = SSL_CTX_use_certificate_chain_file(ctx, |
| rk->rk_conf.ssl.cert_location); |
| |
| if (r != 1) { |
| rd_snprintf(errstr, errstr_size, |
| "ssl.certificate.location failed: "); |
| goto fail; |
| } |
| } |
| |
| if (rk->rk_conf.ssl.key_location) { |
| rd_kafka_dbg(rk, SECURITY, "SSL", |
| "Loading private key file from %s", |
| rk->rk_conf.ssl.key_location); |
| |
| r = SSL_CTX_use_PrivateKey_file(ctx, |
| rk->rk_conf.ssl.key_location, |
| SSL_FILETYPE_PEM); |
| if (r != 1) { |
| rd_snprintf(errstr, errstr_size, |
| "ssl.key.location failed: "); |
| goto fail; |
| } |
| } |
| |
| |
| SSL_CTX_set_mode(ctx, SSL_MODE_ENABLE_PARTIAL_WRITE); |
| |
| rk->rk_conf.ssl.ctx = ctx; |
| return 0; |
| |
| fail: |
| r = (int)strlen(errstr); |
| rd_kafka_ssl_error(rk, NULL, errstr+r, |
| (int)errstr_size > r ? (int)errstr_size - r : 0); |
| SSL_CTX_free(ctx); |
| |
| return -1; |
| } |
| |
| |
| #endif /* WITH_SSL */ |
| |
| |
| ssize_t |
| rd_kafka_transport_send (rd_kafka_transport_t *rktrans, |
| rd_slice_t *slice, char *errstr, size_t errstr_size) { |
| |
| #if WITH_SSL |
| if (rktrans->rktrans_ssl) |
| return rd_kafka_transport_ssl_send(rktrans, slice, |
| errstr, errstr_size); |
| else |
| #endif |
| return rd_kafka_transport_socket_send(rktrans, slice, |
| errstr, errstr_size); |
| } |
| |
| |
| ssize_t |
| rd_kafka_transport_recv (rd_kafka_transport_t *rktrans, rd_buf_t *rbuf, |
| char *errstr, size_t errstr_size) { |
| #if WITH_SSL |
| if (rktrans->rktrans_ssl) |
| return rd_kafka_transport_ssl_recv(rktrans, rbuf, |
| errstr, errstr_size); |
| else |
| #endif |
| return rd_kafka_transport_socket_recv(rktrans, rbuf, |
| errstr, errstr_size); |
| } |
| |
| |
| |
| |
| /** |
| * Length framed receive handling. |
| * Currently only supports a the following framing: |
| * [int32_t:big_endian_length_of_payload][payload] |
| * |
| * To be used on POLLIN event, will return: |
| * -1: on fatal error (errstr will be updated, *rkbufp remains unset) |
| * 0: still waiting for data (*rkbufp remains unset) |
| * 1: data complete, (buffer returned in *rkbufp) |
| */ |
| int rd_kafka_transport_framed_recv (rd_kafka_transport_t *rktrans, |
| rd_kafka_buf_t **rkbufp, |
| char *errstr, size_t errstr_size) { |
| rd_kafka_buf_t *rkbuf = rktrans->rktrans_recv_buf; |
| ssize_t r; |
| const int log_decode_errors = LOG_ERR; |
| |
| /* States: |
| * !rktrans_recv_buf: initial state; set up buf to receive header. |
| * rkbuf_totlen == 0: awaiting header |
| * rkbuf_totlen > 0: awaiting payload |
| */ |
| |
| if (!rkbuf) { |
| rkbuf = rd_kafka_buf_new(1, 4/*length field's length*/); |
| /* Set up buffer reader for the length field */ |
| rd_buf_write_ensure(&rkbuf->rkbuf_buf, 4, 4); |
| rktrans->rktrans_recv_buf = rkbuf; |
| } |
| |
| |
| r = rd_kafka_transport_recv(rktrans, &rkbuf->rkbuf_buf, |
| errstr, errstr_size); |
| if (r == 0) |
| return 0; |
| else if (r == -1) |
| return -1; |
| |
| if (rkbuf->rkbuf_totlen == 0) { |
| /* Frame length not known yet. */ |
| int32_t frame_len; |
| |
| if (rd_buf_write_pos(&rkbuf->rkbuf_buf) < sizeof(frame_len)) { |
| /* Wait for entire frame header. */ |
| return 0; |
| } |
| |
| /* Initialize reader */ |
| rd_slice_init(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf, 0, 4); |
| |
| /* Reader header: payload length */ |
| rd_kafka_buf_read_i32(rkbuf, &frame_len); |
| |
| if (frame_len < 0 || |
| frame_len > rktrans->rktrans_rkb-> |
| rkb_rk->rk_conf.recv_max_msg_size) { |
| rd_snprintf(errstr, errstr_size, |
| "Invalid frame size %"PRId32, frame_len); |
| return -1; |
| } |
| |
| rkbuf->rkbuf_totlen = 4 + frame_len; |
| if (frame_len == 0) { |
| /* Payload is empty, we're done. */ |
| rktrans->rktrans_recv_buf = NULL; |
| *rkbufp = rkbuf; |
| return 1; |
| } |
| |
| /* Allocate memory to hold entire frame payload in contigious |
| * memory. */ |
| rd_buf_write_ensure_contig(&rkbuf->rkbuf_buf, frame_len); |
| |
| /* Try reading directly, there is probably more data available*/ |
| return rd_kafka_transport_framed_recv(rktrans, rkbufp, |
| errstr, errstr_size); |
| } |
| |
| if (rd_buf_write_pos(&rkbuf->rkbuf_buf) == rkbuf->rkbuf_totlen) { |
| /* Payload is complete. */ |
| rktrans->rktrans_recv_buf = NULL; |
| *rkbufp = rkbuf; |
| return 1; |
| } |
| |
| /* Wait for more data */ |
| return 0; |
| |
| err_parse: |
| if (rkbuf) |
| rd_kafka_buf_destroy(rkbuf); |
| rd_snprintf(errstr, errstr_size, "Frame header parsing failed: %s", |
| rd_kafka_err2str(rkbuf->rkbuf_err)); |
| return -1; |
| } |
| |
| |
| /** |
| * TCP connection established. |
| * Set up socket options, SSL, etc. |
| * |
| * Locality: broker thread |
| */ |
| static void rd_kafka_transport_connected (rd_kafka_transport_t *rktrans) { |
| rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
| unsigned int slen; |
| |
| rd_rkb_dbg(rkb, BROKER, "CONNECT", |
| "Connected to %s", |
| rd_sockaddr2str(rkb->rkb_addr_last, |
| RD_SOCKADDR2STR_F_PORT | |
| RD_SOCKADDR2STR_F_FAMILY)); |
| |
| /* Set socket send & receive buffer sizes if configuerd */ |
| if (rkb->rkb_rk->rk_conf.socket_sndbuf_size != 0) { |
| if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF, |
| (void *)&rkb->rkb_rk->rk_conf.socket_sndbuf_size, |
| sizeof(rkb->rkb_rk->rk_conf. |
| socket_sndbuf_size)) == SOCKET_ERROR) |
| rd_rkb_log(rkb, LOG_WARNING, "SNDBUF", |
| "Failed to set socket send " |
| "buffer size to %i: %s", |
| rkb->rkb_rk->rk_conf.socket_sndbuf_size, |
| socket_strerror(socket_errno)); |
| } |
| |
| if (rkb->rkb_rk->rk_conf.socket_rcvbuf_size != 0) { |
| if (setsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF, |
| (void *)&rkb->rkb_rk->rk_conf.socket_rcvbuf_size, |
| sizeof(rkb->rkb_rk->rk_conf. |
| socket_rcvbuf_size)) == SOCKET_ERROR) |
| rd_rkb_log(rkb, LOG_WARNING, "RCVBUF", |
| "Failed to set socket receive " |
| "buffer size to %i: %s", |
| rkb->rkb_rk->rk_conf.socket_rcvbuf_size, |
| socket_strerror(socket_errno)); |
| } |
| |
| /* Get send and receive buffer sizes to allow limiting |
| * the total number of bytes passed with iovecs to sendmsg() |
| * and recvmsg(). */ |
| slen = sizeof(rktrans->rktrans_rcvbuf_size); |
| if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_RCVBUF, |
| (void *)&rktrans->rktrans_rcvbuf_size, |
| &slen) == SOCKET_ERROR) { |
| rd_rkb_log(rkb, LOG_WARNING, "RCVBUF", |
| "Failed to get socket receive " |
| "buffer size: %s: assuming 1MB", |
| socket_strerror(socket_errno)); |
| rktrans->rktrans_rcvbuf_size = 1024*1024; |
| } else if (rktrans->rktrans_rcvbuf_size < 1024 * 64) |
| rktrans->rktrans_rcvbuf_size = 1024*64; /* Use at least 64KB */ |
| |
| slen = sizeof(rktrans->rktrans_sndbuf_size); |
| if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, SO_SNDBUF, |
| (void *)&rktrans->rktrans_sndbuf_size, |
| &slen) == SOCKET_ERROR) { |
| rd_rkb_log(rkb, LOG_WARNING, "RCVBUF", |
| "Failed to get socket send " |
| "buffer size: %s: assuming 1MB", |
| socket_strerror(socket_errno)); |
| rktrans->rktrans_sndbuf_size = 1024*1024; |
| } else if (rktrans->rktrans_sndbuf_size < 1024 * 64) |
| rktrans->rktrans_sndbuf_size = 1024*64; /* Use at least 64KB */ |
| |
| |
| #ifdef TCP_NODELAY |
| if (rkb->rkb_rk->rk_conf.socket_nagle_disable) { |
| int one = 1; |
| if (setsockopt(rktrans->rktrans_s, IPPROTO_TCP, TCP_NODELAY, |
| (void *)&one, sizeof(one)) == SOCKET_ERROR) |
| rd_rkb_log(rkb, LOG_WARNING, "NAGLE", |
| "Failed to disable Nagle (TCP_NODELAY) " |
| "on socket %d: %s", |
| socket_strerror(socket_errno)); |
| } |
| #endif |
| |
| |
| #if WITH_SSL |
| if (rkb->rkb_proto == RD_KAFKA_PROTO_SSL || |
| rkb->rkb_proto == RD_KAFKA_PROTO_SASL_SSL) { |
| char errstr[512]; |
| |
| /* Set up SSL connection. |
| * This is also an asynchronous operation so dont |
| * propagate to broker_connect_done() just yet. */ |
| if (rd_kafka_transport_ssl_connect(rkb, rktrans, |
| errstr, |
| sizeof(errstr)) == -1) { |
| rd_kafka_transport_connect_done(rktrans, errstr); |
| return; |
| } |
| return; |
| } |
| #endif |
| |
| /* Propagate connect success */ |
| rd_kafka_transport_connect_done(rktrans, NULL); |
| } |
| |
| |
| |
| /** |
| * @brief the kernel SO_ERROR in \p errp for the given transport. |
| * @returns 0 if getsockopt() was succesful (and \p and errp can be trusted), |
| * else -1 in which case \p errp 's value is undefined. |
| */ |
| static int rd_kafka_transport_get_socket_error (rd_kafka_transport_t *rktrans, |
| int *errp) { |
| socklen_t intlen = sizeof(*errp); |
| |
| if (getsockopt(rktrans->rktrans_s, SOL_SOCKET, |
| SO_ERROR, (void *)errp, &intlen) == -1) { |
| rd_rkb_dbg(rktrans->rktrans_rkb, BROKER, "SO_ERROR", |
| "Failed to get socket error: %s", |
| socket_strerror(socket_errno)); |
| return -1; |
| } |
| |
| return 0; |
| } |
| |
| |
| /** |
| * IO event handler. |
| * |
| * Locality: broker thread |
| */ |
| static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans, |
| int events) { |
| char errstr[512]; |
| int r; |
| rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
| |
| switch (rkb->rkb_state) |
| { |
| case RD_KAFKA_BROKER_STATE_CONNECT: |
| #if WITH_SSL |
| if (rktrans->rktrans_ssl) { |
| /* Currently setting up SSL connection: |
| * perform handshake. */ |
| rd_kafka_transport_ssl_handshake(rktrans); |
| return; |
| } |
| #endif |
| |
| /* Asynchronous connect finished, read status. */ |
| if (!(events & (POLLOUT|POLLERR|POLLHUP))) |
| return; |
| |
| if (rd_kafka_transport_get_socket_error(rktrans, &r) == -1) { |
| rd_kafka_broker_fail( |
| rkb, LOG_ERR, RD_KAFKA_RESP_ERR__TRANSPORT, |
| "Connect to %s failed: " |
| "unable to get status from " |
| "socket %d: %s", |
| rd_sockaddr2str(rkb->rkb_addr_last, |
| RD_SOCKADDR2STR_F_PORT | |
| RD_SOCKADDR2STR_F_FAMILY), |
| rktrans->rktrans_s, |
| rd_strerror(socket_errno)); |
| } else if (r != 0) { |
| /* Connect failed */ |
| errno = r; |
| rd_snprintf(errstr, sizeof(errstr), |
| "Connect to %s failed: %s", |
| rd_sockaddr2str(rkb->rkb_addr_last, |
| RD_SOCKADDR2STR_F_PORT | |
| RD_SOCKADDR2STR_F_FAMILY), |
| rd_strerror(r)); |
| |
| rd_kafka_transport_connect_done(rktrans, errstr); |
| } else { |
| /* Connect succeeded */ |
| rd_kafka_transport_connected(rktrans); |
| } |
| break; |
| |
| case RD_KAFKA_BROKER_STATE_AUTH: |
| /* SASL handshake */ |
| if (rd_kafka_sasl_io_event(rktrans, events, |
| errstr, sizeof(errstr)) == -1) { |
| errno = EINVAL; |
| rd_kafka_broker_fail(rkb, LOG_ERR, |
| RD_KAFKA_RESP_ERR__AUTHENTICATION, |
| "SASL authentication failure: %s", |
| errstr); |
| return; |
| } |
| break; |
| |
| case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY: |
| case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE: |
| case RD_KAFKA_BROKER_STATE_UP: |
| case RD_KAFKA_BROKER_STATE_UPDATE: |
| |
| if (events & POLLIN) { |
| while (rkb->rkb_state >= RD_KAFKA_BROKER_STATE_UP && |
| rd_kafka_recv(rkb) > 0) |
| ; |
| } |
| |
| if (events & POLLHUP) { |
| rd_kafka_broker_fail(rkb, |
| rkb->rkb_rk->rk_conf. |
| log_connection_close ? |
| LOG_NOTICE : LOG_DEBUG, |
| RD_KAFKA_RESP_ERR__TRANSPORT, |
| "Connection closed"); |
| return; |
| } |
| |
| if (events & POLLOUT) { |
| while (rd_kafka_send(rkb) > 0) |
| ; |
| } |
| break; |
| |
| case RD_KAFKA_BROKER_STATE_INIT: |
| case RD_KAFKA_BROKER_STATE_DOWN: |
| rd_kafka_assert(rkb->rkb_rk, !*"bad state"); |
| } |
| } |
| |
| |
| /** |
| * Poll and serve IOs |
| * |
| * Locality: broker thread |
| */ |
| void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans, |
| int timeout_ms) { |
| rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; |
| int events; |
| |
| if (rd_kafka_bufq_cnt(&rkb->rkb_waitresps) < rkb->rkb_max_inflight && |
| rd_kafka_bufq_cnt(&rkb->rkb_outbufs) > 0) |
| rd_kafka_transport_poll_set(rkb->rkb_transport, POLLOUT); |
| |
| if ((events = rd_kafka_transport_poll(rktrans, timeout_ms)) <= 0) |
| return; |
| |
| rd_kafka_transport_poll_clear(rktrans, POLLOUT); |
| |
| rd_kafka_transport_io_event(rktrans, events); |
| } |
| |
| |
| /** |
| * Initiate asynchronous connection attempt. |
| * |
| * Locality: broker thread |
| */ |
| rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb, |
| const rd_sockaddr_inx_t *sinx, |
| char *errstr, |
| size_t errstr_size) { |
| rd_kafka_transport_t *rktrans; |
| int s = -1; |
| int on = 1; |
| int r; |
| |
| rkb->rkb_addr_last = sinx; |
| |
| s = rkb->rkb_rk->rk_conf.socket_cb(sinx->in.sin_family, |
| SOCK_STREAM, IPPROTO_TCP, |
| rkb->rkb_rk->rk_conf.opaque); |
| if (s == -1) { |
| rd_snprintf(errstr, errstr_size, "Failed to create socket: %s", |
| socket_strerror(socket_errno)); |
| return NULL; |
| } |
| |
| |
| #ifdef SO_NOSIGPIPE |
| /* Disable SIGPIPE signalling for this socket on OSX */ |
| if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1) |
| rd_rkb_dbg(rkb, BROKER, "SOCKET", |
| "Failed to set SO_NOSIGPIPE: %s", |
| socket_strerror(socket_errno)); |
| #endif |
| |
| /* Enable TCP keep-alives, if configured. */ |
| if (rkb->rkb_rk->rk_conf.socket_keepalive) { |
| #ifdef SO_KEEPALIVE |
| if (setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, |
| (void *)&on, sizeof(on)) == SOCKET_ERROR) |
| rd_rkb_dbg(rkb, BROKER, "SOCKET", |
| "Failed to set SO_KEEPALIVE: %s", |
| socket_strerror(socket_errno)); |
| #else |
| rd_rkb_dbg(rkb, BROKER, "SOCKET", |
| "System does not support " |
| "socket.keepalive.enable (SO_KEEPALIVE)"); |
| #endif |
| } |
| |
| /* Set the socket to non-blocking */ |
| if ((r = rd_fd_set_nonblocking(s))) { |
| rd_snprintf(errstr, errstr_size, |
| "Failed to set socket non-blocking: %s", |
| socket_strerror(r)); |
| goto err; |
| } |
| |
| rd_rkb_dbg(rkb, BROKER, "CONNECT", "Connecting to %s (%s) " |
| "with socket %i", |
| rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_FAMILY | |
| RD_SOCKADDR2STR_F_PORT), |
| rd_kafka_secproto_names[rkb->rkb_proto], s); |
| |
| /* Connect to broker */ |
| if (rkb->rkb_rk->rk_conf.connect_cb) { |
| r = rkb->rkb_rk->rk_conf.connect_cb( |
| s, (struct sockaddr *)sinx, RD_SOCKADDR_INX_LEN(sinx), |
| rkb->rkb_name, rkb->rkb_rk->rk_conf.opaque); |
| } else { |
| if (connect(s, (struct sockaddr *)sinx, |
| RD_SOCKADDR_INX_LEN(sinx)) == SOCKET_ERROR && |
| (socket_errno != EINPROGRESS |
| #ifdef _MSC_VER |
| && socket_errno != WSAEWOULDBLOCK |
| #endif |
| )) |
| r = socket_errno; |
| else |
| r = 0; |
| } |
| |
| if (r != 0) { |
| rd_rkb_dbg(rkb, BROKER, "CONNECT", |
| "couldn't connect to %s: %s (%i)", |
| rd_sockaddr2str(sinx, |
| RD_SOCKADDR2STR_F_PORT | |
| RD_SOCKADDR2STR_F_FAMILY), |
| socket_strerror(r), r); |
| rd_snprintf(errstr, errstr_size, |
| "Failed to connect to broker at %s: %s", |
| rd_sockaddr2str(sinx, RD_SOCKADDR2STR_F_NICE), |
| socket_strerror(r)); |
| goto err; |
| } |
| |
| /* Create transport handle */ |
| rktrans = rd_calloc(1, sizeof(*rktrans)); |
| rktrans->rktrans_rkb = rkb; |
| rktrans->rktrans_s = s; |
| rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = s; |
| if (rkb->rkb_wakeup_fd[0] != -1) { |
| rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt].events = POLLIN; |
| rktrans->rktrans_pfd[rktrans->rktrans_pfd_cnt++].fd = rkb->rkb_wakeup_fd[0]; |
| } |
| |
| |
| /* Poll writability to trigger on connection success/failure. */ |
| rd_kafka_transport_poll_set(rktrans, POLLOUT); |
| |
| return rktrans; |
| |
| err: |
| if (s != -1) |
| rd_kafka_transport_close0(rkb->rkb_rk, s); |
| |
| return NULL; |
| } |
| |
| |
| |
| void rd_kafka_transport_poll_set(rd_kafka_transport_t *rktrans, int event) { |
| rktrans->rktrans_pfd[0].events |= event; |
| } |
| |
| void rd_kafka_transport_poll_clear(rd_kafka_transport_t *rktrans, int event) { |
| rktrans->rktrans_pfd[0].events &= ~event; |
| } |
| |
| |
| int rd_kafka_transport_poll(rd_kafka_transport_t *rktrans, int tmout) { |
| int r; |
| #ifndef _MSC_VER |
| r = poll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout); |
| if (r <= 0) |
| return r; |
| #else |
| r = WSAPoll(rktrans->rktrans_pfd, rktrans->rktrans_pfd_cnt, tmout); |
| if (r == 0) { |
| /* Workaround for broken WSAPoll() while connecting: |
| * failed connection attempts are not indicated at all by WSAPoll() |
| * so we need to check the socket error when Poll returns 0. |
| * Issue #525 */ |
| r = ECONNRESET; |
| if (unlikely(rktrans->rktrans_rkb->rkb_state == |
| RD_KAFKA_BROKER_STATE_CONNECT && |
| (rd_kafka_transport_get_socket_error(rktrans, |
| &r) == -1 || |
| r != 0))) { |
| char errstr[512]; |
| errno = r; |
| rd_snprintf(errstr, sizeof(errstr), |
| "Connect to %s failed: %s", |
| rd_sockaddr2str(rktrans->rktrans_rkb-> |
| rkb_addr_last, |
| RD_SOCKADDR2STR_F_PORT | |
| RD_SOCKADDR2STR_F_FAMILY), |
| socket_strerror(r)); |
| rd_kafka_transport_connect_done(rktrans, errstr); |
| return -1; |
| } else |
| return 0; |
| } else if (r == SOCKET_ERROR) |
| return -1; |
| #endif |
| rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1); |
| |
| if (rktrans->rktrans_pfd[1].revents & POLLIN) { |
| /* Read wake-up fd data and throw away, just used for wake-ups*/ |
| char buf[512]; |
| if (rd_read((int)rktrans->rktrans_pfd[1].fd, |
| buf, sizeof(buf)) == -1) { |
| /* Ignore warning */ |
| } |
| } |
| |
| return rktrans->rktrans_pfd[0].revents; |
| } |
| |
| |
| |
| |
| |
#if 0
/**
 * Global cleanup (compiled out).
 *
 * This is dangerous and SHOULD NOT be called: it would pull the rug from
 * under an application that uses any of this functionality in its own
 * code. Leaving it out means we might leak some memory on exit.
 */
void rd_kafka_transport_term (void) {
#if defined(_MSC_VER)
        /* FIXME: dangerous */
        (void)WSACleanup();
#endif
}
#endif
| |
/**
 * @brief Transport layer initialization.
 *
 * With _MSC_VER this calls WSAStartup() requesting Winsock version 2.2;
 * the result of the call is explicitly discarded.
 * Without _MSC_VER the function does nothing.
 */
void rd_kafka_transport_init(void) {
#if defined(_MSC_VER)
        WSADATA wsadata;

        (void)WSAStartup(MAKEWORD(2, 2), &wsadata);
#endif
}