blob: 0da8015e71a632c47c8daa7cd32a2a77a9188951 [file] [log] [blame]
#pragma once
#include "rdlist.h"
/**
* Forward declarations
*
* Incomplete type only: enough to refer to the struct by pointer without
* its definition. It is not referenced by any declaration visible in the
* rest of this header.
*/
struct rd_kafka_transport_s;
/**
 * MessageSet compression codecs.
 *
 * GZIP, SNAPPY and LZ4 take their numeric value from the corresponding
 * RD_KAFKA_MSG_ATTR_* constant. INHERIT has no explicit value and is
 * therefore one greater than RD_KAFKA_MSG_ATTR_LZ4.
 */
typedef enum {
        /* No codec selected. */
        RD_KAFKA_COMPRESSION_NONE    = 0,

        RD_KAFKA_COMPRESSION_GZIP    = RD_KAFKA_MSG_ATTR_GZIP,

        RD_KAFKA_COMPRESSION_SNAPPY  = RD_KAFKA_MSG_ATTR_SNAPPY,

        RD_KAFKA_COMPRESSION_LZ4     = RD_KAFKA_MSG_ATTR_LZ4,

        /* Inherit setting from global conf */
        RD_KAFKA_COMPRESSION_INHERIT
} rd_kafka_compression_t;
/**
 * Security protocol selector; this is the type of
 * rd_kafka_conf_s.security_protocol.
 *
 * RD_KAFKA_PROTO_NUM is not a protocol. It is deliberately left without an
 * explicit value and kept last so that it always equals the number of
 * protocols listed before it.
 */
typedef enum {
        RD_KAFKA_PROTO_PLAINTEXT      = 0,
        RD_KAFKA_PROTO_SSL            = 1,
        RD_KAFKA_PROTO_SASL_PLAINTEXT = 2,
        RD_KAFKA_PROTO_SASL_SSL       = 3,

        RD_KAFKA_PROTO_NUM            /* count sentinel, keep last */
} rd_kafka_secproto_t;
/**
 * Configuration source tag.
 *
 * NOTE(review): the names suggest this records where an item originated
 * (explicitly configured, learned at runtime, or internal). The type is
 * not used anywhere in the visible part of this header, so confirm the
 * exact meaning against its users.
 */
typedef enum {
        RD_KAFKA_CONFIGURED = 0,
        RD_KAFKA_LEARNED    = 1,
        RD_KAFKA_INTERNAL   = 2
} rd_kafka_confsource_t;
/**
 * Configuration scopes.
 *
 * Every enumerator occupies its own bit, so several scopes can be
 * combined with bitwise OR.
 *
 * NOTE(review): presumably these are the values carried by the
 * `int scope` argument of rd_kafka_anyconf_destroy() - confirm.
 */
typedef enum {
        _RK_GLOBAL   = 1 << 0,
        _RK_PRODUCER = 1 << 1,
        _RK_CONSUMER = 1 << 2,
        _RK_TOPIC    = 1 << 3,
        _RK_CGRP     = 1 << 4
} rd_kafka_conf_scope_t;
/**
 * How a new value is applied to a configuration property.
 * REPLACE is the default; ADD and DEL are marked "(S2F)" in the
 * per-enumerator comments.
 */
typedef enum {
        /* Replace current value (default) */
        _RK_CONF_PROP_SET_REPLACE = 0,

        /* Add value (S2F) */
        _RK_CONF_PROP_SET_ADD     = 1,

        /* Remove value (S2F) */
        _RK_CONF_PROP_SET_DEL     = 2
} rd_kafka_conf_set_mode_t;
/**
 * Offset storage method; this is the type of the offset_store_method
 * member in both rd_kafka_conf_s and rd_kafka_topic_conf_s.
 */
typedef enum {
        RD_KAFKA_OFFSET_METHOD_NONE   = 0,
        RD_KAFKA_OFFSET_METHOD_FILE   = 1,
        RD_KAFKA_OFFSET_METHOD_BROKER = 2
} rd_kafka_offset_method_t;
/**
* Optional configuration struct passed to rd_kafka_new*().
*
* The struct is populated through string properties
* by calling rd_kafka_conf_set().
*
*/
struct rd_kafka_conf_s {
/*
* Layout overview: generic settings (including the optional SSL, SASL,
* plugin and interceptor sub-structures), then consumer settings, then
* producer settings, followed by the application callbacks and the
* opaque pointer handed to them.
*/
/*
* Generic configuration
*/
int enabled_events;
int max_msg_size;
int msg_copy_max_size;
int recv_max_msg_size;
int max_inflight;
int metadata_request_timeout_ms;
int metadata_refresh_interval_ms;
int metadata_refresh_fast_cnt;
int metadata_refresh_fast_interval_ms;
int metadata_refresh_sparse;
int metadata_max_age_ms;
int debug;
int broker_addr_ttl;
int broker_addr_family;
int socket_timeout_ms;
int socket_blocking_max_ms;
int socket_sndbuf_size;
int socket_rcvbuf_size;
int socket_keepalive;
int socket_nagle_disable;
int socket_max_fails;
char *client_id_str;
char *brokerlist;
int stats_interval_ms;
int term_sig;
int reconnect_jitter_ms;
int api_version_request;
int api_version_request_timeout_ms;
int api_version_fallback_ms;
char *broker_version_fallback;
rd_kafka_secproto_t security_protocol;
/* SSL settings; this member only exists when built with WITH_SSL. */
#if WITH_SSL
struct {
SSL_CTX *ctx;
char *cipher_suites;
char *key_location;
char *key_password;
char *cert_location;
char *ca_location;
char *crl_location;
} ssl;
#endif
/* SASL settings. */
struct {
const struct rd_kafka_sasl_provider *provider;
char *principal;
char *mechanisms;
char *service_name;
char *kinit_cmd;
char *keytab;
int relogin_min_time;
char *username;
char *password;
/* The SCRAM members below only exist when built with WITH_SASL_SCRAM. */
#if WITH_SASL_SCRAM
/* SCRAM EVP-wrapped hash function
* (return value from EVP_shaX()) */
const void/*EVP_MD*/ *scram_evp;
/* SCRAM direct hash function (e.g., SHA256()) */
unsigned char *(*scram_H) (const unsigned char *d, size_t n,
unsigned char *md);
/* Hash size */
size_t scram_H_size;
#endif
} sasl;
/* Plugin settings; these members only exist when built with WITH_PLUGINS. */
#if WITH_PLUGINS
char *plugin_paths;
rd_list_t plugins;
#endif
/* Interceptors */
struct {
/* rd_kafka_interceptor_method_t lists */
rd_list_t on_conf_set; /* on_conf_set interceptors
* (not copied on conf_dup()) */
rd_list_t on_conf_dup; /* .. (not copied) */
rd_list_t on_conf_destroy; /* .. (not copied) */
rd_list_t on_new; /* .. (copied) */
rd_list_t on_destroy; /* .. (copied) */
rd_list_t on_send; /* .. (copied) */
rd_list_t on_acknowledgement; /* .. (copied) */
rd_list_t on_consume; /* .. (copied) */
rd_list_t on_commit; /* .. (copied) */
/* rd_strtup_t list */
rd_list_t config; /* Configuration name=val's
* handled by interceptors. */
} interceptors;
/* Client group configuration */
int coord_query_intvl_ms;
int builtin_features;
/*
* Consumer configuration
*/
int check_crcs;
int queued_min_msgs;
int queued_max_msg_kbytes;
/* NOTE(review): presumably queued_max_msg_kbytes expressed in bytes
* (64-bit to avoid overflow) - confirm where it is computed. */
int64_t queued_max_msg_bytes;
int fetch_wait_max_ms;
int fetch_msg_max_bytes;
int fetch_min_bytes;
int fetch_error_backoff_ms;
char *group_id_str;
rd_kafka_pattern_list_t *topic_blacklist;
struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config
* for automatically
* subscribed topics. */
int enable_auto_commit;
int enable_auto_offset_store;
int auto_commit_interval_ms;
int group_session_timeout_ms;
int group_heartbeat_intvl_ms;
rd_kafkap_str_t *group_protocol_type;
char *partition_assignment_strategy;
rd_list_t partition_assignors;
int enabled_assignor_cnt;
struct rd_kafka_assignor_s *assignor;
/* Rebalance callback */
void (*rebalance_cb) (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *partitions,
void *opaque);
/* Offset commit callback */
void (*offset_commit_cb) (rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque);
rd_kafka_offset_method_t offset_store_method;
int enable_partition_eof;
/*
* Producer configuration
*/
int queue_buffering_max_msgs;
int queue_buffering_max_kbytes;
int buffering_max_ms;
int max_retries;
int retry_backoff_ms;
int batch_num_messages;
rd_kafka_compression_t compression_codec;
int dr_err_only;
/* Message delivery report callback.
* Called once for each produced message, either on
* successful and acknowledged delivery to the broker in which
* case 'err' is 0, or if the message could not be delivered
* in which case 'err' is non-zero (use rd_kafka_err2str()
* to obtain a human-readable error reason).
*
* If the message was produced with neither RD_KAFKA_MSG_F_FREE
* nor RD_KAFKA_MSG_F_COPY set then 'payload' is the original
* pointer provided to rd_kafka_produce().
* rdkafka will not perform any further actions on 'payload'
* at this point and the application may rd_free the payload data.
*
* 'opaque' is 'conf.opaque', while 'msg_opaque' is
* the opaque pointer provided in the rd_kafka_produce() call.
*/
void (*dr_cb) (rd_kafka_t *rk,
void *payload, size_t len,
rd_kafka_resp_err_t err,
void *opaque, void *msg_opaque);
/* Delivery report callback variant that receives the message as an
* rd_kafka_message_t instead of separate payload/len/err arguments. */
void (*dr_msg_cb) (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
void *opaque);
/* NOTE(review): the members from here on (consume, log, error, throttle,
* stats, socket and file callbacks) do not look producer-specific even
* though they follow the "Producer configuration" header - confirm. */
/* Consume callback */
void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
/* Log callback */
void (*log_cb) (const rd_kafka_t *rk, int level,
const char *fac, const char *buf);
int log_level;
int log_queue;
int log_thread_name;
int log_connection_close;
/* Error callback */
void (*error_cb) (rd_kafka_t *rk, int err,
const char *reason, void *opaque);
/* Throttle callback */
void (*throttle_cb) (rd_kafka_t *rk, const char *broker_name,
int32_t broker_id, int throttle_time_ms,
void *opaque);
/* Stats callback */
int (*stats_cb) (rd_kafka_t *rk,
char *json,
size_t json_len,
void *opaque);
/* Socket creation callback */
int (*socket_cb) (int domain, int type, int protocol, void *opaque);
/* Connect callback */
int (*connect_cb) (int sockfd,
const struct sockaddr *addr,
int addrlen,
const char *id,
void *opaque);
/* Close socket callback */
int (*closesocket_cb) (int sockfd, void *opaque);
/* File open callback */
int (*open_cb) (const char *pathname, int flags, mode_t mode,
void *opaque);
/* Opaque passed to callbacks. */
void *opaque;
/* For use with value-less properties. */
int dummy;
};
/*
* Socket creation functions whose signature matches the
* rd_kafka_conf_s.socket_cb hook.
* NOTE(review): presumably the built-in implementations for that hook.
* The _linux variant is declared unconditionally here - confirm it is
* defined on every supported platform.
*/
int rd_kafka_socket_cb_linux (int domain, int type, int protocol, void *opaque);
int rd_kafka_socket_cb_generic (int domain, int type, int protocol,
void *opaque);
/*
* File open functions whose signature matches the
* rd_kafka_conf_s.open_cb hook.
* The _linux variant is not declared when compiling with MSVC (_MSC_VER).
*/
#ifndef _MSC_VER
int rd_kafka_open_cb_linux (const char *pathname, int flags, mode_t mode,
void *opaque);
#endif
int rd_kafka_open_cb_generic (const char *pathname, int flags, mode_t mode,
void *opaque);
/**
* Topic configuration.
*
* rd_kafka_conf_s.topic_conf points to an instance of this struct that
* serves as the default for automatically subscribed topics.
*/
struct rd_kafka_topic_conf_s {
int required_acks;
int32_t request_timeout_ms;
int message_timeout_ms;
/* Partitioner callback: receives the topic, the message key
* (keydata/keylen), the partition count and the two opaque pointers,
* and returns an int32_t.
* NOTE(review): presumably the return value is the chosen partition
* index - confirm against the caller. */
int32_t (*partitioner) (const rd_kafka_topic_t *rkt,
const void *keydata, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque);
/* NOTE(review): RD_KAFKA_COMPRESSION_INHERIT is documented as "inherit
* setting from global conf"; presumably that defers to
* rd_kafka_conf_s.compression_codec - confirm. */
rd_kafka_compression_t compression_codec;
int produce_offset_report;
int consume_callback_max_msgs;
int auto_commit;
int auto_commit_interval_ms;
int auto_offset_reset;
char *offset_store_path;
int offset_store_sync_interval_ms;
rd_kafka_offset_method_t offset_store_method;
/* Application provided opaque pointer (this is rkt_opaque) */
void *opaque;
};
/*
* Destroy a configuration object of the given scope.
* NOTE(review): presumably `scope` is an rd_kafka_conf_scope_t value
* (e.g. _RK_GLOBAL or _RK_TOPIC) that selects whether `conf` points to a
* struct rd_kafka_conf_s or a struct rd_kafka_topic_conf_s - confirm
* against the definition, which is not part of this header.
*/
void rd_kafka_anyconf_destroy (int scope, void *conf);