/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once

#include "rdkafka_msg.h"

/* Forward declarations */
typedef struct rd_kafka_q_s rd_kafka_q_t;
typedef struct rd_kafka_toppar_s rd_kafka_toppar_t;
typedef struct rd_kafka_op_s rd_kafka_op_t;

/* One-off reply queue + reply version.
 * All APIs that take a rd_kafka_replyq_t make a copy of the
 * struct as-is and grab hold of the existing .q refcount.
 * Think of a replyq as a (Q,VERSION) tuple. */
typedef struct rd_kafka_replyq_s {
        rd_kafka_q_t *q;
        int32_t       version;
#if ENABLE_DEVEL
        char *_id;      /* Devel id used for debugging reference leaks.
                         * Is a strdup() of the caller's function name,
                         * which makes for easy debugging with valgrind. */
#endif
} rd_kafka_replyq_t;
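
/*
 * Example (illustrative sketch): a requester sets up the (Q,VERSION)
 * tuple before handing the op to an API; `myq', `my_version' and `rko'
 * are hypothetical names. Since the callee grabs hold of the .q refcount,
 * the caller must account for an extra reference if it keeps using myq.
 *
 *   rd_kafka_replyq_t replyq;
 *   replyq.q        = myq;         // where the reply shall be enqueued
 *   replyq.version  = my_version;  // lets the requester filter out
 *                                  // stale (older-version) replies
 *   rko->rko_replyq = replyq;      // APIs copy the struct as-is
 */
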
/**
* Flags used by:
* - rd_kafka_op_t.rko_flags
* - rd_kafka_buf_t.rkbuf_flags
*/
#define RD_KAFKA_OP_F_FREE 0x1 /* rd_free payload when done with it */
#define RD_KAFKA_OP_F_FLASH 0x2 /* Internal: insert at head of queue */
#define RD_KAFKA_OP_F_NO_RESPONSE 0x4 /* rkbuf: Not expecting a response */
#define RD_KAFKA_OP_F_CRC 0x8 /* rkbuf: Perform CRC calculation */
#define RD_KAFKA_OP_F_BLOCKING 0x10 /* rkbuf: blocking protocol request */
#define RD_KAFKA_OP_F_REPROCESS 0x20 /* cgrp: Reprocess at a later time. */
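
/*
 * Example (illustrative sketch): an op whose payload was allocated with
 * rd_malloc()/rd_strdup() can set RD_KAFKA_OP_F_FREE so the payload is
 * rd_free()d when the op is done with; `rko' is a hypothetical op.
 *
 *   rko->rko_flags |= RD_KAFKA_OP_F_FREE;
 */
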
typedef enum {
        RD_KAFKA_OP_NONE,         /* No specific type, use OP_CB */
        RD_KAFKA_OP_FETCH,        /* Kafka thread -> Application */
        RD_KAFKA_OP_ERR,          /* Kafka thread -> Application */
        RD_KAFKA_OP_CONSUMER_ERR, /* Kafka thread -> Application */
        RD_KAFKA_OP_DR,           /* Kafka thread -> Application
                                   * Produce message delivery report */
        RD_KAFKA_OP_STATS,        /* Kafka thread -> Application */
        RD_KAFKA_OP_OFFSET_COMMIT, /* any -> toppar's Broker thread */
        RD_KAFKA_OP_NODE_UPDATE,  /* any -> Broker thread: node update */
        RD_KAFKA_OP_XMIT_BUF,     /* transmit buffer: any -> broker thread */
        RD_KAFKA_OP_RECV_BUF,     /* received response buffer:
                                   * broker thr -> any */
        RD_KAFKA_OP_XMIT_RETRY,   /* retry buffer xmit: any -> broker thread */
        RD_KAFKA_OP_FETCH_START,  /* Application -> toppar's handler thread */
        RD_KAFKA_OP_FETCH_STOP,   /* Application -> toppar's handler thread */
        RD_KAFKA_OP_SEEK,         /* Application -> toppar's handler thread */
        RD_KAFKA_OP_PAUSE,        /* Application -> toppar's handler thread */
        RD_KAFKA_OP_OFFSET_FETCH, /* Broker -> broker thread: fetch offsets
                                   * for topic. */
        RD_KAFKA_OP_PARTITION_JOIN,  /* * -> cgrp op:   add toppar to cgrp
                                      * * -> broker op: add toppar to broker */
        RD_KAFKA_OP_PARTITION_LEAVE, /* * -> cgrp op:   remove toppar from cgrp
                                      * * -> broker op: remove toppar from rkb */
        RD_KAFKA_OP_REBALANCE,    /* broker thread -> app:
                                   * group rebalance */
        RD_KAFKA_OP_TERMINATE,    /* For generic use */
        RD_KAFKA_OP_COORD_QUERY,  /* Query for coordinator */
        RD_KAFKA_OP_SUBSCRIBE,    /* New subscription */
        RD_KAFKA_OP_ASSIGN,       /* New assignment */
        RD_KAFKA_OP_GET_SUBSCRIPTION, /* Get current subscription.
                                       * Reuses u.subscribe */
        RD_KAFKA_OP_GET_ASSIGNMENT,   /* Get current assignment.
                                       * Reuses u.assign */
        RD_KAFKA_OP_THROTTLE,     /* Throttle info */
        RD_KAFKA_OP_NAME,         /* Request name */
        RD_KAFKA_OP_OFFSET_RESET, /* Offset reset */
        RD_KAFKA_OP_METADATA,     /* Metadata response */
        RD_KAFKA_OP_LOG,          /* Log */
        RD_KAFKA_OP_WAKEUP,       /* Wake-up signaling */
        RD_KAFKA_OP__END
} rd_kafka_op_type_t;

/* Flags used with op_type_t */
#define RD_KAFKA_OP_CB        (1 << 30)  /* Callback op. */
#define RD_KAFKA_OP_REPLY     (1 << 31)  /* Reply op. */
#define RD_KAFKA_OP_FLAGMASK  (RD_KAFKA_OP_CB | RD_KAFKA_OP_REPLY)
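
/*
 * Example (sketch): since the CB and REPLY flags are stored in the same
 * field as the op type, the base type is recovered by masking them off;
 * `rko' is a hypothetical op.
 *
 *   rd_kafka_op_type_t base = (rd_kafka_op_type_t)
 *           ((int)rko->rko_type & ~RD_KAFKA_OP_FLAGMASK);
 *   int is_reply = ((int)rko->rko_type & RD_KAFKA_OP_REPLY) != 0;
 */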

/**
 * @brief Op/queue priority levels.
 * @remark Since priority levels alter the FIFO order, pay extra attention
 *         to preserving ordering where necessary.
 * @remark Priority should only be set on ops destined for application-facing
 *         queues (rk_rep, rkcg_q, etc).
 */
typedef enum {
        RD_KAFKA_PRIO_NORMAL = 0, /* Normal bulk: messages, DRs, etc. */
        RD_KAFKA_PRIO_MEDIUM,     /* Prioritized ahead of bulk but still
                                   * occurring at some scale, e.g. logs. */
        RD_KAFKA_PRIO_HIGH,       /* Small-scale, high-priority ops. */
        RD_KAFKA_PRIO_FLASH       /* Micro-scale, immediate delivery. */
} rd_kafka_op_prio_t;
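
/*
 * Example (sketch): raising the priority of an op destined for an
 * application-facing queue, using the rd_kafka_op_set_prio() helper
 * defined further down in this header; `rko' is hypothetical.
 *
 *   rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
 */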

/**
 * @brief Op handler result
 *
 * @remark A handler that returns YIELD must have either re-enqueued the op
 *         or destroyed it, since the caller will not touch the op again.
 */
typedef enum {
        RD_KAFKA_OP_RES_PASS,    /* Not handled, pass to caller */
        RD_KAFKA_OP_RES_HANDLED, /* Op was handled (through callbacks) */
        RD_KAFKA_OP_RES_YIELD    /* Callback called yield */
} rd_kafka_op_res_t;

/**
 * @brief Queue serve callback call type
 */
typedef enum {
        RD_KAFKA_Q_CB_INVALID,      /* Don't use */
        RD_KAFKA_Q_CB_CALLBACK,     /* Trigger callback based on op */
        RD_KAFKA_Q_CB_RETURN,       /* Return op rather than triggering
                                     * callback (if possible) */
        RD_KAFKA_Q_CB_FORCE_RETURN, /* Return op regardless of callback */
        RD_KAFKA_Q_CB_EVENT         /* Like _Q_CB_RETURN but return
                                     * an event_t:ed op */
} rd_kafka_q_cb_type_t;

/**
 * @brief Queue serve callback
 * @remark See rd_kafka_op_res_t docs for return semantics.
 */
typedef rd_kafka_op_res_t
(rd_kafka_q_serve_cb_t) (rd_kafka_t *rk,
                         struct rd_kafka_q_s *rkq,
                         struct rd_kafka_op_s *rko,
                         rd_kafka_q_cb_type_t cb_type, void *opaque)
        RD_WARN_UNUSED_RESULT;
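
/*
 * Example (minimal sketch, assuming rd_kafka_op_call() declared below
 * invokes the op's rko_op_cb): a hypothetical serve callback that hands
 * ops back to the caller for the _RETURN call types and triggers the
 * op's callback otherwise.
 *
 *   static rd_kafka_op_res_t
 *   my_serve_cb (rd_kafka_t *rk, struct rd_kafka_q_s *rkq,
 *                struct rd_kafka_op_s *rko,
 *                rd_kafka_q_cb_type_t cb_type, void *opaque) {
 *           if (cb_type == RD_KAFKA_Q_CB_RETURN ||
 *               cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
 *                   return RD_KAFKA_OP_RES_PASS;  // caller takes the op
 *           return rd_kafka_op_call(rk, rkq, rko);
 *   }
 */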

/**
 * @brief Op callback type
 */
typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk,
                                              rd_kafka_q_t *rkq,
                                              struct rd_kafka_op_s *rko)
        RD_WARN_UNUSED_RESULT;
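
/*
 * Example (sketch): a hypothetical op callback conforming to
 * rd_kafka_op_cb_t that consumes the op through a callback path:
 *
 *   static rd_kafka_op_res_t
 *   my_op_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq,
 *             struct rd_kafka_op_s *rko) {
 *           fprintf(stderr, "got op %s\n", rd_kafka_op2str(rko->rko_type));
 *           return RD_KAFKA_OP_RES_HANDLED;
 *   }
 *
 * Returning RD_KAFKA_OP_RES_YIELD instead would require the callback to
 * have re-enqueued or destroyed the op itself (see rd_kafka_op_res_t).
 */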

#define RD_KAFKA_OP_TYPE_ASSERT(rko,type) \
        rd_kafka_assert(NULL, (rko)->rko_type == (type) && # type)
struct rd_kafka_op_s {
        TAILQ_ENTRY(rd_kafka_op_s) rko_link;

        rd_kafka_op_type_t    rko_type;   /* Internal op type */
        rd_kafka_event_type_t rko_evtype;
        int                   rko_flags;  /* See RD_KAFKA_OP_F_... above */
        int32_t               rko_version;
        rd_kafka_resp_err_t   rko_err;
        int32_t               rko_len;    /* Depends on type, typically the
                                           * message length. */
        rd_kafka_op_prio_t    rko_prio;   /* In-queue priority.
                                           * Higher value means higher prio. */

        shptr_rd_kafka_toppar_t *rko_rktp;

        /*
         * Generic fields
         */

        /* Indicates request: enqueue reply on rko_replyq.q with .version.
         * .q is refcounted. */
        rd_kafka_replyq_t rko_replyq;

        /* Original queue's op serve callback and opaque, if any.
         * Mainly used for forwarded queues to use the original queue's
         * serve function from the forwarded position. */
        rd_kafka_q_serve_cb_t *rko_serve;
        void                  *rko_serve_opaque;

        rd_kafka_t *rko_rk;

#if ENABLE_DEVEL
        const char *rko_source;  /**< Where op was created */
#endif

        /* RD_KAFKA_OP_CB */
        rd_kafka_op_cb_t *rko_op_cb;

        union {
                struct {
                        rd_kafka_buf_t *rkbuf;
                        rd_kafka_msg_t  rkm;
                        int             evidx;
                } fetch;

                struct {
                        rd_kafka_topic_partition_list_t *partitions;
                        int do_free;  /* free .partitions on destroy() */
                } offset_fetch;

                struct {
                        rd_kafka_topic_partition_list_t *partitions;
                        void (*cb) (rd_kafka_t *rk,
                                    rd_kafka_resp_err_t err,
                                    rd_kafka_topic_partition_list_t *offsets,
                                    void *opaque);
                        void *opaque;
                        int   silent_empty;  /**< Fail silently if there are
                                              *   no offsets to commit. */
                        rd_ts_t ts_timeout;
                        char   *reason;
                } offset_commit;

                struct {
                        rd_kafka_topic_partition_list_t *topics;
                } subscribe;  /* also used for GET_SUBSCRIPTION */

                struct {
                        rd_kafka_topic_partition_list_t *partitions;
                } assign;     /* also used for GET_ASSIGNMENT */

                struct {
                        rd_kafka_topic_partition_list_t *partitions;
                } rebalance;

                struct {
                        char *str;
                } name;

                struct {
                        int64_t        offset;
                        char          *errstr;
                        rd_kafka_msg_t rkm;
                } err;        /* used for ERR and CONSUMER_ERR */

                struct {
                        int     throttle_time;
                        int32_t nodeid;
                        char   *nodename;
                } throttle;

                struct {
                        char  *json;
                        size_t json_len;
                } stats;

                struct {
                        rd_kafka_buf_t *rkbuf;
                } xbuf;       /* XMIT_BUF and RECV_BUF */

                /* RD_KAFKA_OP_METADATA */
                struct {
                        rd_kafka_metadata_t *md;
                        int force;  /* force request regardless of outstanding
                                     * metadata requests. */
                } metadata;

                struct {
                        shptr_rd_kafka_itopic_t *s_rkt;
                        rd_kafka_msgq_t          msgq;
                        rd_kafka_msgq_t          msgq2;
                        int                      do_purge2;
                } dr;

                struct {
                        int32_t nodeid;
                        char    nodename[RD_KAFKA_NODENAME_SIZE];
                } node;

                struct {
                        int64_t offset;
                        char   *reason;
                } offset_reset;

                struct {
                        int64_t offset;
                        struct rd_kafka_cgrp_s *rkcg;
                } fetch_start;  /* reused for SEEK */

                struct {
                        int pause;
                        int flag;
                } pause;

                struct {
                        char  fac[64];
                        int   level;
                        char *str;
                } log;
        } rko_u;
};
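
/*
 * Example (sketch): which rko_u member is valid depends on rko_type;
 * e.g. a CONSUMER_ERR op carries its details in rko_u.err:
 *
 *   if (rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR)
 *           fprintf(stderr, "consumer error at offset %"PRId64": %s\n",
 *                   rko->rko_u.err.offset,
 *                   rko->rko_u.err.errstr ? rko->rko_u.err.errstr : "");
 */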

TAILQ_HEAD(rd_kafka_op_head_s, rd_kafka_op_s);

const char *rd_kafka_op2str (rd_kafka_op_type_t type);
void rd_kafka_op_destroy (rd_kafka_op_t *rko);
rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type);
#if ENABLE_DEVEL
#define _STRINGIFYX(A) #A
#define _STRINGIFY(A) _STRINGIFYX(A)
#define rd_kafka_op_new(type)                                   \
        rd_kafka_op_new0(__FILE__ ":" _STRINGIFY(__LINE__), type)
#else
#define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type)
#endif
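
/*
 * Example (sketch): creating and disposing of an op; in ENABLE_DEVEL
 * builds rd_kafka_op_new() records the creating file:line in rko_source.
 * The destroy call is assumed to release any payload the op owns.
 *
 *   rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_THROTTLE);
 *   rko->rko_u.throttle.throttle_time = 100;
 *   rd_kafka_op_destroy(rko);
 */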

rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig,
                                      rd_kafka_resp_err_t err);
rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
                                   rd_kafka_op_type_t type,
                                   rd_kafka_op_cb_t *cb);
int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);

#define rd_kafka_op_set_prio(rko,prio) ((rko)->rko_prio = prio)

#define rd_kafka_op_err(rk,err,...) do {                                 \
                if (!(rk)->rk_conf.error_cb) {                           \
                        rd_kafka_log(rk, LOG_ERR, "ERROR", __VA_ARGS__); \
                        break;                                           \
                }                                                        \
                rd_kafka_q_op_err((rk)->rk_rep, RD_KAFKA_OP_ERR, err, 0, \
                                  NULL, 0, __VA_ARGS__);                 \
        } while (0)
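
/*
 * Example (sketch): per the macro above, this logs at LOG_ERR if no
 * error_cb is configured and otherwise enqueues an ERR op on the
 * application queue; `reason' is a hypothetical string.
 *
 *   rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__TRANSPORT,
 *                   "connection to broker lost: %s", reason);
 */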

void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype,
                        rd_kafka_resp_err_t err, int32_t version,
                        rd_kafka_toppar_t *rktp, int64_t offset,
                        const char *fmt, ...);
rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq,
                                rd_kafka_op_t *rko,
                                int timeout_ms);
rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type);
rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko);
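
/*
 * Example (sketch, assuming rd_kafka_op_req2() blocks until a reply op
 * arrives): a request/reply round-trip where the reply's error code is
 * extracted and the op destroyed in one go; `destq' is a hypothetical
 * destination queue, and a NULL reply is handled defensively.
 *
 *   rd_kafka_op_t *reply = rd_kafka_op_req2(destq, RD_KAFKA_OP_NAME);
 *   rd_kafka_resp_err_t err = reply ? rd_kafka_op_err_destroy(reply)
 *                                   : RD_KAFKA_RESP_ERR__TIMED_OUT;
 */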

rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk,
                                    rd_kafka_q_t *rkq, rd_kafka_op_t *rko)
        RD_WARN_UNUSED_RESULT;

rd_kafka_op_t *
rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp,
                           rd_kafka_toppar_t *rktp,
                           int32_t version,
                           rd_kafka_buf_t *rkbuf,
                           int64_t offset,
                           size_t key_len, const void *key,
                           size_t val_len, const void *val);

void rd_kafka_op_throttle_time (struct rd_kafka_broker_s *rkb,
                                rd_kafka_q_t *rkq,
                                int throttle_time);

rd_kafka_op_res_t
rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                    rd_kafka_q_cb_type_t cb_type, void *opaque,
                    rd_kafka_q_serve_cb_t *callback) RD_WARN_UNUSED_RESULT;

extern rd_atomic32_t rd_kafka_op_cnt;

void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko);

void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko,
                               const rd_kafka_message_t *rkmessage);