| /* |
| * librdkafka - Apache Kafka C library |
| * |
| * Copyright (c) 2012-2015, Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| #pragma once |
| |
| |
| #include "rdkafka_msg.h" |
| |
| /* Forward declarations */ |
| typedef struct rd_kafka_q_s rd_kafka_q_t; |
| typedef struct rd_kafka_toppar_s rd_kafka_toppar_t; |
| typedef struct rd_kafka_op_s rd_kafka_op_t; |
| |
| /* One-off reply queue + reply version. |
| * All APIs that take a rd_kafka_replyq_t makes a copy of the |
| * struct as-is and grabs hold of the existing .q refcount. |
| * Think of replyq as a (Q,VERSION) tuple. */ |
| typedef struct rd_kafka_replyq_s { |
| rd_kafka_q_t *q; |
| int32_t version; |
| #if ENABLE_DEVEL |
| char *_id; /* Devel id used for debugging reference leaks. |
| * Is a strdup() of the caller's function name, |
| * which makes for easy debugging with valgrind. */ |
| #endif |
| } rd_kafka_replyq_t; |
| |
| |
| |
| |
| /** |
| * Flags used by: |
| * - rd_kafka_op_t.rko_flags |
| * - rd_kafka_buf_t.rkbuf_flags |
| */ |
| #define RD_KAFKA_OP_F_FREE 0x1 /* rd_free payload when done with it */ |
| #define RD_KAFKA_OP_F_FLASH 0x2 /* Internal: insert at head of queue */ |
| #define RD_KAFKA_OP_F_NO_RESPONSE 0x4 /* rkbuf: Not expecting a response */ |
| #define RD_KAFKA_OP_F_CRC 0x8 /* rkbuf: Perform CRC calculation */ |
| #define RD_KAFKA_OP_F_BLOCKING 0x10 /* rkbuf: blocking protocol request */ |
| #define RD_KAFKA_OP_F_REPROCESS 0x20 /* cgrp: Reprocess at a later time. */ |
| |
| |
| typedef enum { |
| RD_KAFKA_OP_NONE, /* No specific type, use OP_CB */ |
| RD_KAFKA_OP_FETCH, /* Kafka thread -> Application */ |
| RD_KAFKA_OP_ERR, /* Kafka thread -> Application */ |
| RD_KAFKA_OP_CONSUMER_ERR, /* Kafka thread -> Application */ |
| RD_KAFKA_OP_DR, /* Kafka thread -> Application |
| * Produce message delivery report */ |
| RD_KAFKA_OP_STATS, /* Kafka thread -> Application */ |
| |
| RD_KAFKA_OP_OFFSET_COMMIT, /* any -> toppar's Broker thread */ |
| RD_KAFKA_OP_NODE_UPDATE, /* any -> Broker thread: node update */ |
| |
| RD_KAFKA_OP_XMIT_BUF, /* transmit buffer: any -> broker thread */ |
| RD_KAFKA_OP_RECV_BUF, /* received response buffer: broker thr -> any */ |
| RD_KAFKA_OP_XMIT_RETRY, /* retry buffer xmit: any -> broker thread */ |
| RD_KAFKA_OP_FETCH_START, /* Application -> toppar's handler thread */ |
| RD_KAFKA_OP_FETCH_STOP, /* Application -> toppar's handler thread */ |
| RD_KAFKA_OP_SEEK, /* Application -> toppar's handler thread */ |
| RD_KAFKA_OP_PAUSE, /* Application -> toppar's handler thread */ |
| RD_KAFKA_OP_OFFSET_FETCH, /* Broker -> broker thread: fetch offsets |
| * for topic. */ |
| |
| RD_KAFKA_OP_PARTITION_JOIN, /* * -> cgrp op: add toppar to cgrp |
| * * -> broker op: add toppar to broker */ |
| RD_KAFKA_OP_PARTITION_LEAVE, /* * -> cgrp op: remove toppar from cgrp |
| * * -> broker op: remove toppar from rkb*/ |
| RD_KAFKA_OP_REBALANCE, /* broker thread -> app: |
| * group rebalance */ |
| RD_KAFKA_OP_TERMINATE, /* For generic use */ |
| RD_KAFKA_OP_COORD_QUERY, /* Query for coordinator */ |
| RD_KAFKA_OP_SUBSCRIBE, /* New subscription */ |
| RD_KAFKA_OP_ASSIGN, /* New assignment */ |
| RD_KAFKA_OP_GET_SUBSCRIPTION,/* Get current subscription. |
| * Reuses u.subscribe */ |
| RD_KAFKA_OP_GET_ASSIGNMENT, /* Get current assignment. |
| * Reuses u.assign */ |
| RD_KAFKA_OP_THROTTLE, /* Throttle info */ |
| RD_KAFKA_OP_NAME, /* Request name */ |
| RD_KAFKA_OP_OFFSET_RESET, /* Offset reset */ |
| RD_KAFKA_OP_METADATA, /* Metadata response */ |
| RD_KAFKA_OP_LOG, /* Log */ |
| RD_KAFKA_OP_WAKEUP, /* Wake-up signaling */ |
| RD_KAFKA_OP__END |
| } rd_kafka_op_type_t; |
| |
| /* Flags used with op_type_t */ |
| #define RD_KAFKA_OP_CB (1 << 30) /* Callback op. */ |
| #define RD_KAFKA_OP_REPLY (1 << 31) /* Reply op. */ |
| #define RD_KAFKA_OP_FLAGMASK (RD_KAFKA_OP_CB | RD_KAFKA_OP_REPLY) |
| |
| |
| /** |
| * @brief Op/queue priority levels. |
| * @remark Since priority levels alter the FIFO order, pay extra attention |
| * to preserve ordering as deemed necessary. |
| * @remark Priority should only be set on ops destined for application |
| * facing queues (rk_rep, rkcg_q, etc). |
| */ |
| typedef enum { |
| RD_KAFKA_PRIO_NORMAL = 0, /* Normal bulk, messages, DRs, etc. */ |
| RD_KAFKA_PRIO_MEDIUM, /* Prioritize in front of bulk, |
| * still at some scale. e.g. logs, .. */ |
| RD_KAFKA_PRIO_HIGH, /* Small scale high priority */ |
| RD_KAFKA_PRIO_FLASH /* Micro scale, immediate delivery. */ |
| } rd_kafka_op_prio_t; |
| |
| |
| /** |
| * @brief Op handler result |
| * |
| * @remark When returning YIELD from a handler the handler will |
| * need to have made sure to either re-enqueue the op or destroy it |
| * since the caller will not touch the op anymore. |
| */ |
| typedef enum { |
| RD_KAFKA_OP_RES_PASS, /* Not handled, pass to caller */ |
| RD_KAFKA_OP_RES_HANDLED, /* Op was handled (through callbacks) */ |
| RD_KAFKA_OP_RES_YIELD /* Callback called yield */ |
| } rd_kafka_op_res_t; |
| |
| |
| /** |
| * @brief Queue serve callback call type |
| */ |
| typedef enum { |
| RD_KAFKA_Q_CB_INVALID, /* dont use */ |
| RD_KAFKA_Q_CB_CALLBACK,/* trigger callback based on op */ |
| RD_KAFKA_Q_CB_RETURN, /* return op rather than trigger callback |
| * (if possible)*/ |
| RD_KAFKA_Q_CB_FORCE_RETURN, /* return op, regardless of callback. */ |
| RD_KAFKA_Q_CB_EVENT /* like _Q_CB_RETURN but return event_t:ed op */ |
| } rd_kafka_q_cb_type_t; |
| |
| /** |
| * @brief Queue serve callback |
| * @remark See rd_kafka_op_res_t docs for return semantics. |
| */ |
| typedef rd_kafka_op_res_t |
| (rd_kafka_q_serve_cb_t) (rd_kafka_t *rk, |
| struct rd_kafka_q_s *rkq, |
| struct rd_kafka_op_s *rko, |
| rd_kafka_q_cb_type_t cb_type, void *opaque) |
| RD_WARN_UNUSED_RESULT; |
| |
| /** |
| * @brief Op callback type |
| */ |
| typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk, |
| rd_kafka_q_t *rkq, |
| struct rd_kafka_op_s *rko) |
| RD_WARN_UNUSED_RESULT; |
| |
| |
| #define RD_KAFKA_OP_TYPE_ASSERT(rko,type) \ |
| rd_kafka_assert(NULL, (rko)->rko_type == (type) && # type) |
| |
| struct rd_kafka_op_s { |
| TAILQ_ENTRY(rd_kafka_op_s) rko_link; |
| |
| rd_kafka_op_type_t rko_type; /* Internal op type */ |
| rd_kafka_event_type_t rko_evtype; |
| int rko_flags; /* See RD_KAFKA_OP_F_... above */ |
| int32_t rko_version; |
| rd_kafka_resp_err_t rko_err; |
| int32_t rko_len; /* Depends on type, typically the |
| * message length. */ |
| rd_kafka_op_prio_t rko_prio; /* In-queue priority. |
| * Higher value means higher prio. */ |
| |
| shptr_rd_kafka_toppar_t *rko_rktp; |
| |
| /* |
| * Generic fields |
| */ |
| |
| /* Indicates request: enqueue reply on rko_replyq.q with .version. |
| * .q is refcounted. */ |
| rd_kafka_replyq_t rko_replyq; |
| |
| /* Original queue's op serve callback and opaque, if any. |
| * Mainly used for forwarded queues to use the original queue's |
| * serve function from the forwarded position. */ |
| rd_kafka_q_serve_cb_t *rko_serve; |
| void *rko_serve_opaque; |
| |
| rd_kafka_t *rko_rk; |
| |
| #if ENABLE_DEVEL |
| const char *rko_source; /**< Where op was created */ |
| #endif |
| |
| /* RD_KAFKA_OP_CB */ |
| rd_kafka_op_cb_t *rko_op_cb; |
| |
| union { |
| struct { |
| rd_kafka_buf_t *rkbuf; |
| rd_kafka_msg_t rkm; |
| int evidx; |
| } fetch; |
| |
| struct { |
| rd_kafka_topic_partition_list_t *partitions; |
| int do_free; /* free .partitions on destroy() */ |
| } offset_fetch; |
| |
| struct { |
| rd_kafka_topic_partition_list_t *partitions; |
| void (*cb) (rd_kafka_t *rk, |
| rd_kafka_resp_err_t err, |
| rd_kafka_topic_partition_list_t *offsets, |
| void *opaque); |
| void *opaque; |
| int silent_empty; /**< Fail silently if there are no |
| * offsets to commit. */ |
| rd_ts_t ts_timeout; |
| char *reason; |
| } offset_commit; |
| |
| struct { |
| rd_kafka_topic_partition_list_t *topics; |
| } subscribe; /* also used for GET_SUBSCRIPTION */ |
| |
| struct { |
| rd_kafka_topic_partition_list_t *partitions; |
| } assign; /* also used for GET_ASSIGNMENT */ |
| |
| struct { |
| rd_kafka_topic_partition_list_t *partitions; |
| } rebalance; |
| |
| struct { |
| char *str; |
| } name; |
| |
| struct { |
| int64_t offset; |
| char *errstr; |
| rd_kafka_msg_t rkm; |
| } err; /* used for ERR and CONSUMER_ERR */ |
| |
| struct { |
| int throttle_time; |
| int32_t nodeid; |
| char *nodename; |
| } throttle; |
| |
| struct { |
| char *json; |
| size_t json_len; |
| } stats; |
| |
| struct { |
| rd_kafka_buf_t *rkbuf; |
| } xbuf; /* XMIT_BUF and RECV_BUF */ |
| |
| /* RD_KAFKA_OP_METADATA */ |
| struct { |
| rd_kafka_metadata_t *md; |
| int force; /* force request regardless of outstanding |
| * metadata requests. */ |
| } metadata; |
| |
| struct { |
| shptr_rd_kafka_itopic_t *s_rkt; |
| rd_kafka_msgq_t msgq; |
| rd_kafka_msgq_t msgq2; |
| int do_purge2; |
| } dr; |
| |
| struct { |
| int32_t nodeid; |
| char nodename[RD_KAFKA_NODENAME_SIZE]; |
| } node; |
| |
| struct { |
| int64_t offset; |
| char *reason; |
| } offset_reset; |
| |
| struct { |
| int64_t offset; |
| struct rd_kafka_cgrp_s *rkcg; |
| } fetch_start; /* reused for SEEK */ |
| |
| struct { |
| int pause; |
| int flag; |
| } pause; |
| |
| struct { |
| char fac[64]; |
| int level; |
| char *str; |
| } log; |
| } rko_u; |
| }; |
| |
| TAILQ_HEAD(rd_kafka_op_head_s, rd_kafka_op_s); |
| |
| |
| |
| |
| const char *rd_kafka_op2str (rd_kafka_op_type_t type); |
| void rd_kafka_op_destroy (rd_kafka_op_t *rko); |
| rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type); |
| #if ENABLE_DEVEL |
| #define _STRINGIFYX(A) #A |
| #define _STRINGIFY(A) _STRINGIFYX(A) |
| #define rd_kafka_op_new(type) \ |
| rd_kafka_op_new0(__FILE__ ":" _STRINGIFY(__LINE__), type) |
| #else |
| #define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type) |
| #endif |
| rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig, |
| rd_kafka_resp_err_t err); |
| rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk, |
| rd_kafka_op_type_t type, |
| rd_kafka_op_cb_t *cb); |
| int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err); |
| |
| #define rd_kafka_op_set_prio(rko,prio) ((rko)->rko_prio = prio) |
| |
| |
| #define rd_kafka_op_err(rk,err,...) do { \ |
| if (!(rk)->rk_conf.error_cb) { \ |
| rd_kafka_log(rk, LOG_ERR, "ERROR", __VA_ARGS__); \ |
| break; \ |
| } \ |
| rd_kafka_q_op_err((rk)->rk_rep, RD_KAFKA_OP_ERR, err, 0, \ |
| NULL, 0, __VA_ARGS__); \ |
| } while (0) |
| |
| void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype, |
| rd_kafka_resp_err_t err, int32_t version, |
| rd_kafka_toppar_t *rktp, int64_t offset, |
| const char *fmt, ...); |
| rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq, |
| rd_kafka_op_t *rko, |
| int timeout_ms); |
| rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type); |
| rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko); |
| |
| rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk, |
| rd_kafka_q_t *rkq, rd_kafka_op_t *rko) |
| RD_WARN_UNUSED_RESULT; |
| |
| rd_kafka_op_t * |
| rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp, |
| rd_kafka_toppar_t *rktp, |
| int32_t version, |
| rd_kafka_buf_t *rkbuf, |
| int64_t offset, |
| size_t key_len, const void *key, |
| size_t val_len, const void *val); |
| |
| void rd_kafka_op_throttle_time (struct rd_kafka_broker_s *rkb, |
| rd_kafka_q_t *rkq, |
| int throttle_time); |
| |
| |
| rd_kafka_op_res_t |
| rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, |
| rd_kafka_q_cb_type_t cb_type, void *opaque, |
| rd_kafka_q_serve_cb_t *callback) RD_WARN_UNUSED_RESULT; |
| |
| |
| extern rd_atomic32_t rd_kafka_op_cnt; |
| |
| void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko); |
| |
| void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko, |
| const rd_kafka_message_t *rkmessage); |