blob: 792c1a550aa73516709aad1c20bddbb8d10d4894 [file] [log] [blame]
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2013, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#ifndef _MSC_VER
#define _GNU_SOURCE /* for strndup() */
#include <syslog.h>
#else
typedef int mode_t;
#endif
#include <fcntl.h>
#include "rdsysqueue.h"
#include "rdkafka.h"
#include "rd.h"
#include "rdlog.h"
#include "rdtime.h"
#include "rdaddr.h"
#include "rdinterval.h"
#include "rdavg.h"
#include "rdlist.h"
#if WITH_SSL
#include <openssl/ssl.h>
#endif
/* Short typedef names for two struct types whose definitions are not part
 * of this section of the header. */
typedef struct rd_kafka_itopic_s rd_kafka_itopic_t;
typedef struct rd_ikafka_s rd_ikafka_t;
/**
 * @brief Runtime assertion.
 *
 * When \p cond evaluates to false, rd_kafka_crash() is called with the
 * current file, line and function, the handle \p rk, and the stringified
 * condition prefixed with "assert: ".
 *
 * \p cond is evaluated exactly once. Callers in this header pass NULL
 * for \p rk.
 */
#define rd_kafka_assert(rk, cond) do { \
if (unlikely(!(cond))) \
rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
(rk), "assert: " # cond); \
} while (0)
/**
 * @brief Crash handler invoked by rd_kafka_assert() when a condition fails.
 *
 * Declared with RD_NORETURN (presumably the project's "does not return"
 * attribute wrapper -- confirm in rd.h).
 *
 * @param file     Source file of the call site.
 * @param line     Source line of the call site.
 * @param function Function name of the call site.
 * @param rk       Handle passed by the caller; rd_kafka_assert() callers
 *                 may pass NULL.
 * @param reason   Human-readable reason string.
 */
void
RD_NORETURN
rd_kafka_crash (const char *file, int line, const char *function,
rd_kafka_t *rk, const char *reason);
/* Forward declarations */
struct rd_kafka_s;
struct rd_kafka_itopic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
/* Shared-pointer typedefs for the toppar and itopic structs, generated by
 * RD_SHARED_PTR_TYPE with an empty first argument.
 * NOTE(review): RD_SHARED_PTR_TYPE is defined elsewhere; its exact
 * expansion is not visible from this header section. */
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t;
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t;
#include "rdkafka_op.h"
#include "rdkafka_queue.h"
#include "rdkafka_msg.h"
#include "rdkafka_proto.h"
#include "rdkafka_buf.h"
#include "rdkafka_pattern.h"
#include "rdkafka_conf.h"
#include "rdkafka_transport.h"
#include "rdkafka_timer.h"
#include "rdkafka_assignor.h"
#include "rdkafka_metadata.h"
/**
 * Protocol level sanity
 *
 * Upper bounds for broker, topic and partition counts.
 * NOTE(review): presumably used to reject implausible counts while parsing
 * protocol data -- the users are not part of this header section.
 */
#define RD_KAFKAP_BROKERS_MAX 1000
#define RD_KAFKAP_TOPICS_MAX 1000000
#define RD_KAFKAP_PARTITIONS_MAX 10000
/* Evaluates to true when OFF is negative, i.e. the offset is classified
 * as a logical offset rather than an absolute one. */
#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0)
/**
 * Kafka handle, internal representation of the application's rd_kafka_t.
 */
typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t;
struct rd_kafka_s {
rd_kafka_q_t *rk_rep; /* kafka -> application reply queue */
rd_kafka_q_t *rk_ops; /* any -> rdkafka main thread ops */
TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers; /* List of broker objects */
rd_list_t rk_broker_by_id; /* Fast id lookups. */
rd_atomic32_t rk_broker_cnt;
rd_atomic32_t rk_broker_down_cnt;
mtx_t rk_internal_rkb_lock;
rd_kafka_broker_t *rk_internal_rkb;
/* Broadcasting of broker state changes to wake up
* functions waiting for a state change. */
cnd_t rk_broker_state_change_cnd;
mtx_t rk_broker_state_change_lock;
int rk_broker_state_change_version;
TAILQ_HEAD(, rd_kafka_itopic_s) rk_topics; /* List of topic objects */
int rk_topic_cnt;
struct rd_kafka_cgrp_s *rk_cgrp;
rd_kafka_conf_t rk_conf; /* Configuration; rk_conf.debug is tested
* by the rd_kafka_dbg() macros. */
rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */
char rk_name[128];
rd_kafkap_str_t *rk_client_id;
rd_kafkap_str_t *rk_group_id; /* Consumer group id */
int rk_flags;
rd_atomic32_t rk_terminate; /* Read by rd_kafka_terminating() */
rwlock_t rk_lock; /* Taken through rd_kafka_rdlock(),
* rd_kafka_wrlock() and their
* unlock counterparts. */
rd_kafka_type_t rk_type; /* Compared against RD_KAFKA_PRODUCER
* by the rd_kafka_curr_msgs_*()
* helpers. */
struct timeval rk_tv_state_change;
rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value
* from broker. */
/* Locks: rd_kafka_*lock() */
rd_ts_t rk_ts_metadata; /* Timestamp of most recent
* metadata. */
struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
rd_ts_t rk_ts_full_metadata; /* Timestamp of last full metadata. */
struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */
char *rk_clusterid; /* ClusterId from metadata */
/* Simple consumer count:
* >0: Running in legacy / Simple Consumer mode,
* 0: No consumers running
* <0: Running in High level consumer mode */
rd_atomic32_t rk_simple_cnt;
/**
* Exactly Once Semantics
*/
struct {
rd_kafkap_str_t *TransactionalId;
int64_t PID;
int16_t ProducerEpoch;
} rk_eos;
const rd_kafkap_bytes_t *rk_null_bytes;
/* Bookkeeping of current message count and size, maintained by
* the rd_kafka_curr_msgs_*() helpers (which only act on
* producer handles). */
struct {
mtx_t lock; /* Protects access to this struct */
cnd_t cnd; /* For waking up blocking injectors */
unsigned int cnt; /* Current message count */
size_t size; /* Current message size sum */
unsigned int max_cnt; /* Max limit */
size_t max_size; /* Max limit */
} rk_curr_msgs;
rd_kafka_timers_t rk_timers;
thrd_t rk_thread;
int rk_initialized;
};
/* Acquire/release helpers for the handle's read-write lock (rk_lock):
 * wrlock/wrunlock for the write side, rdlock/rdunlock for the read side. */
#define rd_kafka_wrlock(rk) rwlock_wrlock(&(rk)->rk_lock)
#define rd_kafka_rdlock(rk) rwlock_rdlock(&(rk)->rk_lock)
#define rd_kafka_rdunlock(rk) rwlock_rdunlock(&(rk)->rk_lock)
#define rd_kafka_wrunlock(rk) rwlock_wrunlock(&(rk)->rk_lock)
/**
 * @brief Account for \p cnt additional messages totalling \p size bytes
 *        in the handle's current-message bookkeeping.
 *
 * Non-producer handles are not tracked: the call succeeds immediately.
 *
 * If the addition would push the message count or byte total past the
 * configured limits (\c queue.buffering.max.messages and
 * \c queue.buffering.max.kbytes) the outcome depends on \p block:
 *  - non-zero: wait on the condition variable until there is room,
 *  - zero: give up right away.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR when the messages were accounted
 *          for, or RD_KAFKA_RESP_ERR__QUEUE_FULL when there was no room
 *          and \p block is zero.
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
                        int block) {
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;

        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return err;

        mtx_lock(&rk->rk_curr_msgs.lock);

        for (;;) {
                int over_cnt = rk->rk_curr_msgs.cnt + cnt >
                        rk->rk_curr_msgs.max_cnt;
                int over_size =
                        (unsigned long long)(rk->rk_curr_msgs.size + size) >
                        (unsigned long long)rk->rk_curr_msgs.max_size;

                if (!unlikely(over_cnt || over_size)) {
                        /* Fits within both limits: commit the addition. */
                        rk->rk_curr_msgs.cnt += cnt;
                        rk->rk_curr_msgs.size += size;
                        break;
                }

                if (!block) {
                        err = RD_KAFKA_RESP_ERR__QUEUE_FULL;
                        break;
                }

                /* Releases the lock while sleeping and re-acquires it
                 * before returning, after which the limits are
                 * re-checked. */
                cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
        }

        mtx_unlock(&rk->rk_curr_msgs.lock);

        return err;
}
/**
 * @brief Subtract \p cnt messages of total size \p size from the
 *        current bookkeeping and broadcast a wakeup on the condvar
 *        for any waiting & blocking threads.
 *
 * No-op for non-producer handles.
 *
 * A thread blocked in rd_kafka_curr_msgs_add() waits for room for its
 * own pending count/size, so it may be waiting while the totals are
 * still below the limits (e.g. the current size is 50 bytes short of
 * max_size and it wants to add 100 bytes). Waking waiters only when a
 * total crosses from ">= max" to "< max" would therefore miss such
 * waiters and leave them blocked indefinitely. Instead, broadcast
 * whenever anything is released: waiters re-evaluate their condition in
 * a loop, so a superfluous wake-up is harmless.
 *
 * Asserts that the bookkeeping holds at least \p cnt messages and
 * \p size bytes.
 */
static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
        if (rk->rk_type != RD_KAFKA_PRODUCER)
                return;

        mtx_lock(&rk->rk_curr_msgs.lock);

        rd_kafka_assert(NULL,
                        rk->rk_curr_msgs.cnt >= cnt &&
                        rk->rk_curr_msgs.size >= size);

        rk->rk_curr_msgs.cnt -= cnt;
        rk->rk_curr_msgs.size -= size;

        /* Room was freed up: let every blocked adder re-check whether
         * its pending messages fit now. */
        if (cnt > 0 || size > 0)
                cnd_broadcast(&rk->rk_curr_msgs.cnd);

        mtx_unlock(&rk->rk_curr_msgs.lock);
}
/**
 * @brief Retrieve the current message count and total message size.
 *
 * For non-producer handles both outputs are set to 0. For producers
 * the values are read while holding the bookkeeping lock.
 *
 * @param rk    Handle to query.
 * @param cntp  Receives the current message count.
 * @param sizep Receives the current message size sum.
 */
static RD_INLINE RD_UNUSED void
rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
        unsigned int curr_cnt = 0;
        size_t curr_size = 0;

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                mtx_lock(&rk->rk_curr_msgs.lock);
                curr_cnt = rk->rk_curr_msgs.cnt;
                curr_size = rk->rk_curr_msgs.size;
                mtx_unlock(&rk->rk_curr_msgs.lock);
        }

        *cntp = curr_cnt;
        *sizep = curr_size;
}
/**
 * @brief Current message count of the handle's bookkeeping.
 *
 * @returns the count read under the bookkeeping lock for producers,
 *          or 0 for any other handle type.
 */
static RD_INLINE RD_UNUSED int
rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
        int curr_cnt = 0;

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
                mtx_lock(&rk->rk_curr_msgs.lock);
                curr_cnt = rk->rk_curr_msgs.cnt;
                mtx_unlock(&rk->rk_curr_msgs.lock);
        }

        return curr_cnt;
}
/* NOTE(review): presumably the last stage of handle destruction; the
 * implementation is not part of this header section. */
void rd_kafka_destroy_final (rd_kafka_t *rk);
/**
 * Returns true if 'rk' handle is terminating.
 *
 * Evaluates to the current value of the atomic rk_terminate field.
 */
#define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate))
/* True when the atomic rk_simple_cnt is greater than zero, which the
 * field's comment in struct rd_kafka_s describes as running in
 * legacy / Simple Consumer mode. */
#define rd_kafka_is_simple_consumer(rk) \
(rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
/* NOTE(review): presumably registers a simple consumer on rk (affecting
 * rk_simple_cnt); return value semantics are not visible here. */
int rd_kafka_simple_consumer_add (rd_kafka_t *rk);
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
/**
 * Debug contexts
 *
 * One bit per context. The rd_kafka_dbg() family of macros pastes the
 * context name onto the RD_KAFKA_DBG_ prefix and tests the resulting
 * bit against the configuration's \c debug field.
 * RD_KAFKA_DBG_ALL has the low 16 bits set and thus covers every
 * context defined here.
 */
#define RD_KAFKA_DBG_GENERIC 0x1
#define RD_KAFKA_DBG_BROKER 0x2
#define RD_KAFKA_DBG_TOPIC 0x4
#define RD_KAFKA_DBG_METADATA 0x8
#define RD_KAFKA_DBG_FEATURE 0x10
#define RD_KAFKA_DBG_QUEUE 0x20
#define RD_KAFKA_DBG_MSG 0x40
#define RD_KAFKA_DBG_PROTOCOL 0x80
#define RD_KAFKA_DBG_CGRP 0x100
#define RD_KAFKA_DBG_SECURITY 0x200
#define RD_KAFKA_DBG_FETCH 0x400
#define RD_KAFKA_DBG_INTERCEPTOR 0x800
#define RD_KAFKA_DBG_PLUGIN 0x1000
#define RD_KAFKA_DBG_ALL 0xffff
/**
 * @brief Low-level logging entry point used by the logging macros below.
 *
 * @param conf  Configuration object.
 * @param rk    Handle; the rd_kafka_dbg0() macro passes NULL.
 * @param extra Extra string; the rd_rkb_log() macro passes the broker's
 *              log name, other macros pass NULL.
 * @param level Log level (the debug macros pass LOG_DEBUG).
 * @param fac   Facility string.
 * @param fmt   printf-style format string followed by its arguments;
 *              RD_FORMAT(printf, 6, 7) marks the format as argument 6
 *              with the variadic arguments starting at argument 7.
 */
void rd_kafka_log0(const rd_kafka_conf_t *conf,
const rd_kafka_t *rk, const char *extra, int level,
const char *fac, const char *fmt, ...) RD_FORMAT(printf,
6, 7);
/**
 * @brief Log a printf-style message at \p level with facility \p fac on
 *        behalf of handle \p rk, using the handle's configuration.
 *
 * \p rk is expanded more than once and must not have side effects.
 */
#define rd_kafka_log(rk,level,fac,...)                                  \
        rd_kafka_log0(&(rk)->rk_conf, (rk), NULL, level, fac, __VA_ARGS__)

/**
 * @brief Log a LOG_DEBUG message for handle \p rk, but only when the
 *        debug context \p ctx (the suffix of an RD_KAFKA_DBG_.. flag)
 *        is enabled in the handle's configuration.
 *
 * \p rk is expanded more than once and must not have side effects.
 */
#define rd_kafka_dbg(rk,ctx,fac,...) do {                               \
                if (unlikely((rk)->rk_conf.debug &                      \
                             (RD_KAFKA_DBG_ ## ctx)))                   \
                        rd_kafka_log0(&(rk)->rk_conf, (rk), NULL,       \
                                      LOG_DEBUG, fac, __VA_ARGS__);     \
        } while (0)

/* dbg() not requiring an rk, just the conf object, for early logging */
#define rd_kafka_dbg0(conf,ctx,fac,...) do {                            \
                if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx)))   \
                        rd_kafka_log0((conf), NULL, NULL,               \
                                      LOG_DEBUG, fac, __VA_ARGS__);     \
        } while (0)
/**
 * @brief Log a printf-style message at \p level with facility \p fac on
 *        behalf of broker \p rkb, passing the broker's log name as the
 *        extra string to rd_kafka_log0().
 *
 * NOTE: The local copy of _logname is needed due to rkb_logname_lock
 *       lock-ordering when logging another broker's name in the message:
 *       the name is copied under the lock and the lock is released
 *       before the log call is made.
 *
 * \p rkb is expanded more than once and must not have side effects.
 */
#define rd_rkb_log(rkb,level,fac,...) do {                              \
                char _logname[RD_KAFKA_NODENAME_SIZE];                  \
                mtx_lock(&(rkb)->rkb_logname_lock);                     \
                strncpy(_logname, (rkb)->rkb_logname,                   \
                        sizeof(_logname)-1);                            \
                /* strncpy() does not terminate on truncation */        \
                _logname[sizeof(_logname)-1] = '\0';                    \
                mtx_unlock(&(rkb)->rkb_logname_lock);                   \
                rd_kafka_log0(&(rkb)->rkb_rk->rk_conf,                  \
                              (rkb)->rkb_rk, _logname,                  \
                              level, fac, __VA_ARGS__);                 \
        } while (0)
/* Broker-level debug logging: forwards to rd_rkb_log() at LOG_DEBUG, but
 * only when the debug context ctx (suffix of an RD_KAFKA_DBG_.. flag) is
 * enabled in the configuration of the broker's handle (rkb_rk). */
#define rd_rkb_dbg(rkb,ctx,fac,...) do { \
if (unlikely((rkb)->rkb_rk->rk_conf.debug & \
(RD_KAFKA_DBG_ ## ctx))) { \
rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__); \
} \
} while (0)
extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;

/**
 * @brief Record \p err as the last error code and, when \p errnox is
 *        non-zero, also store \p errnox in errno.
 *
 * @param err    Error code to store in rd_kafka_last_error_code.
 * @param errnox errno value to set; 0 leaves errno untouched.
 *
 * @returns \p err, for convenient use in return statements.
 */
static RD_UNUSED RD_INLINE
rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
                                             int errnox) {
        if (errnox != 0) {
#ifdef _MSC_VER
                /* _set_errno() is the proper way to assign errno on
                 * Windows, yet the value is of little use there since
                 * different runtimes carry different errnos:
                 * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
                 * errno is thus highly deprecated, and buggy, on Windows
                 * when using librdkafka as a dynamically loaded DLL. */
                _set_errno(errnox);
#else
                errno = errnox;
#endif
        }

        return rd_kafka_last_error_code = err;
}
/* NOTE(review): presumably the number of currently running librdkafka
 * threads -- confirm at the definition. */
extern rd_atomic32_t rd_kafka_thread_cnt_curr;
/* Name buffer (64 bytes) declared with RD_TLS, the same qualifier used
 * for rd_kafka_last_error_code. */
extern char RD_TLS rd_kafka_thread_name[64];
/* NOTE(review): presumably returns non-zero when path refers to a
 * directory -- implementation not visible here. */
int rd_kafka_path_is_dir (const char *path);
/* Queue callback taking the handle, the queue, the op being served, the
 * callback type and an opaque pointer; returns an op result.
 * NOTE(review): exact dispatch semantics are defined by the
 * implementation, which is not part of this header section. */
rd_kafka_op_res_t
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
rd_kafka_q_cb_type_t cb_type, void *opaque);
/* NOTE(review): presumably subscribes to the given internal topic object;
 * confirm against the implementation. */
rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt);