| /* |
| * librdkafka - The Apache Kafka C/C++ library |
| * |
| * Copyright (c) 2016 Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include "rdkafka_int.h" |
| #include "rdkafka_offset.h" |
| #include "rdkafka_topic.h" |
| #include "rdkafka_interceptor.h" |
| |
| int RD_TLS rd_kafka_yield_thread = 0; |
| |
| void rd_kafka_yield (rd_kafka_t *rk) { |
| rd_kafka_yield_thread = 1; |
| } |
| |
| |
| /** |
| * Destroy a queue. The refcount must have reached zero. |
| */ |
| void rd_kafka_q_destroy_final (rd_kafka_q_t *rkq) { |
| |
| mtx_lock(&rkq->rkq_lock); |
| if (unlikely(rkq->rkq_qio != NULL)) { |
| rd_free(rkq->rkq_qio); |
| rkq->rkq_qio = NULL; |
| } |
| rd_kafka_q_fwd_set0(rkq, NULL, 0/*no-lock*/, 0 /*no-fwd-app*/); |
| rd_kafka_q_disable0(rkq, 0/*no-lock*/); |
| rd_kafka_q_purge0(rkq, 0/*no-lock*/); |
| assert(!rkq->rkq_fwdq); |
| mtx_unlock(&rkq->rkq_lock); |
| mtx_destroy(&rkq->rkq_lock); |
| cnd_destroy(&rkq->rkq_cond); |
| |
| if (rkq->rkq_flags & RD_KAFKA_Q_F_ALLOCATED) |
| rd_free(rkq); |
| } |
| |
| |
| |
| /** |
| * Initialize a queue. |
| */ |
| void rd_kafka_q_init (rd_kafka_q_t *rkq, rd_kafka_t *rk) { |
| rd_kafka_q_reset(rkq); |
| rkq->rkq_fwdq = NULL; |
| rkq->rkq_refcnt = 1; |
| rkq->rkq_flags = RD_KAFKA_Q_F_READY; |
| rkq->rkq_rk = rk; |
| rkq->rkq_qio = NULL; |
| rkq->rkq_serve = NULL; |
| rkq->rkq_opaque = NULL; |
| mtx_init(&rkq->rkq_lock, mtx_plain); |
| cnd_init(&rkq->rkq_cond); |
| } |
| |
| |
| /** |
| * Allocate a new queue and initialize it. |
| */ |
| rd_kafka_q_t *rd_kafka_q_new0 (rd_kafka_t *rk, const char *func, int line) { |
| rd_kafka_q_t *rkq = rd_malloc(sizeof(*rkq)); |
| rd_kafka_q_init(rkq, rk); |
| rkq->rkq_flags |= RD_KAFKA_Q_F_ALLOCATED; |
| #if ENABLE_DEVEL |
| rd_snprintf(rkq->rkq_name, sizeof(rkq->rkq_name), "%s:%d", func, line); |
| #else |
| rkq->rkq_name = func; |
| #endif |
| return rkq; |
| } |
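| |
| /* |
| * Example sketch (internal use): callers normally go through the |
| * rd_kafka_q_new() convenience macro (which presumably supplies |
| * __FUNCTION__ and __LINE__ for the name) and drop their reference |
| * with rd_kafka_q_destroy() when done: |
| * |
| * rd_kafka_q_t *myq = rd_kafka_q_new(rk); |
| * rd_kafka_q_destroy(myq); // refcnt 1 -> 0: q_destroy_final() |
| */ |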
| |
| /** |
| * Set/clear forward queue. |
| * Queue forwarding enables message routing inside rdkafka. |
| * Typical use is to re-route all fetched messages for all partitions |
| * to one single queue. |
| * |
| * All access to rkq_fwdq is protected by rkq_lock. |
| */ |
| void rd_kafka_q_fwd_set0 (rd_kafka_q_t *srcq, rd_kafka_q_t *destq, |
| int do_lock, int fwd_app) { |
| |
| if (do_lock) |
| mtx_lock(&srcq->rkq_lock); |
| if (fwd_app) |
| srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP; |
| if (srcq->rkq_fwdq) { |
| rd_kafka_q_destroy(srcq->rkq_fwdq); |
| srcq->rkq_fwdq = NULL; |
| } |
| if (destq) { |
| rd_kafka_q_keep(destq); |
| |
| /* If srcq has ops in its queue, append them to fwdq's queue. |
| * This is an irreversible operation. */ |
| if (srcq->rkq_qlen > 0) { |
| rd_dassert(destq->rkq_flags & RD_KAFKA_Q_F_READY); |
| rd_kafka_q_concat(destq, srcq); |
| } |
| |
| srcq->rkq_fwdq = destq; |
| } |
| if (do_lock) |
| mtx_unlock(&srcq->rkq_lock); |
| } |
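| |
| /* |
| * Example sketch (public API): route one partition's fetched messages |
| * to an application-created queue; "mytopic" and partition 0 are |
| * hypothetical, and the same pattern can funnel any number of |
| * partition queues into a single queue: |
| * |
| * rd_kafka_queue_t *uq = rd_kafka_queue_new(rk); |
| * rd_kafka_queue_t *pq = rd_kafka_queue_get_partition(rk, "mytopic", 0); |
| * rd_kafka_queue_forward(pq, uq); // fetched ops now land on uq |
| * rd_kafka_queue_forward(pq, NULL); // later: undo the forwarding |
| * rd_kafka_queue_destroy(pq); |
| * rd_kafka_queue_destroy(uq); |
| */ |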
| |
| /** |
| * Purge all entries from a queue. |
| */ |
| int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock) { |
| rd_kafka_op_t *rko, *next; |
| TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); |
| rd_kafka_q_t *fwdq; |
| int cnt = 0; |
| |
| if (do_lock) |
| mtx_lock(&rkq->rkq_lock); |
| |
| if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| if (do_lock) |
| mtx_unlock(&rkq->rkq_lock); |
| cnt = rd_kafka_q_purge(fwdq); |
| rd_kafka_q_destroy(fwdq); |
| return cnt; |
| } |
| |
| /* Move the ops to tmpq to avoid lock-order issues with the |
| * locks taken by rd_kafka_op_destroy(). */ |
| TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link); |
| |
| /* Zero out queue */ |
| rd_kafka_q_reset(rkq); |
| |
| if (do_lock) |
| mtx_unlock(&rkq->rkq_lock); |
| |
| /* Destroy the ops */ |
| next = TAILQ_FIRST(&tmpq); |
| while ((rko = next)) { |
| next = TAILQ_NEXT(next, rko_link); |
| rd_kafka_op_destroy(rko); |
| cnt++; |
| } |
| |
| return cnt; |
| } |
| |
| |
| /** |
| * Purge all entries from a queue with a rktp version smaller than `version`. |
| * This shaves off the head of the queue, up until the first rko with |
| * a non-matching rktp or version. |
| */ |
| void rd_kafka_q_purge_toppar_version (rd_kafka_q_t *rkq, |
| rd_kafka_toppar_t *rktp, int version) { |
| rd_kafka_op_t *rko, *next; |
| TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); |
| int32_t cnt = 0; |
| int64_t size = 0; |
| rd_kafka_q_t *fwdq; |
| |
| mtx_lock(&rkq->rkq_lock); |
| |
| if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| mtx_unlock(&rkq->rkq_lock); |
| rd_kafka_q_purge_toppar_version(fwdq, rktp, version); |
| rd_kafka_q_destroy(fwdq); |
| return; |
| } |
| |
| /* Move ops to temporary queue and then destroy them from there |
| * without locks to avoid lock-ordering problems in op_destroy() */ |
| while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && rko->rko_rktp && |
| rd_kafka_toppar_s2i(rko->rko_rktp) == rktp && |
| rko->rko_version < version) { |
| TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link); |
| TAILQ_INSERT_TAIL(&tmpq, rko, rko_link); |
| cnt++; |
| size += rko->rko_len; |
| } |
| |
| |
| rkq->rkq_qlen -= cnt; |
| rkq->rkq_qsize -= size; |
| mtx_unlock(&rkq->rkq_lock); |
| |
| next = TAILQ_FIRST(&tmpq); |
| while ((rko = next)) { |
| next = TAILQ_NEXT(next, rko_link); |
| rd_kafka_op_destroy(rko); |
| } |
| } |
| |
| |
| /** |
| * Move 'cnt' entries from 'srcq' to 'dstq'. |
| * If 'cnt' == -1 all entries will be moved. |
| * Returns the number of entries moved. |
| */ |
| int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq, |
| int cnt, int do_locks) { |
| rd_kafka_op_t *rko; |
| int mcnt = 0; |
| |
| if (do_locks) { |
| mtx_lock(&srcq->rkq_lock); |
| mtx_lock(&dstq->rkq_lock); |
| } |
| |
| if (!dstq->rkq_fwdq && !srcq->rkq_fwdq) { |
| if (cnt > 0 && dstq->rkq_qlen == 0) |
| rd_kafka_q_io_event(dstq); |
| |
| /* Optimization: if 'cnt' is equal to or larger than the number of |
| * items in 'srcq' we can move the entire queue. */ |
| if (cnt == -1 || |
| cnt >= (int)srcq->rkq_qlen) { |
| mcnt = srcq->rkq_qlen; |
| rd_kafka_q_concat0(dstq, srcq, 0/*no-lock*/); |
| } else { |
| while (mcnt < cnt && |
| (rko = TAILQ_FIRST(&srcq->rkq_q))) { |
| TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link); |
| if (likely(!rko->rko_prio)) |
| TAILQ_INSERT_TAIL(&dstq->rkq_q, rko, |
| rko_link); |
| else |
| TAILQ_INSERT_SORTED( |
| &dstq->rkq_q, rko, |
| rd_kafka_op_t *, rko_link, |
| rd_kafka_op_cmp_prio); |
| |
| srcq->rkq_qlen--; |
| dstq->rkq_qlen++; |
| srcq->rkq_qsize -= rko->rko_len; |
| dstq->rkq_qsize += rko->rko_len; |
| mcnt++; |
| } |
| } |
| } else |
| mcnt = rd_kafka_q_move_cnt(dstq->rkq_fwdq ? dstq->rkq_fwdq:dstq, |
| srcq->rkq_fwdq ? srcq->rkq_fwdq:srcq, |
| cnt, do_locks); |
| |
| if (do_locks) { |
| mtx_unlock(&dstq->rkq_lock); |
| mtx_unlock(&srcq->rkq_lock); |
| } |
| |
| return mcnt; |
| } |
| |
| |
| /** |
| * Filters out outdated ops. |
| */ |
| static RD_INLINE rd_kafka_op_t *rd_kafka_op_filter (rd_kafka_q_t *rkq, |
| rd_kafka_op_t *rko, |
| int version) { |
| if (unlikely(!rko)) |
| return NULL; |
| |
| if (unlikely(rd_kafka_op_version_outdated(rko, version))) { |
| rd_kafka_q_deq0(rkq, rko); |
| rd_kafka_op_destroy(rko); |
| return NULL; |
| } |
| |
| return rko; |
| } |
| |
| |
| |
| /** |
| * Serve q like rd_kafka_q_serve() until an op is found that can be returned |
| * as an event to the application. |
| * |
| * @returns the first op that can be returned as an event, or NULL on timeout. |
| * |
| * Locality: any thread |
| */ |
| rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms, |
| int32_t version, |
| rd_kafka_q_cb_type_t cb_type, |
| rd_kafka_q_serve_cb_t *callback, |
| void *opaque) { |
| rd_kafka_op_t *rko; |
| rd_kafka_q_t *fwdq; |
| |
| rd_dassert(cb_type); |
| |
| if (timeout_ms == RD_POLL_INFINITE) |
| timeout_ms = INT_MAX; |
| |
| mtx_lock(&rkq->rkq_lock); |
| |
| rd_kafka_yield_thread = 0; |
| if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| do { |
| rd_kafka_op_res_t res; |
| rd_ts_t pre; |
| |
| /* Filter out outdated ops */ |
| retry: |
| while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && |
| !(rko = rd_kafka_op_filter(rkq, rko, version))) |
| ; |
| |
| if (rko) { |
| /* Proper versioned op */ |
| rd_kafka_q_deq0(rkq, rko); |
| |
| /* Ops with callbacks are considered handled |
| * and we move on to the next op, if any. |
| * Ops w/o callbacks are returned immediately */ |
| res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko, |
| cb_type, opaque, |
| callback); |
| if (res == RD_KAFKA_OP_RES_HANDLED) |
| goto retry; /* Next op */ |
| else if (unlikely(res == |
| RD_KAFKA_OP_RES_YIELD)) { |
| /* Callback yielded, unroll */ |
| mtx_unlock(&rkq->rkq_lock); |
| return NULL; |
| } else |
| break; /* Proper op, handle below. */ |
| } |
| |
| /* No op, wait for one */ |
| pre = rd_clock(); |
| if (cnd_timedwait_ms(&rkq->rkq_cond, |
| &rkq->rkq_lock, |
| timeout_ms) == |
| thrd_timedout) { |
| mtx_unlock(&rkq->rkq_lock); |
| return NULL; |
| } |
| /* Remove spent time */ |
| timeout_ms -= (int) (rd_clock()-pre) / 1000; |
| if (timeout_ms < 0) |
| timeout_ms = RD_POLL_NOWAIT; |
| |
| } while (timeout_ms != RD_POLL_NOWAIT); |
| |
| mtx_unlock(&rkq->rkq_lock); |
| |
| } else { |
| /* Since the q_pop may block we need to release the parent |
| * queue's lock. */ |
| mtx_unlock(&rkq->rkq_lock); |
| rko = rd_kafka_q_pop_serve(fwdq, timeout_ms, version, |
| cb_type, callback, opaque); |
| rd_kafka_q_destroy(fwdq); |
| } |
| |
| |
| return rko; |
| } |
| |
| /** |
| * Pop an op from a queue. |
| * |
| * Locality: any thread. |
| */ |
| rd_kafka_op_t *rd_kafka_q_pop (rd_kafka_q_t *rkq, int timeout_ms, |
| int32_t version) { |
| return rd_kafka_q_pop_serve(rkq, timeout_ms, version, |
| RD_KAFKA_Q_CB_RETURN, |
| NULL, NULL); |
| } |
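| |
| /* |
| * Example sketch (internal use): non-blocking drain of a queue, |
| * destroying each op; version 0 is passed as in rd_kafka_q_wait_result() |
| * below. Real callers typically dispatch on rko->rko_type instead: |
| * |
| * rd_kafka_op_t *rko; |
| * while ((rko = rd_kafka_q_pop(rkq, RD_POLL_NOWAIT, 0))) |
| * rd_kafka_op_destroy(rko); |
| */ |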
| |
| |
| /** |
| * Pop all available ops from a queue and call the provided |
| * callback for each op. |
| * `max_cnt` limits the number of ops served, 0 = no limit. |
| * |
| * Returns the number of ops served. |
| * |
| * Locality: any thread. |
| */ |
| int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms, |
| int max_cnt, rd_kafka_q_cb_type_t cb_type, |
| rd_kafka_q_serve_cb_t *callback, void *opaque) { |
| rd_kafka_t *rk = rkq->rkq_rk; |
| rd_kafka_op_t *rko; |
| rd_kafka_q_t localq; |
| rd_kafka_q_t *fwdq; |
| int cnt = 0; |
| |
| rd_dassert(cb_type); |
| |
| mtx_lock(&rkq->rkq_lock); |
| |
| rd_dassert(TAILQ_EMPTY(&rkq->rkq_q) || rkq->rkq_qlen > 0); |
| if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| int ret; |
| /* Since the q_pop may block we need to release the parent |
| * queue's lock. */ |
| mtx_unlock(&rkq->rkq_lock); |
| ret = rd_kafka_q_serve(fwdq, timeout_ms, max_cnt, |
| cb_type, callback, opaque); |
| rd_kafka_q_destroy(fwdq); |
| return ret; |
| } |
| |
| if (timeout_ms == RD_POLL_INFINITE) |
| timeout_ms = INT_MAX; |
| |
| /* Wait for op */ |
| while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && timeout_ms != 0) { |
| if (cnd_timedwait_ms(&rkq->rkq_cond, |
| &rkq->rkq_lock, |
| timeout_ms) != thrd_success) |
| break; |
| |
| timeout_ms = 0; |
| } |
| |
| if (!rko) { |
| mtx_unlock(&rkq->rkq_lock); |
| return 0; |
| } |
| |
| /* Move the first `max_cnt` ops. */ |
| rd_kafka_q_init(&localq, rkq->rkq_rk); |
| rd_kafka_q_move_cnt(&localq, rkq, max_cnt == 0 ? -1/*all*/ : max_cnt, |
| 0/*no-locks*/); |
| |
| mtx_unlock(&rkq->rkq_lock); |
| |
| rd_kafka_yield_thread = 0; |
| |
| /* Call callback for each op */ |
| while ((rko = TAILQ_FIRST(&localq.rkq_q))) { |
| rd_kafka_op_res_t res; |
| |
| rd_kafka_q_deq0(&localq, rko); |
| res = rd_kafka_op_handle(rk, &localq, rko, cb_type, |
| opaque, callback); |
| /* op must have been handled */ |
| rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS); |
| cnt++; |
| |
| if (unlikely(res == RD_KAFKA_OP_RES_YIELD || |
| rd_kafka_yield_thread)) { |
| /* Callback called rd_kafka_yield(), we must |
| * stop our callback dispatching and put the |
| * ops in localq back on the original queue head. */ |
| if (!TAILQ_EMPTY(&localq.rkq_q)) |
| rd_kafka_q_prepend(rkq, &localq); |
| break; |
| } |
| } |
| |
| rd_kafka_q_destroy(&localq); |
| |
| return cnt; |
| } |
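| |
| /* |
| * Example sketch: the public rd_kafka_poll() is (roughly) a thin |
| * wrapper around this function, serving the main reply queue with |
| * the standard poll callback: |
| * |
| * int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) { |
| * return rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, |
| * RD_KAFKA_Q_CB_CALLBACK, |
| * rd_kafka_poll_cb, NULL); |
| * } |
| */ |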
| |
| |
| |
| |
| |
| /** |
| * Populate 'rkmessages' array with messages from 'rkq'. |
| * If auto offset store is enabled (enable.auto.offset.store) each |
| * fetched message's offset is stored in the offset store for its toppar. |
| * |
| * Returns the number of messages added. |
| */ |
| int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms, |
| rd_kafka_message_t **rkmessages, |
| size_t rkmessages_size) { |
| unsigned int cnt = 0; |
| TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); |
| rd_kafka_op_t *rko, *next; |
| rd_kafka_t *rk = rkq->rkq_rk; |
| rd_kafka_q_t *fwdq; |
| |
| mtx_lock(&rkq->rkq_lock); |
| if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| /* Since the q_pop may block we need to release the parent |
| * queue's lock. */ |
| mtx_unlock(&rkq->rkq_lock); |
| cnt = rd_kafka_q_serve_rkmessages(fwdq, timeout_ms, |
| rkmessages, rkmessages_size); |
| rd_kafka_q_destroy(fwdq); |
| return cnt; |
| } |
| mtx_unlock(&rkq->rkq_lock); |
| |
| rd_kafka_yield_thread = 0; |
| while (cnt < rkmessages_size) { |
| rd_kafka_op_res_t res; |
| |
| mtx_lock(&rkq->rkq_lock); |
| |
| while (!(rko = TAILQ_FIRST(&rkq->rkq_q))) { |
| if (cnd_timedwait_ms(&rkq->rkq_cond, &rkq->rkq_lock, |
| timeout_ms) == thrd_timedout) |
| break; |
| } |
| |
| if (!rko) { |
| mtx_unlock(&rkq->rkq_lock); |
| break; /* Timed out */ |
| } |
| |
| rd_kafka_q_deq0(rkq, rko); |
| |
| mtx_unlock(&rkq->rkq_lock); |
| |
| if (rd_kafka_op_version_outdated(rko, 0)) { |
| /* Outdated op, put on discard queue */ |
| TAILQ_INSERT_TAIL(&tmpq, rko, rko_link); |
| continue; |
| } |
| |
| /* Serve non-FETCH callbacks */ |
| res = rd_kafka_poll_cb(rk, rkq, rko, |
| RD_KAFKA_Q_CB_RETURN, NULL); |
| if (res == RD_KAFKA_OP_RES_HANDLED) { |
| /* Callback served, rko is destroyed. */ |
| continue; |
| } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD || |
| rd_kafka_yield_thread)) { |
| /* Yield. */ |
| break; |
| } |
| rd_dassert(res == RD_KAFKA_OP_RES_PASS); |
| |
| /* Update the application offset and, if enabled, the offset store. */ |
| if (!rko->rko_err && rko->rko_type == RD_KAFKA_OP_FETCH) { |
| rd_kafka_toppar_t *rktp; |
| rktp = rd_kafka_toppar_s2i(rko->rko_rktp); |
| rd_kafka_toppar_lock(rktp); |
| rktp->rktp_app_offset = rko->rko_u.fetch.rkm.rkm_offset+1; |
| if (rktp->rktp_cgrp && |
| rk->rk_conf.enable_auto_offset_store) |
| rd_kafka_offset_store0(rktp, |
| rktp->rktp_app_offset, |
| 0/* no lock */); |
| rd_kafka_toppar_unlock(rktp); |
| } |
| |
| /* Get rkmessage from rko and append to array. */ |
| rkmessages[cnt++] = rd_kafka_message_get(rko); |
| } |
| |
| /* Destroy the outdated ops that were put aside above */ |
| next = TAILQ_FIRST(&tmpq); |
| while (next) { |
| rko = next; |
| next = TAILQ_NEXT(next, rko_link); |
| rd_kafka_op_destroy(rko); |
| } |
| |
| |
| return cnt; |
| } |
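| |
| /* |
| * Example sketch (public API): rd_kafka_consume_batch_queue() is backed |
| * (roughly) by this function. A hypothetical caller draining up to 100 |
| * messages from a partition queue of a running consumer: |
| * |
| * rd_kafka_message_t *msgs[100]; |
| * rd_kafka_queue_t *rkqu; |
| * ssize_t i, n; |
| * |
| * rkqu = rd_kafka_queue_get_partition(rk, "mytopic", 0); |
| * n = rd_kafka_consume_batch_queue(rkqu, 1000, msgs, 100); |
| * for (i = 0 ; i < n ; i++) |
| * rd_kafka_message_destroy(msgs[i]); |
| * rd_kafka_queue_destroy(rkqu); |
| */ |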
| |
| |
| |
| void rd_kafka_queue_destroy (rd_kafka_queue_t *rkqu) { |
| rd_kafka_q_disable(rkqu->rkqu_q); |
| rd_kafka_q_destroy(rkqu->rkqu_q); |
| rd_free(rkqu); |
| } |
| |
| rd_kafka_queue_t *rd_kafka_queue_new0 (rd_kafka_t *rk, rd_kafka_q_t *rkq) { |
| rd_kafka_queue_t *rkqu; |
| |
| rkqu = rd_calloc(1, sizeof(*rkqu)); |
| |
| rkqu->rkqu_q = rkq; |
| rd_kafka_q_keep(rkq); |
| |
| rkqu->rkqu_rk = rk; |
| |
| return rkqu; |
| } |
| |
| |
| rd_kafka_queue_t *rd_kafka_queue_new (rd_kafka_t *rk) { |
| rd_kafka_q_t *rkq; |
| rd_kafka_queue_t *rkqu; |
| |
| rkq = rd_kafka_q_new(rk); |
| rkqu = rd_kafka_queue_new0(rk, rkq); |
| rd_kafka_q_destroy(rkq); /* Lose refcount from q_new, one is held |
| * by queue_new0 */ |
| return rkqu; |
| } |
| |
| |
| rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk) { |
| return rd_kafka_queue_new0(rk, rk->rk_rep); |
| } |
| |
| |
| rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk) { |
| if (!rk->rk_cgrp) |
| return NULL; |
| return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q); |
| } |
| |
| rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk, |
| const char *topic, |
| int32_t partition) { |
| shptr_rd_kafka_toppar_t *s_rktp; |
| rd_kafka_toppar_t *rktp; |
| rd_kafka_queue_t *result; |
| |
| if (rk->rk_type == RD_KAFKA_PRODUCER) |
| return NULL; |
| |
| s_rktp = rd_kafka_toppar_get2(rk, topic, |
| partition, |
| 0, /* no ua_on_miss */ |
| 1 /* create_on_miss */); |
| |
| if (!s_rktp) |
| return NULL; |
| |
| rktp = rd_kafka_toppar_s2i(s_rktp); |
| result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq); |
| rd_kafka_toppar_destroy(s_rktp); |
| |
| return result; |
| } |
| |
| rd_kafka_resp_err_t rd_kafka_set_log_queue (rd_kafka_t *rk, |
| rd_kafka_queue_t *rkqu) { |
| rd_kafka_q_t *rkq; |
| if (!rkqu) |
| rkq = rk->rk_rep; |
| else |
| rkq = rkqu->rkqu_q; |
| rd_kafka_q_fwd_set(rk->rk_logq, rkq); |
| return RD_KAFKA_RESP_ERR_NO_ERROR; |
| } |
| |
| void rd_kafka_queue_forward (rd_kafka_queue_t *src, rd_kafka_queue_t *dst) { |
| rd_kafka_q_fwd_set0(src->rkqu_q, dst ? dst->rkqu_q : NULL, |
| 1, /* do_lock */ |
| 1 /* fwd_app */); |
| } |
| |
| |
| size_t rd_kafka_queue_length (rd_kafka_queue_t *rkqu) { |
| return (size_t)rd_kafka_q_len(rkqu->rkqu_q); |
| } |
| |
| /** |
| * @brief Enable or disable (fd==-1) fd-based wake-ups for the queue |
| */ |
| void rd_kafka_q_io_event_enable (rd_kafka_q_t *rkq, int fd, |
| const void *payload, size_t size) { |
| struct rd_kafka_q_io *qio = NULL; |
| |
| if (fd != -1) { |
| qio = rd_malloc(sizeof(*qio) + size); |
| qio->fd = fd; |
| qio->size = size; |
| qio->payload = (void *)(qio+1); |
| memcpy(qio->payload, payload, size); |
| } |
| |
| mtx_lock(&rkq->rkq_lock); |
| if (rkq->rkq_qio) { |
| rd_free(rkq->rkq_qio); |
| rkq->rkq_qio = NULL; |
| } |
| |
| if (fd != -1) { |
| rkq->rkq_qio = qio; |
| } |
| |
| mtx_unlock(&rkq->rkq_lock); |
| } |
| |
| void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd, |
| const void *payload, size_t size) { |
| rd_kafka_q_io_event_enable(rkqu->rkqu_q, fd, payload, size); |
| } |
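| |
| /* |
| * Example sketch (public API, POSIX): wake a poll(2)-based loop when |
| * ops are enqueued on the main queue. The payload byte is written to |
| * the fd to signal that the queue has ops to serve; the loop drains |
| * the fd and serves the queue. Error handling elided; assumes |
| * <poll.h> and <unistd.h>: |
| * |
| * int pfd[2]; |
| * struct pollfd pollfd; |
| * char buf[8]; |
| * rd_kafka_queue_t *rkqu; |
| * |
| * pipe(pfd); |
| * rkqu = rd_kafka_queue_get_main(rk); |
| * rd_kafka_queue_io_event_enable(rkqu, pfd[1], "1", 1); |
| * pollfd.fd = pfd[0]; |
| * pollfd.events = POLLIN; |
| * while (poll(&pollfd, 1, -1) > 0) { |
| * read(pfd[0], buf, sizeof(buf)); // drain wake-up payload |
| * rd_kafka_poll(rk, 0); // serve pending ops |
| * } |
| * rd_kafka_queue_io_event_enable(rkqu, -1, NULL, 0); // disable |
| * rd_kafka_queue_destroy(rkqu); |
| */ |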
| |
| |
| /** |
| * Helper: wait for a single op on 'rkq' and return its error, |
| * or RD_KAFKA_RESP_ERR__TIMED_OUT on timeout. |
| */ |
| rd_kafka_resp_err_t rd_kafka_q_wait_result (rd_kafka_q_t *rkq, int timeout_ms) { |
| rd_kafka_op_t *rko; |
| rd_kafka_resp_err_t err; |
| |
| rko = rd_kafka_q_pop(rkq, timeout_ms, 0); |
| if (!rko) |
| err = RD_KAFKA_RESP_ERR__TIMED_OUT; |
| else { |
| err = rko->rko_err; |
| rd_kafka_op_destroy(rko); |
| } |
| |
| return err; |
| } |
| |
| |
| /** |
| * Apply \p callback on each op in queue. |
| * If the callback wishes to remove the rko it must do so |
| * using rd_kafka_q_deq0(). |
| * |
| * @returns the sum of \p callback() return values. |
| * @remark rkq will be locked; callers should take care not to |
| * interact with \p rkq through other means from the callback to avoid |
| * deadlocks. |
| */ |
| int rd_kafka_q_apply (rd_kafka_q_t *rkq, |
| int (*callback) (rd_kafka_q_t *rkq, rd_kafka_op_t *rko, |
| void *opaque), |
| void *opaque) { |
| rd_kafka_op_t *rko, *next; |
| rd_kafka_q_t *fwdq; |
| int cnt = 0; |
| |
| mtx_lock(&rkq->rkq_lock); |
| if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { |
| mtx_unlock(&rkq->rkq_lock); |
| cnt = rd_kafka_q_apply(fwdq, callback, opaque); |
| rd_kafka_q_destroy(fwdq); |
| return cnt; |
| } |
| |
| next = TAILQ_FIRST(&rkq->rkq_q); |
| while ((rko = next)) { |
| next = TAILQ_NEXT(next, rko_link); |
| cnt += callback(rkq, rko, opaque); |
| } |
| mtx_unlock(&rkq->rkq_lock); |
| |
| return cnt; |
| } |
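| |
| /* |
| * Example sketch (internal use): count the FETCH ops currently on a |
| * queue; 'my_count_fetch_cb' is a hypothetical callback that only |
| * inspects each op, leaving the queue untouched: |
| * |
| * static int my_count_fetch_cb (rd_kafka_q_t *rkq, rd_kafka_op_t *rko, |
| * void *opaque) { |
| * return rko->rko_type == RD_KAFKA_OP_FETCH ? 1 : 0; |
| * } |
| * |
| * int fetch_cnt = rd_kafka_q_apply(rkq, my_count_fetch_cb, NULL); |
| */ |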
| |
| /** |
| * @brief Convert relative to absolute offsets and also purge any messages |
| * that are older than \p min_offset. |
| * @remark Error ops with ERR__NOT_IMPLEMENTED will not be purged since |
| * they are used to indicate unknown compression codecs, and compressed |
| * messagesets may have a starting offset lower than what was requested. |
| * @remark \p rkq locking is not performed (caller's responsibility) |
| * @remark Must NOT be used on fwdq. |
| */ |
| void rd_kafka_q_fix_offsets (rd_kafka_q_t *rkq, int64_t min_offset, |
| int64_t base_offset) { |
| rd_kafka_op_t *rko, *next; |
| int adj_len = 0; |
| int64_t adj_size = 0; |
| |
| rd_kafka_assert(NULL, !rkq->rkq_fwdq); |
| |
| next = TAILQ_FIRST(&rkq->rkq_q); |
| while ((rko = next)) { |
| next = TAILQ_NEXT(next, rko_link); |
| |
| if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH)) |
| continue; |
| |
| rko->rko_u.fetch.rkm.rkm_offset += base_offset; |
| |
| if (rko->rko_u.fetch.rkm.rkm_offset < min_offset && |
| rko->rko_err != RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED) { |
| adj_len++; |
| adj_size += rko->rko_len; |
| TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link); |
| rd_kafka_op_destroy(rko); |
| continue; |
| } |
| } |
| |
| |
| rkq->rkq_qlen -= adj_len; |
| rkq->rkq_qsize -= adj_size; |
| } |
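| |
| /* |
| * Worked example for rd_kafka_q_fix_offsets(): with base_offset=100, |
| * fetched messages carrying relative offsets 0,1,2 become absolute |
| * offsets 100,101,102. If min_offset=101 (e.g. the fetch started in |
| * the middle of a compressed messageset) the first message (100) is |
| * purged, unless it is an ERR__NOT_IMPLEMENTED error op. |
| */ |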
| |
| |
| /** |
| * @brief Print information and contents of queue |
| */ |
| void rd_kafka_q_dump (FILE *fp, rd_kafka_q_t *rkq) { |
| mtx_lock(&rkq->rkq_lock); |
| fprintf(fp, "Queue %p \"%s\" (refcnt %d, flags 0x%x, %d ops, " |
| "%"PRId64" bytes)\n", |
| rkq, rkq->rkq_name, rkq->rkq_refcnt, rkq->rkq_flags, |
| rkq->rkq_qlen, rkq->rkq_qsize); |
| |
| if (rkq->rkq_qio) |
| fprintf(fp, " QIO fd %d\n", rkq->rkq_qio->fd); |
| if (rkq->rkq_serve) |
| fprintf(fp, " Serve callback %p, opaque %p\n", |
| rkq->rkq_serve, rkq->rkq_opaque); |
| |
| if (rkq->rkq_fwdq) { |
| fprintf(fp, " Forwarded ->\n"); |
| rd_kafka_q_dump(fp, rkq->rkq_fwdq); |
| } else { |
| rd_kafka_op_t *rko; |
| |
| if (!TAILQ_EMPTY(&rkq->rkq_q)) |
| fprintf(fp, " Queued ops:\n"); |
| TAILQ_FOREACH(rko, &rkq->rkq_q, rko_link) { |
| fprintf(fp, " %p %s (v%"PRId32", flags 0x%x, " |
| "prio %d, len %"PRId32", source %s, " |
| "replyq %p)\n", |
| rko, rd_kafka_op2str(rko->rko_type), |
| rko->rko_version, rko->rko_flags, |
| rko->rko_prio, rko->rko_len, |
| #if ENABLE_DEVEL |
| rko->rko_source |
| #else |
| "-" |
| #endif |
| , |
| rko->rko_replyq.q |
| ); |
| } |
| } |
| |
| mtx_unlock(&rkq->rkq_lock); |
| } |