| /* |
| * librdkafka - The Apache Kafka C/C++ library |
| * |
| * Copyright (c) 2016 Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #pragma once |
| |
| #include "rdkafka_op.h" |
| #include "rdkafka_int.h" |
| |
| #ifdef _MSC_VER |
| #include <io.h> /* for _write() */ |
| #endif |
| |
| |
| TAILQ_HEAD(rd_kafka_op_tailq, rd_kafka_op_s); |
| |
| struct rd_kafka_q_s { |
| mtx_t rkq_lock; |
| cnd_t rkq_cond; |
| struct rd_kafka_q_s *rkq_fwdq; /* Forwarded/Routed queue. |
| * Used in place of this queue |
| * for all operations. */ |
| |
| struct rd_kafka_op_tailq rkq_q; /* TAILQ_HEAD(, rd_kafka_op_s) */ |
| int rkq_qlen; /* Number of entries in queue */ |
| int64_t rkq_qsize; /* Size of all entries in queue */ |
| int rkq_refcnt; |
| int rkq_flags; |
| #define RD_KAFKA_Q_F_ALLOCATED 0x1 /* Allocated: rd_free on destroy */ |
| #define RD_KAFKA_Q_F_READY 0x2 /* Queue is ready to be used. |
| * Flag is cleared on destroy */ |
| #define RD_KAFKA_Q_F_FWD_APP 0x4 /* Queue is being forwarded by a call |
| * to rd_kafka_queue_forward. */ |
| |
| rd_kafka_t *rkq_rk; |
| struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */ |
| |
| /* Op serve callback (optional). |
| * Mainly used for forwarded queues to use the original queue's |
| * serve function from the forwarded position. |
| * Shall return 1 if op was handled, else 0. */ |
| rd_kafka_q_serve_cb_t *rkq_serve; |
| void *rkq_opaque; |
| |
| #if ENABLE_DEVEL |
| char rkq_name[64]; /* Debugging: queue name (FUNC:LINE) */ |
| #else |
| const char *rkq_name; /* Debugging: queue name (FUNC) */ |
| #endif |
| }; |
| |
| |
| /* FD-based application signalling state holder. */ |
| struct rd_kafka_q_io { |
| int fd; |
| void *payload; |
| size_t size; |
| }; |
| |
| |
| |
| /** |
| * @return true if queue is ready/enabled, else false. |
| * @remark queue luck must be held by caller (if applicable) |
| */ |
| static RD_INLINE RD_UNUSED |
| int rd_kafka_q_ready (rd_kafka_q_t *rkq) { |
| return rkq->rkq_flags & RD_KAFKA_Q_F_READY; |
| } |
| |
| |
| |
| |
| void rd_kafka_q_init (rd_kafka_q_t *rkq, rd_kafka_t *rk); |
| rd_kafka_q_t *rd_kafka_q_new0 (rd_kafka_t *rk, const char *func, int line); |
| #define rd_kafka_q_new(rk) rd_kafka_q_new0(rk,__FUNCTION__,__LINE__) |
| void rd_kafka_q_destroy_final (rd_kafka_q_t *rkq); |
| |
| #define rd_kafka_q_lock(rkqu) mtx_lock(&(rkqu)->rkq_lock) |
| #define rd_kafka_q_unlock(rkqu) mtx_unlock(&(rkqu)->rkq_lock) |
| |
| static RD_INLINE RD_UNUSED |
| rd_kafka_q_t *rd_kafka_q_keep (rd_kafka_q_t *rkq) { |
| mtx_lock(&rkq->rkq_lock); |
| rkq->rkq_refcnt++; |
| mtx_unlock(&rkq->rkq_lock); |
| return rkq; |
| } |
| |
| static RD_INLINE RD_UNUSED |
| rd_kafka_q_t *rd_kafka_q_keep_nolock (rd_kafka_q_t *rkq) { |
| rkq->rkq_refcnt++; |
| return rkq; |
| } |
| |
| |
| /** |
| * @returns the queue's name (used for debugging) |
| */ |
| static RD_INLINE RD_UNUSED |
| const char *rd_kafka_q_name (rd_kafka_q_t *rkq) { |
| return rkq->rkq_name; |
| } |
| |
| /** |
| * @returns the final destination queue name (after forwarding) |
| * @remark rkq MUST NOT be locked |
| */ |
| static RD_INLINE RD_UNUSED |
| const char *rd_kafka_q_dest_name (rd_kafka_q_t *rkq) { |
| const char *ret; |
| mtx_lock(&rkq->rkq_lock); |
| if (rkq->rkq_fwdq) |
| ret = rd_kafka_q_dest_name(rkq->rkq_fwdq); |
| else |
| ret = rd_kafka_q_name(rkq); |
| mtx_unlock(&rkq->rkq_lock); |
| return ret; |
| } |
| |
| |
| static RD_INLINE RD_UNUSED |
| void rd_kafka_q_destroy (rd_kafka_q_t *rkq) { |
| int do_delete = 0; |
| |
| mtx_lock(&rkq->rkq_lock); |
| rd_kafka_assert(NULL, rkq->rkq_refcnt > 0); |
| do_delete = !--rkq->rkq_refcnt; |
| mtx_unlock(&rkq->rkq_lock); |
| |
| if (unlikely(do_delete)) |
| rd_kafka_q_destroy_final(rkq); |
| } |
| |
| |
| /** |
| * Reset a queue. |
| * WARNING: All messages will be lost and leaked. |
| * NOTE: No locking is performed. |
| */ |
| static RD_INLINE RD_UNUSED |
| void rd_kafka_q_reset (rd_kafka_q_t *rkq) { |
| TAILQ_INIT(&rkq->rkq_q); |
| rd_dassert(TAILQ_EMPTY(&rkq->rkq_q)); |
| rkq->rkq_qlen = 0; |
| rkq->rkq_qsize = 0; |
| } |
| |
| |
| /** |
| * Disable a queue. |
| * Attempting to enqueue messages to the queue will destroy them. |
| */ |
| static RD_INLINE RD_UNUSED |
| void rd_kafka_q_disable0 (rd_kafka_q_t *rkq, int do_lock) { |
| if (do_lock) |
| mtx_lock(&rkq->rkq_lock); |
| rkq->rkq_flags &= ~RD_KAFKA_Q_F_READY; |
| if (do_lock) |
| mtx_unlock(&rkq->rkq_lock); |
| } |
| #define rd_kafka_q_disable(rkq) rd_kafka_q_disable0(rkq, 1/*lock*/) |
| |
| /** |
| * Forward 'srcq' to 'destq' |
| */ |
| void rd_kafka_q_fwd_set0 (rd_kafka_q_t *srcq, rd_kafka_q_t *destq, |
| int do_lock, int fwd_app); |
| #define rd_kafka_q_fwd_set(S,D) rd_kafka_q_fwd_set0(S,D,1/*lock*/,\ |
| 0/*no fwd_app*/) |
| |
| /** |
| * @returns the forward queue (if any) with its refcount increased. |
| * @locks rd_kafka_q_lock(rkq) == !do_lock |
| */ |
| static RD_INLINE RD_UNUSED |
| rd_kafka_q_t *rd_kafka_q_fwd_get (rd_kafka_q_t *rkq, int do_lock) { |
| rd_kafka_q_t *fwdq; |
| if (do_lock) |
| mtx_lock(&rkq->rkq_lock); |
| |
| if ((fwdq = rkq->rkq_fwdq)) |
| rd_kafka_q_keep(fwdq); |
| |
| if (do_lock) |
| mtx_unlock(&rkq->rkq_lock); |
| |
| return fwdq; |
| } |
| |
| |
| /** |
| * @returns true if queue is forwarded, else false. |
| * |
| * @remark Thread-safe. |
| */ |
| static RD_INLINE RD_UNUSED int rd_kafka_q_is_fwded (rd_kafka_q_t *rkq) { |
| int r; |
| mtx_lock(&rkq->rkq_lock); |
| r = rkq->rkq_fwdq ? 1 : 0; |
| mtx_unlock(&rkq->rkq_lock); |
| return r; |
| } |
| |
| |
| |
| /** |
| * @brief Trigger an IO event for this queue. |
| * |
| * @remark Queue MUST be locked |
| */ |
| static RD_INLINE RD_UNUSED |
| void rd_kafka_q_io_event (rd_kafka_q_t *rkq) { |
| ssize_t r; |
| |
| if (likely(!rkq->rkq_qio)) |
| return; |
| |
| #ifdef _MSC_VER |
| r = _write(rkq->rkq_qio->fd, rkq->rkq_qio->payload, (int)rkq->rkq_qio->size); |
| #else |
| r = write(rkq->rkq_qio->fd, rkq->rkq_qio->payload, rkq->rkq_qio->size); |
| #endif |
| if (r == -1) { |
| fprintf(stderr, |
| "[ERROR:librdkafka:rd_kafka_q_io_event: " |
| "write(%d,..,%d) failed on queue %p \"%s\": %s: " |
| "disabling further IO events]\n", |
| rkq->rkq_qio->fd, (int)rkq->rkq_qio->size, |
| rkq, rd_kafka_q_name(rkq), rd_strerror(errno)); |
| /* FIXME: Log this, somehow */ |
| rd_free(rkq->rkq_qio); |
| rkq->rkq_qio = NULL; |
| } |
| } |
| |
| |
| /** |
| * @brief rko->rko_prio comparator |
| * @remark: descending order: higher priority takes preceedence. |
| */ |
| static RD_INLINE RD_UNUSED |
| int rd_kafka_op_cmp_prio (const void *_a, const void *_b) { |
| const rd_kafka_op_t *a = _a, *b = _b; |
| |
| return b->rko_prio - a->rko_prio; |
| } |
| |
| |
| /** |
| * @brief Low-level unprotected enqueue that only performs |
| * the actual queue enqueue and counter updates. |
| * @remark Will not perform locking, signaling, fwdq, READY checking, etc. |
| */ |
| static RD_INLINE RD_UNUSED void |
| rd_kafka_q_enq0 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int at_head) { |
| if (likely(!rko->rko_prio)) |
| TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link); |
| else if (at_head) |
| TAILQ_INSERT_HEAD(&rkq->rkq_q, rko, rko_link); |
| else |
| TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, rd_kafka_op_t *, |
| rko_link, rd_kafka_op_cmp_prio); |
| rkq->rkq_qlen++; |
| rkq->rkq_qsize += rko->rko_len; |
| } |
| |
| |
| /** |
| * @brief Enqueue the 'rko' op at the tail of the queue 'rkq'. |
| * |
| * The provided 'rko' is either enqueued or destroyed. |
| * |
| * @returns 1 if op was enqueued or 0 if queue is disabled and |
| * there was no replyq to enqueue on in which case the rko is destroyed. |
| * |
| * Locality: any thread. |
| */ |
| static RD_INLINE RD_UNUSED |
| int rd_kafka_q_enq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { |
| rd_kafka_q_t *fwdq; |
| |
| mtx_lock(&rkq->rkq_lock); |
| |
| rd_dassert(rkq->rkq_refcnt > 0); |
| |
| if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) { |
| |
| /* Queue has been disabled, reply to and fail the rko. */ |
| mtx_unlock(&rkq->rkq_lock); |
| |
| return rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__DESTROY); |
| } |
| |
| if (!rko->rko_serve && rkq->rkq_serve) { |
| /* Store original queue's serve callback and opaque |
| * prior to forwarding. */ |
| rko->rko_serve = rkq->rkq_serve; |
| rko->rko_serve_opaque = rkq->rkq_opaque; |
| } |
| |
| if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| rd_kafka_q_enq0(rkq, rko, 0); |
| cnd_signal(&rkq->rkq_cond); |
| if (rkq->rkq_qlen == 1) |
| rd_kafka_q_io_event(rkq); |
| mtx_unlock(&rkq->rkq_lock); |
| } else { |
| mtx_unlock(&rkq->rkq_lock); |
| rd_kafka_q_enq(fwdq, rko); |
| rd_kafka_q_destroy(fwdq); |
| } |
| |
| return 1; |
| } |
| |
| |
| /** |
| * @brief Re-enqueue rko at head of rkq. |
| * |
| * The provided 'rko' is either enqueued or destroyed. |
| * |
| * @returns 1 if op was enqueued or 0 if queue is disabled and |
| * there was no replyq to enqueue on in which case the rko is destroyed. |
| * |
| * @locks rkq MUST BE LOCKED |
| * |
| * Locality: any thread. |
| */ |
| static RD_INLINE RD_UNUSED |
| int rd_kafka_q_reenq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { |
| rd_kafka_q_t *fwdq; |
| |
| rd_dassert(rkq->rkq_refcnt > 0); |
| |
| if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) |
| return rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__DESTROY); |
| |
| if (!rko->rko_serve && rkq->rkq_serve) { |
| /* Store original queue's serve callback and opaque |
| * prior to forwarding. */ |
| rko->rko_serve = rkq->rkq_serve; |
| rko->rko_serve_opaque = rkq->rkq_opaque; |
| } |
| |
| if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| rd_kafka_q_enq0(rkq, rko, 1/*at_head*/); |
| cnd_signal(&rkq->rkq_cond); |
| if (rkq->rkq_qlen == 1) |
| rd_kafka_q_io_event(rkq); |
| } else { |
| rd_kafka_q_enq(fwdq, rko); |
| rd_kafka_q_destroy(fwdq); |
| } |
| |
| return 1; |
| } |
| |
| |
| /** |
| * Dequeue 'rko' from queue 'rkq'. |
| * |
| * NOTE: rkq_lock MUST be held |
| * Locality: any thread |
| */ |
| static RD_INLINE RD_UNUSED |
| void rd_kafka_q_deq0 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { |
| rd_dassert(rkq->rkq_flags & RD_KAFKA_Q_F_READY); |
| rd_dassert(rkq->rkq_qlen > 0 && |
| rkq->rkq_qsize >= (int64_t)rko->rko_len); |
| |
| TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link); |
| rkq->rkq_qlen--; |
| rkq->rkq_qsize -= rko->rko_len; |
| } |
| |
| /** |
| * Concat all elements of 'srcq' onto tail of 'rkq'. |
| * 'rkq' will be be locked (if 'do_lock'==1), but 'srcq' will not. |
| * NOTE: 'srcq' will be reset. |
| * |
| * Locality: any thread. |
| * |
| * @returns 0 if operation was performed or -1 if rkq is disabled. |
| */ |
| static RD_INLINE RD_UNUSED |
| int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) { |
| int r = 0; |
| |
| while (srcq->rkq_fwdq) /* Resolve source queue */ |
| srcq = srcq->rkq_fwdq; |
| if (unlikely(srcq->rkq_qlen == 0)) |
| return 0; /* Don't do anything if source queue is empty */ |
| |
| if (do_lock) |
| mtx_lock(&rkq->rkq_lock); |
| if (!rkq->rkq_fwdq) { |
| rd_kafka_op_t *rko; |
| |
| rd_dassert(TAILQ_EMPTY(&srcq->rkq_q) || |
| srcq->rkq_qlen > 0); |
| if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) { |
| if (do_lock) |
| mtx_unlock(&rkq->rkq_lock); |
| return -1; |
| } |
| /* First insert any prioritized ops from srcq |
| * in the right position in rkq. */ |
| while ((rko = TAILQ_FIRST(&srcq->rkq_q)) && rko->rko_prio > 0) { |
| TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link); |
| TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, |
| rd_kafka_op_t *, rko_link, |
| rd_kafka_op_cmp_prio); |
| } |
| |
| TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link); |
| if (rkq->rkq_qlen == 0) |
| rd_kafka_q_io_event(rkq); |
| rkq->rkq_qlen += srcq->rkq_qlen; |
| rkq->rkq_qsize += srcq->rkq_qsize; |
| cnd_signal(&rkq->rkq_cond); |
| |
| rd_kafka_q_reset(srcq); |
| } else |
| r = rd_kafka_q_concat0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq, |
| srcq, |
| rkq->rkq_fwdq ? do_lock : 0); |
| if (do_lock) |
| mtx_unlock(&rkq->rkq_lock); |
| |
| return r; |
| } |
| |
| #define rd_kafka_q_concat(dstq,srcq) rd_kafka_q_concat0(dstq,srcq,1/*lock*/) |
| |
| |
| /** |
| * @brief Prepend all elements of 'srcq' onto head of 'rkq'. |
| * 'rkq' will be be locked (if 'do_lock'==1), but 'srcq' will not. |
| * 'srcq' will be reset. |
| * |
| * @remark Will not respect priority of ops, srcq will be prepended in its |
| * original form to rkq. |
| * |
| * @locality any thread. |
| */ |
| static RD_INLINE RD_UNUSED |
| void rd_kafka_q_prepend0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, |
| int do_lock) { |
| if (do_lock) |
| mtx_lock(&rkq->rkq_lock); |
| if (!rkq->rkq_fwdq && !srcq->rkq_fwdq) { |
| /* FIXME: prio-aware */ |
| /* Concat rkq on srcq */ |
| TAILQ_CONCAT(&srcq->rkq_q, &rkq->rkq_q, rko_link); |
| /* Move srcq to rkq */ |
| TAILQ_MOVE(&rkq->rkq_q, &srcq->rkq_q, rko_link); |
| if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0) |
| rd_kafka_q_io_event(rkq); |
| rkq->rkq_qlen += srcq->rkq_qlen; |
| rkq->rkq_qsize += srcq->rkq_qsize; |
| |
| rd_kafka_q_reset(srcq); |
| } else |
| rd_kafka_q_prepend0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq, |
| srcq->rkq_fwdq ? srcq->rkq_fwdq : srcq, |
| rkq->rkq_fwdq ? do_lock : 0); |
| if (do_lock) |
| mtx_unlock(&rkq->rkq_lock); |
| } |
| |
| #define rd_kafka_q_prepend(dstq,srcq) rd_kafka_q_prepend0(dstq,srcq,1/*lock*/) |
| |
| |
| /* Returns the number of elements in the queue */ |
| static RD_INLINE RD_UNUSED |
| int rd_kafka_q_len (rd_kafka_q_t *rkq) { |
| int qlen; |
| rd_kafka_q_t *fwdq; |
| mtx_lock(&rkq->rkq_lock); |
| if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| qlen = rkq->rkq_qlen; |
| mtx_unlock(&rkq->rkq_lock); |
| } else { |
| mtx_unlock(&rkq->rkq_lock); |
| qlen = rd_kafka_q_len(fwdq); |
| rd_kafka_q_destroy(fwdq); |
| } |
| return qlen; |
| } |
| |
| /* Returns the total size of elements in the queue */ |
| static RD_INLINE RD_UNUSED |
| uint64_t rd_kafka_q_size (rd_kafka_q_t *rkq) { |
| uint64_t sz; |
| rd_kafka_q_t *fwdq; |
| mtx_lock(&rkq->rkq_lock); |
| if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| sz = rkq->rkq_qsize; |
| mtx_unlock(&rkq->rkq_lock); |
| } else { |
| mtx_unlock(&rkq->rkq_lock); |
| sz = rd_kafka_q_size(fwdq); |
| rd_kafka_q_destroy(fwdq); |
| } |
| return sz; |
| } |
| |
| |
| /* Construct temporary on-stack replyq with increased Q refcount and |
| * optional VERSION. */ |
| #if ENABLE_DEVEL |
| #define RD_KAFKA_REPLYQ(Q,VERSION) \ |
| (rd_kafka_replyq_t){rd_kafka_q_keep(Q), VERSION, \ |
| rd_strdup(__FUNCTION__) } |
| #else |
| #define RD_KAFKA_REPLYQ(Q,VERSION) \ |
| (rd_kafka_replyq_t){rd_kafka_q_keep(Q), VERSION} |
| #endif |
| |
| /* Construct temporary on-stack replyq for indicating no replyq. */ |
| #if ENABLE_DEVEL |
| #define RD_KAFKA_NO_REPLYQ (rd_kafka_replyq_t){NULL, 0, NULL} |
| #else |
| #define RD_KAFKA_NO_REPLYQ (rd_kafka_replyq_t){NULL, 0} |
| #endif |
| |
| /** |
| * Set up replyq. |
| * Q refcnt is increased. |
| */ |
| static RD_INLINE RD_UNUSED void |
| rd_kafka_set_replyq (rd_kafka_replyq_t *replyq, |
| rd_kafka_q_t *rkq, int32_t version) { |
| replyq->q = rkq ? rd_kafka_q_keep(rkq) : NULL; |
| replyq->version = version; |
| #if ENABLE_DEVEL |
| replyq->_id = strdup(__FUNCTION__); |
| #endif |
| } |
| |
| /** |
| * Set rko's replyq with an optional version (versionptr != NULL). |
| * Q refcnt is increased. |
| */ |
| static RD_INLINE RD_UNUSED void |
| rd_kafka_op_set_replyq (rd_kafka_op_t *rko, rd_kafka_q_t *rkq, |
| rd_atomic32_t *versionptr) { |
| rd_kafka_set_replyq(&rko->rko_replyq, rkq, |
| versionptr ? rd_atomic32_get(versionptr) : 0); |
| } |
| |
| /* Set reply rko's version from replyq's version */ |
| #define rd_kafka_op_get_reply_version(REPLY_RKO, ORIG_RKO) do { \ |
| (REPLY_RKO)->rko_version = (ORIG_RKO)->rko_replyq.version; \ |
| } while (0) |
| |
| |
| /* Clear replyq holder without decreasing any .q references. */ |
| static RD_INLINE RD_UNUSED void |
| rd_kafka_replyq_clear (rd_kafka_replyq_t *replyq) { |
| memset(replyq, 0, sizeof(*replyq)); |
| } |
| |
| /** |
| * @brief Make a copy of \p src in \p dst, with its own queue reference |
| */ |
| static RD_INLINE RD_UNUSED void |
| rd_kafka_replyq_copy (rd_kafka_replyq_t *dst, rd_kafka_replyq_t *src) { |
| dst->version = src->version; |
| dst->q = src->q; |
| if (dst->q) |
| rd_kafka_q_keep(dst->q); |
| #if ENABLE_DEVEL |
| if (src->_id) |
| dst->_id = rd_strdup(src->_id); |
| else |
| dst->_id = NULL; |
| #endif |
| } |
| |
| |
| /** |
| * Clear replyq holder and destroy any .q references. |
| */ |
| static RD_INLINE RD_UNUSED void |
| rd_kafka_replyq_destroy (rd_kafka_replyq_t *replyq) { |
| if (replyq->q) |
| rd_kafka_q_destroy(replyq->q); |
| #if ENABLE_DEVEL |
| if (replyq->_id) { |
| rd_free(replyq->_id); |
| replyq->_id = NULL; |
| } |
| #endif |
| rd_kafka_replyq_clear(replyq); |
| } |
| |
| |
| /** |
| * @brief Wrapper for rd_kafka_q_enq() that takes a replyq, |
| * steals its queue reference, enqueues the op with the replyq version, |
| * and then destroys the queue reference. |
| * |
| * If \p version is non-zero it will be updated, else replyq->version. |
| * |
| * @returns Same as rd_kafka_q_enq() |
| */ |
| static RD_INLINE RD_UNUSED int |
| rd_kafka_replyq_enq (rd_kafka_replyq_t *replyq, rd_kafka_op_t *rko, |
| int version) { |
| rd_kafka_q_t *rkq = replyq->q; |
| int r; |
| |
| if (version) |
| rko->rko_version = version; |
| else |
| rko->rko_version = replyq->version; |
| |
| /* The replyq queue reference is done after we've enqueued the rko |
| * so clear it here. */ |
| replyq->q = NULL; |
| |
| #if ENABLE_DEVEL |
| if (replyq->_id) { |
| rd_free(replyq->_id); |
| replyq->_id = NULL; |
| } |
| #endif |
| |
| /* Retain replyq->version since it is used by buf_callback |
| * when dispatching the callback. */ |
| |
| r = rd_kafka_q_enq(rkq, rko); |
| |
| rd_kafka_q_destroy(rkq); |
| |
| return r; |
| } |
| |
| |
| |
| rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms, |
| int32_t version, |
| rd_kafka_q_cb_type_t cb_type, |
| rd_kafka_q_serve_cb_t *callback, |
| void *opaque); |
| rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, int timeout_ms, |
| int32_t version); |
| int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt, |
| rd_kafka_q_cb_type_t cb_type, |
| rd_kafka_q_serve_cb_t *callback, |
| void *opaque); |
| |
| int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock); |
| #define rd_kafka_q_purge(rkq) rd_kafka_q_purge0(rkq, 1/*lock*/) |
| void rd_kafka_q_purge_toppar_version (rd_kafka_q_t *rkq, |
| rd_kafka_toppar_t *rktp, int version); |
| |
| int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq, |
| int cnt, int do_locks); |
| |
| int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms, |
| rd_kafka_message_t **rkmessages, |
| size_t rkmessages_size); |
| rd_kafka_resp_err_t rd_kafka_q_wait_result (rd_kafka_q_t *rkq, int timeout_ms); |
| |
| int rd_kafka_q_apply (rd_kafka_q_t *rkq, |
| int (*callback) (rd_kafka_q_t *rkq, rd_kafka_op_t *rko, |
| void *opaque), |
| void *opaque); |
| |
| void rd_kafka_q_fix_offsets (rd_kafka_q_t *rkq, int64_t min_offset, |
| int64_t base_offset); |
| |
| /** |
| * @returns the last op in the queue matching \p op_type and \p allow_err (bool) |
| * @remark The \p rkq must be properly locked before this call, the returned rko |
| * is not removed from the queue and may thus not be held for longer |
| * than the lock is held. |
| */ |
| static RD_INLINE RD_UNUSED |
| rd_kafka_op_t *rd_kafka_q_last (rd_kafka_q_t *rkq, rd_kafka_op_type_t op_type, |
| int allow_err) { |
| rd_kafka_op_t *rko; |
| TAILQ_FOREACH_REVERSE(rko, &rkq->rkq_q, rd_kafka_op_tailq, rko_link) { |
| if (rko->rko_type == op_type && |
| (allow_err || !rko->rko_err)) |
| return rko; |
| } |
| |
| return NULL; |
| } |
| |
| void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, int fd, |
| const void *payload, size_t size); |
| |
| /* Public interface */ |
| struct rd_kafka_queue_s { |
| rd_kafka_q_t *rkqu_q; |
| rd_kafka_t *rkqu_rk; |
| }; |
| |
| |
| void rd_kafka_q_dump (FILE *fp, rd_kafka_q_t *rkq); |
| |
| extern int RD_TLS rd_kafka_yield_thread; |