| /* |
| * librdkafka - Apache Kafka C library |
| * |
| * Copyright (c) 2012-2015, Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| #pragma once |
| |
| #include "rdkafka_int.h" |
| #include "rdcrc32.h" |
| #include "rdlist.h" |
| #include "rdbuf.h" |
| |
| |
| typedef struct rd_kafka_broker_s rd_kafka_broker_t; |
| |
| #define RD_KAFKA_HEADERS_IOV_CNT 2 |
| |
| |
| /** |
| * Temporary buffer with memory aligned writes to accommodate |
| * effective and platform safe struct writes. |
| */ |
| typedef struct rd_tmpabuf_s { |
| size_t size; |
| size_t of; |
| char *buf; |
| int failed; |
| int assert_on_fail; |
| } rd_tmpabuf_t; |
| |
| /** |
| * @brief Allocate new tmpabuf with \p size bytes pre-allocated. |
| */ |
| static RD_UNUSED void |
| rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) { |
| tab->buf = rd_malloc(size); |
| tab->size = size; |
| tab->of = 0; |
| tab->failed = 0; |
| tab->assert_on_fail = assert_on_fail; |
| } |
| |
| /** |
| * @brief Free memory allocated by tmpabuf |
| */ |
| static RD_UNUSED void |
| rd_tmpabuf_destroy (rd_tmpabuf_t *tab) { |
| rd_free(tab->buf); |
| } |
| |
| /** |
| * @returns 1 if a previous operation failed. |
| */ |
| static RD_UNUSED RD_INLINE int |
| rd_tmpabuf_failed (rd_tmpabuf_t *tab) { |
| return tab->failed; |
| } |
| |
| /** |
| * @brief Allocate \p size bytes for writing, returning an aligned pointer |
| * to the memory. |
| * @returns the allocated pointer (within the tmpabuf) on success or |
| * NULL if the requested number of bytes + alignment is not available |
| * in the tmpabuf. |
| */ |
| static RD_UNUSED void * |
| rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) { |
| void *ptr; |
| |
| if (unlikely(tab->failed)) |
| return NULL; |
| |
| if (unlikely(tab->of + size > tab->size)) { |
| if (tab->assert_on_fail) { |
| fprintf(stderr, |
| "%s: %s:%d: requested size %zd + %zd > %zd\n", |
| __FUNCTION__, func, line, tab->of, size, |
| tab->size); |
| assert(!*"rd_tmpabuf_alloc: not enough size in buffer"); |
| } |
| return NULL; |
| } |
| |
| ptr = (void *)(tab->buf + tab->of); |
| tab->of += RD_ROUNDUP(size, 8); |
| |
| return ptr; |
| } |
| |
| #define rd_tmpabuf_alloc(tab,size) \ |
| rd_tmpabuf_alloc0(__FUNCTION__,__LINE__,tab,size) |
| |
| /** |
| * @brief Write \p buf of \p size bytes to tmpabuf memory in an aligned fashion. |
| * |
| * @returns the allocated and written-to pointer (within the tmpabuf) on success |
| * or NULL if the requested number of bytes + alignment is not available |
| * in the tmpabuf. |
| */ |
| static RD_UNUSED void * |
| rd_tmpabuf_write0 (const char *func, int line, |
| rd_tmpabuf_t *tab, const void *buf, size_t size) { |
| void *ptr = rd_tmpabuf_alloc0(func, line, tab, size); |
| |
| if (ptr) |
| memcpy(ptr, buf, size); |
| |
| return ptr; |
| } |
| #define rd_tmpabuf_write(tab,buf,size) \ |
| rd_tmpabuf_write0(__FUNCTION__, __LINE__, tab, buf, size) |
| |
| |
| /** |
| * @brief Wrapper for rd_tmpabuf_write() that takes a nul-terminated string. |
| */ |
| static RD_UNUSED char * |
| rd_tmpabuf_write_str0 (const char *func, int line, |
| rd_tmpabuf_t *tab, const char *str) { |
| return rd_tmpabuf_write0(func, line, tab, str, strlen(str)+1); |
| } |
| #define rd_tmpabuf_write_str(tab,str) \ |
| rd_tmpabuf_write_str0(__FUNCTION__, __LINE__, tab, str) |
| |
| |
| |
| /** |
| * @name Read buffer interface |
| * |
| * Memory reading helper macros to be used when parsing network responses. |
| * |
| * Assumptions: |
| * - an 'err_parse:' goto-label must be available for error bailouts, |
| * the error code will be set in rkbuf->rkbuf_err |
| * - local `int log_decode_errors` variable set to the logging level |
| * to log parse errors (or 0 to turn off logging). |
| */ |
| |
| #define rd_kafka_buf_parse_fail(rkbuf,...) do { \ |
| if (log_decode_errors > 0) { \ |
| rd_kafka_assert(NULL, rkbuf->rkbuf_rkb); \ |
| rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \ |
| "PROTOERR", \ |
| "Protocol parse failure " \ |
| "at %"PRIusz"/%"PRIusz" (%s:%i) " \ |
| "(incorrect broker.version.fallback?)", \ |
| rd_slice_offset(&rkbuf->rkbuf_reader), \ |
| rd_slice_size(&rkbuf->rkbuf_reader), \ |
| __FUNCTION__, __LINE__); \ |
| rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \ |
| "PROTOERR", __VA_ARGS__); \ |
| } \ |
| (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG; \ |
| goto err_parse; \ |
| } while (0) |
| |
| |
| |
| /** |
| * Returns the number of remaining bytes available to read. |
| */ |
| #define rd_kafka_buf_read_remain(rkbuf) \ |
| rd_slice_remains(&(rkbuf)->rkbuf_reader) |
| |
| /** |
| * Checks that at least 'len' bytes remain to be read in buffer, else fails. |
| */ |
| #define rd_kafka_buf_check_len(rkbuf,len) do { \ |
| size_t __len0 = (size_t)(len); \ |
| if (unlikely(__len0 > rd_kafka_buf_read_remain(rkbuf))) { \ |
| rd_kafka_buf_parse_fail( \ |
| rkbuf, \ |
| "expected %"PRIusz" bytes > %"PRIusz \ |
| " remaining bytes", \ |
| __len0, rd_kafka_buf_read_remain(rkbuf)); \ |
| (rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG; \ |
| goto err_parse; \ |
| } \ |
| } while (0) |
| |
| /** |
| * Skip (as in read and ignore) the next 'len' bytes. |
| */ |
| #define rd_kafka_buf_skip(rkbuf, len) do { \ |
| size_t __len1 = (size_t)(len); \ |
| if (__len1 && \ |
| !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \ |
| rd_kafka_buf_check_len(rkbuf, __len1); \ |
| } while (0) |
| |
| /** |
| * Skip (as in read and ignore) up to fixed position \p pos. |
| */ |
| #define rd_kafka_buf_skip_to(rkbuf, pos) do { \ |
| size_t __len1 = (size_t)(pos) - \ |
| rd_slice_offset(&(rkbuf)->rkbuf_reader); \ |
| if (__len1 && \ |
| !rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \ |
| rd_kafka_buf_check_len(rkbuf, __len1); \ |
| } while (0) |
| |
| |
| |
| /** |
| * Read 'len' bytes and copy to 'dstptr' |
| */ |
| #define rd_kafka_buf_read(rkbuf,dstptr,len) do { \ |
| size_t __len2 = (size_t)(len); \ |
| if (!rd_slice_read(&(rkbuf)->rkbuf_reader, dstptr, __len2)) \ |
| rd_kafka_buf_check_len(rkbuf, __len2); \ |
| } while (0) |
| |
| |
| /** |
| * @brief Read \p len bytes at slice offset \p offset and copy to \p dstptr |
| * without affecting the current reader position. |
| */ |
| #define rd_kafka_buf_peek(rkbuf,offset,dstptr,len) do { \ |
| size_t __len2 = (size_t)(len); \ |
| if (!rd_slice_peek(&(rkbuf)->rkbuf_reader, offset, \ |
| dstptr, __len2)) \ |
| rd_kafka_buf_check_len(rkbuf, (offset)+(__len2)); \ |
| } while (0) |
| |
| |
| /** |
| * Read a 16,32,64-bit integer and store it in 'dstptr' |
| */ |
| #define rd_kafka_buf_read_i64(rkbuf,dstptr) do { \ |
| int64_t _v; \ |
| rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \ |
| *(dstptr) = be64toh(_v); \ |
| } while (0) |
| |
| #define rd_kafka_buf_peek_i64(rkbuf,of,dstptr) do { \ |
| int64_t _v; \ |
| rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v)); \ |
| *(dstptr) = be64toh(_v); \ |
| } while (0) |
| |
| #define rd_kafka_buf_read_i32(rkbuf,dstptr) do { \ |
| int32_t _v; \ |
| rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \ |
| *(dstptr) = be32toh(_v); \ |
| } while (0) |
| |
| /* Same as .._read_i32 but does a direct assignment. |
| * dst is assumed to be a scalar, not pointer. */ |
| #define rd_kafka_buf_read_i32a(rkbuf, dst) do { \ |
| int32_t _v; \ |
| rd_kafka_buf_read(rkbuf, &_v, 4); \ |
| dst = (int32_t) be32toh(_v); \ |
| } while (0) |
| |
| #define rd_kafka_buf_read_i16(rkbuf,dstptr) do { \ |
| int16_t _v; \ |
| rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \ |
| *(dstptr) = be16toh(_v); \ |
| } while (0) |
| |
| |
| #define rd_kafka_buf_read_i16a(rkbuf, dst) do { \ |
| int16_t _v; \ |
| rd_kafka_buf_read(rkbuf, &_v, 2); \ |
| dst = (int16_t)be16toh(_v); \ |
| } while (0) |
| |
| #define rd_kafka_buf_read_i8(rkbuf, dst) rd_kafka_buf_read(rkbuf, dst, 1) |
| |
| #define rd_kafka_buf_peek_i8(rkbuf,of,dst) rd_kafka_buf_peek(rkbuf,of,dst,1) |
| |
| |
| /** |
| * @brief Read varint and store in int64_t \p dst |
| */ |
| #define rd_kafka_buf_read_varint(rkbuf,dst) do { \ |
| int64_t _v; \ |
| size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, &_v); \ |
| if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \ |
| rd_kafka_buf_parse_fail(rkbuf, \ |
| "varint parsing failed: " \ |
| "buffer underflow"); \ |
| *(dst) = _v; \ |
| } while (0) |
| |
| /* Read Kafka String representation (2+N). |
| * The kstr data will be updated to point to the rkbuf. */ |
| #define rd_kafka_buf_read_str(rkbuf, kstr) do { \ |
| int _klen; \ |
| rd_kafka_buf_read_i16a(rkbuf, (kstr)->len); \ |
| _klen = RD_KAFKAP_STR_LEN(kstr); \ |
| if (RD_KAFKAP_STR_LEN0(_klen) == 0) \ |
| (kstr)->str = NULL; \ |
| else if (!((kstr)->str = \ |
| rd_slice_ensure_contig(&rkbuf->rkbuf_reader, \ |
| _klen))) \ |
| rd_kafka_buf_check_len(rkbuf, _klen); \ |
| } while (0) |
| |
| /* Read Kafka String representation (2+N) and write it to the \p tmpabuf |
| * with a trailing nul byte. */ |
| #define rd_kafka_buf_read_str_tmpabuf(rkbuf, tmpabuf, dst) do { \ |
| rd_kafkap_str_t _kstr; \ |
| size_t _slen; \ |
| char *_dst; \ |
| rd_kafka_buf_read_str(rkbuf, &_kstr); \ |
| _slen = RD_KAFKAP_STR_LEN(&_kstr); \ |
| if (!(_dst = \ |
| rd_tmpabuf_write(tmpabuf, _kstr.str, _slen+1))) \ |
| rd_kafka_buf_parse_fail( \ |
| rkbuf, \ |
| "Not enough room in tmpabuf: " \ |
| "%"PRIusz"+%"PRIusz \ |
| " > %"PRIusz, \ |
| (tmpabuf)->of, _slen+1, (tmpabuf)->size); \ |
| _dst[_slen] = '\0'; \ |
| dst = (void *)_dst; \ |
| } while (0) |
| |
| /** |
| * Skip a string. |
| */ |
| #define rd_kafka_buf_skip_str(rkbuf) do { \ |
| int16_t _slen; \ |
| rd_kafka_buf_read_i16(rkbuf, &_slen); \ |
| rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen)); \ |
| } while (0) |
| |
| /* Read Kafka Bytes representation (4+N). |
| * The 'kbytes' will be updated to point to rkbuf data */ |
| #define rd_kafka_buf_read_bytes(rkbuf, kbytes) do { \ |
| int _klen; \ |
| rd_kafka_buf_read_i32a(rkbuf, _klen); \ |
| (kbytes)->len = _klen; \ |
| if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) { \ |
| (kbytes)->data = NULL; \ |
| (kbytes)->len = 0; \ |
| } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0) \ |
| (kbytes)->data = ""; \ |
| else if (!((kbytes)->data = \ |
| rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \ |
| _klen))) \ |
| rd_kafka_buf_check_len(rkbuf, _klen); \ |
| } while (0) |
| |
| |
| /** |
| * @brief Read \p size bytes from buffer, setting \p *ptr to the start |
| * of the memory region. |
| */ |
| #define rd_kafka_buf_read_ptr(rkbuf,ptr,size) do { \ |
| size_t _klen = size; \ |
| if (!(*(ptr) = (void *) \ |
| rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, _klen))) \ |
| rd_kafka_buf_check_len(rkbuf, _klen); \ |
| } while (0) |
| |
| |
| /** |
| * @brief Read varint-lengted Kafka Bytes representation |
| */ |
| #define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do { \ |
| int64_t _len2; \ |
| size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, \ |
| &_len2); \ |
| if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \ |
| rd_kafka_buf_parse_fail(rkbuf, \ |
| "varint parsing failed: " \ |
| "buffer underflow"); \ |
| (kbytes)->len = (int32_t)_len2; \ |
| if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) { \ |
| (kbytes)->data = NULL; \ |
| (kbytes)->len = 0; \ |
| } else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0) \ |
| (kbytes)->data = ""; \ |
| else if (!((kbytes)->data = \ |
| rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \ |
| _len2))) \ |
| rd_kafka_buf_check_len(rkbuf, _len2); \ |
| } while (0) |
| |
| |
| /** |
| * Response handling callback. |
| * |
| * NOTE: Callbacks must check for 'err == RD_KAFKA_RESP_ERR__DESTROY' |
| * which indicates that some entity is terminating (rd_kafka_t, broker, |
| * toppar, queue, etc) and the callback may not be called in the |
| * correct thread. In this case the callback must perform just |
| * the most minimal cleanup and dont trigger any other operations. |
| * |
| * NOTE: rkb, reply and request may be NULL, depending on error situation. |
| */ |
| typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, |
| rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *reply, |
| rd_kafka_buf_t *request, |
| void *opaque); |
| |
| struct rd_kafka_buf_s { /* rd_kafka_buf_t */ |
| TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link; |
| |
| int32_t rkbuf_corrid; |
| |
| rd_ts_t rkbuf_ts_retry; /* Absolute send retry time */ |
| |
| int rkbuf_flags; /* RD_KAFKA_OP_F */ |
| |
| rd_buf_t rkbuf_buf; /**< Send/Recv byte buffer */ |
| rd_slice_t rkbuf_reader; /**< Buffer slice reader for rkbuf_buf */ |
| |
| int rkbuf_connid; /* broker connection id (used when buffer |
| * was partially sent). */ |
| size_t rkbuf_totlen; /* recv: total expected length, |
| * send: not used */ |
| |
| rd_crc32_t rkbuf_crc; /* Current CRC calculation */ |
| |
| struct rd_kafkap_reqhdr rkbuf_reqhdr; /* Request header. |
| * These fields are encoded |
| * and written to output buffer |
| * on buffer finalization. */ |
| struct rd_kafkap_reshdr rkbuf_reshdr; /* Response header. |
| * Decoded fields are copied |
| * here from the buffer |
| * to provide an ease-of-use |
| * interface to the header */ |
| |
| int32_t rkbuf_expected_size; /* expected size of message */ |
| |
| rd_kafka_replyq_t rkbuf_replyq; /* Enqueue response on replyq */ |
| rd_kafka_replyq_t rkbuf_orig_replyq; /* Original replyq to be used |
| * for retries from inside |
| * the rkbuf_cb() callback |
| * since rkbuf_replyq will |
| * have been reset. */ |
| rd_kafka_resp_cb_t *rkbuf_cb; /* Response callback */ |
| struct rd_kafka_buf_s *rkbuf_response; /* Response buffer */ |
| |
| struct rd_kafka_broker_s *rkbuf_rkb; |
| |
| rd_refcnt_t rkbuf_refcnt; |
| void *rkbuf_opaque; |
| |
| int rkbuf_retries; /* Retries so far. */ |
| #define RD_KAFKA_BUF_NO_RETRIES 1000000 /* Do not retry */ |
| |
| int rkbuf_features; /* Required feature(s) that must be |
| * supported by broker. */ |
| |
| rd_ts_t rkbuf_ts_enq; |
| rd_ts_t rkbuf_ts_sent; /* Initially: Absolute time of transmission, |
| * after response: RTT. */ |
| rd_ts_t rkbuf_ts_timeout; |
| |
| int64_t rkbuf_offset; /* Used by OffsetCommit */ |
| |
| rd_list_t *rkbuf_rktp_vers; /* Toppar + Op Version map. |
| * Used by FetchRequest. */ |
| |
| rd_kafka_msgq_t rkbuf_msgq; |
| |
| rd_kafka_resp_err_t rkbuf_err; /* Buffer parsing error code */ |
| |
| union { |
| struct { |
| rd_list_t *topics; /* Requested topics (char *) */ |
| char *reason; /* Textual reason */ |
| rd_kafka_op_t *rko; /* Originating rko with replyq |
| * (if any) */ |
| int all_topics; /* Full/All topics requested */ |
| |
| int *decr; /* Decrement this integer by one |
| * when request is complete: |
| * typically points to metadata |
| * cache's full_.._sent. |
| * Will be performed with |
| * decr_lock held. */ |
| mtx_t *decr_lock; |
| |
| } Metadata; |
| } rkbuf_u; |
| }; |
| |
| |
| typedef struct rd_kafka_bufq_s { |
| TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs; |
| rd_atomic32_t rkbq_cnt; |
| rd_atomic32_t rkbq_msg_cnt; |
| } rd_kafka_bufq_t; |
| |
| #define rd_kafka_bufq_cnt(rkbq) rd_atomic32_get(&(rkbq)->rkbq_cnt) |
| |
| |
| #define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt) |
| #define rd_kafka_buf_destroy(rkbuf) \ |
| rd_refcnt_destroywrapper(&(rkbuf)->rkbuf_refcnt, \ |
| rd_kafka_buf_destroy_final(rkbuf)) |
| |
| void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf); |
| void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len, |
| int allow_crc_calc, void (*free_cb) (void *)); |
| #define rd_kafka_buf_push(rkbuf,buf,len,free_cb) \ |
| rd_kafka_buf_push0(rkbuf,buf,len,1/*allow_crc*/,free_cb) |
| rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags); |
| #define rd_kafka_buf_new(segcnt,size) \ |
| rd_kafka_buf_new0(segcnt,size,0) |
| rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey, |
| int segcnt, size_t size); |
| rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size, |
| void (*free_cb) (void *)); |
| void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf); |
| void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf); |
| void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq); |
| void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src); |
| void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb, |
| rd_kafka_bufq_t *rkbufq, |
| rd_kafka_resp_err_t err); |
| void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb, |
| rd_kafka_bufq_t *rkbufq); |
| void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac, |
| rd_kafka_bufq_t *rkbq); |
| |
| int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf); |
| |
| void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err); |
| void rd_kafka_buf_callback (rd_kafka_t *rk, |
| rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, |
| rd_kafka_buf_t *response, rd_kafka_buf_t *request); |
| |
| |
| |
| /** |
| * |
| * Write buffer interface |
| * |
| */ |
| |
| /** |
| * Set request API type version |
| */ |
| static RD_UNUSED RD_INLINE void |
| rd_kafka_buf_ApiVersion_set (rd_kafka_buf_t *rkbuf, |
| int16_t version, int features) { |
| rkbuf->rkbuf_reqhdr.ApiVersion = version; |
| rkbuf->rkbuf_features = features; |
| } |
| |
| |
| /** |
| * @returns the ApiVersion for a request |
| */ |
| #define rd_kafka_buf_ApiVersion(rkbuf) ((rkbuf)->rkbuf_reqhdr.ApiVersion) |
| |
| |
| |
| /** |
| * Write (copy) data to buffer at current write-buffer position. |
| * There must be enough space allocated in the rkbuf. |
| * Returns offset to written destination buffer. |
| */ |
| static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf, |
| const void *data, size_t len) { |
| size_t r; |
| |
| r = rd_buf_write(&rkbuf->rkbuf_buf, data, len); |
| |
| if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC) |
| rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len); |
| |
| return r; |
| } |
| |
| |
| |
| /** |
| * Write (copy) 'data' to buffer at 'ptr'. |
| * There must be enough space to fit 'len'. |
| * This will overwrite the buffer at given location and length. |
| * |
| * NOTE: rd_kafka_buf_update() MUST NOT be called when a CRC calculation |
| * is in progress (between rd_kafka_buf_crc_init() & .._crc_finalize()) |
| */ |
| static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of, |
| const void *data, size_t len) { |
| rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)); |
| rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len); |
| } |
| |
| /** |
| * Write int8_t to buffer. |
| */ |
| static RD_INLINE size_t rd_kafka_buf_write_i8 (rd_kafka_buf_t *rkbuf, |
| int8_t v) { |
| return rd_kafka_buf_write(rkbuf, &v, sizeof(v)); |
| } |
| |
| /** |
| * Update int8_t in buffer at offset 'of'. |
| * 'of' should have been previously returned by `.._buf_write_i8()`. |
| */ |
| static RD_INLINE void rd_kafka_buf_update_i8 (rd_kafka_buf_t *rkbuf, |
| size_t of, int8_t v) { |
| rd_kafka_buf_update(rkbuf, of, &v, sizeof(v)); |
| } |
| |
| /** |
| * Write int16_t to buffer. |
| * The value will be endian-swapped before write. |
| */ |
| static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf, |
| int16_t v) { |
| v = htobe16(v); |
| return rd_kafka_buf_write(rkbuf, &v, sizeof(v)); |
| } |
| |
| /** |
| * Update int16_t in buffer at offset 'of'. |
| * 'of' should have been previously returned by `.._buf_write_i16()`. |
| */ |
| static RD_INLINE void rd_kafka_buf_update_i16 (rd_kafka_buf_t *rkbuf, |
| size_t of, int16_t v) { |
| v = htobe16(v); |
| rd_kafka_buf_update(rkbuf, of, &v, sizeof(v)); |
| } |
| |
| /** |
| * Write int32_t to buffer. |
| * The value will be endian-swapped before write. |
| */ |
| static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf, |
| int32_t v) { |
| v = htobe32(v); |
| return rd_kafka_buf_write(rkbuf, &v, sizeof(v)); |
| } |
| |
| /** |
| * Update int32_t in buffer at offset 'of'. |
| * 'of' should have been previously returned by `.._buf_write_i32()`. |
| */ |
| static RD_INLINE void rd_kafka_buf_update_i32 (rd_kafka_buf_t *rkbuf, |
| size_t of, int32_t v) { |
| v = htobe32(v); |
| rd_kafka_buf_update(rkbuf, of, &v, sizeof(v)); |
| } |
| |
| /** |
| * Update int32_t in buffer at offset 'of'. |
| * 'of' should have been previously returned by `.._buf_write_i32()`. |
| */ |
| static RD_INLINE void rd_kafka_buf_update_u32 (rd_kafka_buf_t *rkbuf, |
| size_t of, uint32_t v) { |
| v = htobe32(v); |
| rd_kafka_buf_update(rkbuf, of, &v, sizeof(v)); |
| } |
| |
| |
| /** |
| * Write int64_t to buffer. |
| * The value will be endian-swapped before write. |
| */ |
| static RD_INLINE size_t rd_kafka_buf_write_i64 (rd_kafka_buf_t *rkbuf, int64_t v) { |
| v = htobe64(v); |
| return rd_kafka_buf_write(rkbuf, &v, sizeof(v)); |
| } |
| |
| /** |
| * Update int64_t in buffer at address 'ptr'. |
| * 'of' should have been previously returned by `.._buf_write_i64()`. |
| */ |
| static RD_INLINE void rd_kafka_buf_update_i64 (rd_kafka_buf_t *rkbuf, |
| size_t of, int64_t v) { |
| v = htobe64(v); |
| rd_kafka_buf_update(rkbuf, of, &v, sizeof(v)); |
| } |
| |
| |
| /** |
| * Write (copy) Kafka string to buffer. |
| */ |
| static RD_INLINE size_t rd_kafka_buf_write_kstr (rd_kafka_buf_t *rkbuf, |
| const rd_kafkap_str_t *kstr) { |
| return rd_kafka_buf_write(rkbuf, RD_KAFKAP_STR_SER(kstr), |
| RD_KAFKAP_STR_SIZE(kstr)); |
| } |
| |
| /** |
| * Write (copy) char * string to buffer. |
| */ |
| static RD_INLINE size_t rd_kafka_buf_write_str (rd_kafka_buf_t *rkbuf, |
| const char *str, size_t len) { |
| size_t r; |
| if (!str) |
| len = RD_KAFKAP_STR_LEN_NULL; |
| else if (len == (size_t)-1) |
| len = strlen(str); |
| r = rd_kafka_buf_write_i16(rkbuf, (int16_t) len); |
| if (str) |
| rd_kafka_buf_write(rkbuf, str, len); |
| return r; |
| } |
| |
| |
| /** |
| * Push (i.e., no copy) Kafka string to buffer iovec |
| */ |
| static RD_INLINE void rd_kafka_buf_push_kstr (rd_kafka_buf_t *rkbuf, |
| const rd_kafkap_str_t *kstr) { |
| rd_kafka_buf_push(rkbuf, RD_KAFKAP_STR_SER(kstr), |
| RD_KAFKAP_STR_SIZE(kstr), NULL); |
| } |
| |
| |
| |
| /** |
| * Write (copy) Kafka bytes to buffer. |
| */ |
| static RD_INLINE size_t rd_kafka_buf_write_kbytes (rd_kafka_buf_t *rkbuf, |
| const rd_kafkap_bytes_t *kbytes){ |
| return rd_kafka_buf_write(rkbuf, RD_KAFKAP_BYTES_SER(kbytes), |
| RD_KAFKAP_BYTES_SIZE(kbytes)); |
| } |
| |
| /** |
| * Push (i.e., no copy) Kafka bytes to buffer iovec |
| */ |
| static RD_INLINE void rd_kafka_buf_push_kbytes (rd_kafka_buf_t *rkbuf, |
| const rd_kafkap_bytes_t *kbytes){ |
| rd_kafka_buf_push(rkbuf, RD_KAFKAP_BYTES_SER(kbytes), |
| RD_KAFKAP_BYTES_SIZE(kbytes), NULL); |
| } |
| |
| /** |
| * Write (copy) binary bytes to buffer as Kafka bytes encapsulate data. |
| */ |
| static RD_INLINE size_t rd_kafka_buf_write_bytes (rd_kafka_buf_t *rkbuf, |
| const void *payload, size_t size) { |
| size_t r; |
| if (!payload) |
| size = RD_KAFKAP_BYTES_LEN_NULL; |
| r = rd_kafka_buf_write_i32(rkbuf, (int32_t) size); |
| if (payload) |
| rd_kafka_buf_write(rkbuf, payload, size); |
| return r; |
| } |
| |
| |
| |
| |
| /** |
| * Write Kafka Message to buffer |
| * The number of bytes written is returned in '*outlenp'. |
| * |
| * Returns the buffer offset of the first byte. |
| */ |
| size_t rd_kafka_buf_write_Message (rd_kafka_broker_t *rkb, |
| rd_kafka_buf_t *rkbuf, |
| int64_t Offset, int8_t MagicByte, |
| int8_t Attributes, int64_t Timestamp, |
| const void *key, int32_t key_len, |
| const void *payload, int32_t len, |
| int *outlenp); |
| |
| /** |
| * Start calculating CRC from now and track it in '*crcp'. |
| */ |
| static RD_INLINE RD_UNUSED void rd_kafka_buf_crc_init (rd_kafka_buf_t *rkbuf) { |
| rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)); |
| rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_CRC; |
| rkbuf->rkbuf_crc = rd_crc32_init(); |
| } |
| |
| /** |
| * Finalizes CRC calculation and returns the calculated checksum. |
| */ |
| static RD_INLINE RD_UNUSED |
| rd_crc32_t rd_kafka_buf_crc_finalize (rd_kafka_buf_t *rkbuf) { |
| rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_CRC; |
| return rd_crc32_finalize(rkbuf->rkbuf_crc); |
| } |
| |
| |
| |
| |
| |
| /** |
| * @brief Check if buffer's replyq.version is outdated. |
| * @param rkbuf: may be NULL, for convenience. |
| * |
| * @returns 1 if this is an outdated buffer, else 0. |
| */ |
| static RD_UNUSED RD_INLINE int |
| rd_kafka_buf_version_outdated (const rd_kafka_buf_t *rkbuf, int version) { |
| return rkbuf && rkbuf->rkbuf_replyq.version && |
| rkbuf->rkbuf_replyq.version < version; |
| } |