/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2015 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rdkafka_int.h"
#include "rdkafka_topic.h"
#include "rdkafka_broker.h"
#include "rdkafka_request.h"
#include "rdkafka_offset.h"
#include "rdkafka_partition.h"
#include "rdregex.h"
#include "rdports.h" /* rd_qsort_r() */
const char *rd_kafka_fetch_states[] = {
"none",
"stopping",
"stopped",
"offset-query",
"offset-wait",
"active"
};
static rd_kafka_op_res_t
rd_kafka_toppar_op_serve (rd_kafka_t *rk,
rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
rd_kafka_q_cb_type_t cb_type, void *opaque);
static RD_INLINE void rd_kafka_broker_fetch_toppar_del (rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp);
static RD_INLINE int32_t
rd_kafka_toppar_version_new_barrier0 (rd_kafka_toppar_t *rktp,
const char *func, int line) {
int32_t version = rd_atomic32_add(&rktp->rktp_version, 1);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BARRIER",
"%s [%"PRId32"]: %s:%d: new version barrier v%"PRId32,
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
func, line, version);
return version;
}
#define rd_kafka_toppar_version_new_barrier(rktp) \
rd_kafka_toppar_version_new_barrier0(rktp, __FUNCTION__, __LINE__)
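/*
 * Example (illustrative sketch): a control-plane operation typically
 * bumps the barrier and stamps the resulting version on the op it
 * enqueues, so that the toppar handler can later discard stale ops
 * via rd_kafka_op_version_outdated() (see rd_kafka_toppar_op_serve()):
 *
 *   int32_t version = rd_kafka_toppar_version_new_barrier(rktp);
 *   rko->rko_version = version;
 *   rd_kafka_q_enq(rktp->rktp_ops, rko);
 */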
/**
* Toppar based OffsetResponse handling.
* This is used for updating the low water mark for consumer lag.
*/
static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
shptr_rd_kafka_toppar_t *s_rktp = opaque;
rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_topic_partition_t *rktpar;
offsets = rd_kafka_topic_partition_list_new(1);
/* Parse and return Offset */
err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
rkbuf, request, offsets);
if (!err && !(rktpar = rd_kafka_topic_partition_list_find(
offsets,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition)))
err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
if (!err) {
rd_kafka_toppar_lock(rktp);
rktp->rktp_lo_offset = rktpar->offset;
rd_kafka_toppar_unlock(rktp);
}
rd_kafka_topic_partition_list_destroy(offsets);
rktp->rktp_wait_consumer_lag_resp = 0;
rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
}
/**
* Request information from broker to keep track of consumer lag.
*
* Locality: toppar handler thread
*/
static void rd_kafka_toppar_consumer_lag_req (rd_kafka_toppar_t *rktp) {
rd_kafka_broker_t *rkb;
rd_kafka_topic_partition_list_t *partitions;
if (rktp->rktp_wait_consumer_lag_resp)
return; /* Previous request not finished yet */
rkb = rd_kafka_toppar_leader(rktp, 1/*proper brokers only*/);
if (!rkb)
return;
rktp->rktp_wait_consumer_lag_resp = 1;
partitions = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(partitions,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition)->offset =
RD_KAFKA_OFFSET_BEGINNING;
/* Ask for oldest offset. The newest offset is automatically
* propagated in FetchResponse.HighwaterMark. */
rd_kafka_OffsetRequest(rkb, partitions, 0,
RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
rd_kafka_toppar_lag_handle_Offset,
rd_kafka_toppar_keep(rktp));
rd_kafka_topic_partition_list_destroy(partitions);
rd_kafka_broker_destroy(rkb); /* from toppar_leader() */
}
/**
* Request earliest offset to measure consumer lag
*
* Locality: toppar handler thread
*/
static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts,
void *arg) {
rd_kafka_toppar_t *rktp = arg;
rd_kafka_toppar_consumer_lag_req(rktp);
}
/**
* Add new partition to topic.
*
* Locks: rd_kafka_topic_wrlock() must be held.
* Locks: rd_kafka_wrlock() must be held.
*/
shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
int32_t partition,
const char *func, int line) {
rd_kafka_toppar_t *rktp;
rktp = rd_calloc(1, sizeof(*rktp));
rktp->rktp_partition = partition;
rktp->rktp_rkt = rkt;
rktp->rktp_leader_id = -1;
rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
rktp->rktp_fetch_msg_max_bytes
= rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
rktp->rktp_offset_fp = NULL;
rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
rd_kafka_msgq_init(&rktp->rktp_msgq);
rktp->rktp_msgq_wakeup_fd = -1;
rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
mtx_init(&rktp->rktp_lock, mtx_plain);
rd_refcnt_init(&rktp->rktp_refcnt, 0);
rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
rktp->rktp_ops = rd_kafka_q_new(rkt->rkt_rk);
rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
rktp->rktp_ops->rkq_opaque = rktp;
rd_atomic32_init(&rktp->rktp_version, 1);
rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);
/* Consumer: If statistics are enabled we query the oldest offset
* of each partition.
* Since the oldest offset only moves on log retention, we cap this
* value on the low end to a reasonable value to avoid flooding
* the brokers with OffsetRequests when our statistics interval is low.
* FIXME: Use a global timer to collect offsets for all partitions */
if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
if (intvl < 10 * 1000 /* 10s */)
intvl = 10 * 1000;
rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
&rktp->rktp_consumer_lag_tmr,
intvl * 1000ll,
rd_kafka_toppar_consumer_lag_tmr_cb,
rktp);
}
rktp->rktp_s_rkt = rd_kafka_topic_keep(rkt);
rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW", "NEW %s [%"PRId32"] %p (at %s:%d)",
rkt->rkt_topic->str, rktp->rktp_partition, rktp,
func, line);
return rd_kafka_toppar_keep_src(func, line, rktp);
}
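/*
 * Example (illustrative sketch): each shared toppar pointer obtained
 * through rd_kafka_toppar_new() (assumed to be the convenience wrapper
 * around toppar_new0()) or rd_kafka_toppar_keep() must be released
 * with a matching rd_kafka_toppar_destroy():
 *
 *   shptr_rd_kafka_toppar_t *s_rktp = rd_kafka_toppar_new(rkt, 3);
 *   rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
 *   // ... operate on rktp ...
 *   rd_kafka_toppar_destroy(s_rktp);  // drops this reference
 */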
/**
* Removes a toppar from its duties, global lists, etc.
*
* Locks: rd_kafka_toppar_lock() MUST be held
*/
static void rd_kafka_toppar_remove (rd_kafka_toppar_t *rktp) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARREMOVE",
"Removing toppar %s [%"PRId32"] %p",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rktp);
rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_query_tmr, 1/*lock*/);
rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_consumer_lag_tmr, 1/*lock*/);
rd_kafka_q_fwd_set(rktp->rktp_ops, NULL);
}
/**
* Final destructor for partition.
*/
void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {
rd_kafka_toppar_remove(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESTROY",
"%s [%"PRId32"]: %p DESTROY_FINAL",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, rktp);
/* Clear queues */
rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
RD_KAFKA_RESP_ERR__DESTROY);
rd_kafka_q_destroy(rktp->rktp_fetchq);
rd_kafka_q_destroy(rktp->rktp_ops);
rd_kafka_replyq_destroy(&rktp->rktp_replyq);
rd_kafka_topic_destroy0(rktp->rktp_s_rkt);
mtx_destroy(&rktp->rktp_lock);
rd_refcnt_destroy(&rktp->rktp_refcnt);
rd_free(rktp);
}
/**
* Set toppar fetching state.
*
* Locality: broker thread
* Locks: rd_kafka_toppar_lock() MUST be held.
*/
void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
int fetch_state) {
rd_kafka_assert(NULL,
thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
if ((int)rktp->rktp_fetch_state == fetch_state)
return;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "PARTSTATE",
"Partition %.*s [%"PRId32"] changed fetch state %s -> %s",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_fetch_states[rktp->rktp_fetch_state],
rd_kafka_fetch_states[fetch_state]);
rktp->rktp_fetch_state = fetch_state;
}
/**
* Returns the appropriate toppar for a given rkt and partition.
* The returned toppar has increased refcnt and must be unreffed by calling
* rd_kafka_toppar_destroy().
* May return NULL.
*
* If 'ua_on_miss' is true the UA (unassigned) toppar is returned if
* 'partition' was not known locally, else NULL is returned.
*
* Locks: Caller must hold rd_kafka_topic_*lock()
*/
shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
const rd_kafka_itopic_t *rkt,
int32_t partition,
int ua_on_miss) {
shptr_rd_kafka_toppar_t *s_rktp;
if (partition >= 0 && partition < rkt->rkt_partition_cnt)
s_rktp = rkt->rkt_p[partition];
else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
s_rktp = rkt->rkt_ua;
else
return NULL;
if (s_rktp)
return rd_kafka_toppar_keep_src(func,line,
rd_kafka_toppar_s2i(s_rktp));
return NULL;
}
/**
* Same as rd_kafka_toppar_get() but requires no locking and
* looks up the topic first.
*
* Locality: any
* Locks: none
*/
shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
const char *topic,
int32_t partition,
int ua_on_miss,
int create_on_miss) {
shptr_rd_kafka_itopic_t *s_rkt;
rd_kafka_itopic_t *rkt;
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_wrlock(rk);
/* Find or create topic */
if (unlikely(!(s_rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
if (!create_on_miss) {
rd_kafka_wrunlock(rk);
return NULL;
}
s_rkt = rd_kafka_topic_new0(rk, topic, NULL,
NULL, 0/*no-lock*/);
if (!s_rkt) {
rd_kafka_wrunlock(rk);
rd_kafka_log(rk, LOG_ERR, "TOPIC",
"Failed to create local topic \"%s\": %s",
topic, rd_strerror(errno));
return NULL;
}
}
rd_kafka_wrunlock(rk);
rkt = rd_kafka_topic_s2i(s_rkt);
rd_kafka_topic_wrlock(rkt);
s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
rd_kafka_topic_wrunlock(rkt);
rd_kafka_topic_destroy0(s_rkt);
return s_rktp;
}
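/*
 * Example (illustrative sketch): looking up a toppar by topic name
 * from any thread, creating the local topic object on a miss:
 *
 *   shptr_rd_kafka_toppar_t *s_rktp;
 *
 *   // args: no UA fallback (0), create topic on miss (1)
 *   s_rktp = rd_kafka_toppar_get2(rk, "mytopic", 0, 0, 1);
 *   if (s_rktp) {
 *           rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
 *           // ... use rktp ...
 *           rd_kafka_toppar_destroy(s_rktp);
 *   }
 */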
/**
* Returns a toppar if it is available in the cluster.
* '*errp' is set to the error-code if lookup fails.
*
* Locks: topic_*lock() MUST be held
*/
shptr_rd_kafka_toppar_t *
rd_kafka_toppar_get_avail (const rd_kafka_itopic_t *rkt,
int32_t partition, int ua_on_miss,
rd_kafka_resp_err_t *errp) {
shptr_rd_kafka_toppar_t *s_rktp;
switch (rkt->rkt_state)
{
case RD_KAFKA_TOPIC_S_UNKNOWN:
/* No metadata received from cluster yet.
* Put message in UA partition and re-run partitioner when
* cluster comes up. */
partition = RD_KAFKA_PARTITION_UA;
break;
case RD_KAFKA_TOPIC_S_NOTEXISTS:
/* Topic not found in cluster.
* Fail message immediately. */
*errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
return NULL;
case RD_KAFKA_TOPIC_S_EXISTS:
/* Topic exists in cluster. */
/* Topic exists but has no partitions.
* This is usually a transient state following the
* auto-creation of a topic. */
if (unlikely(rkt->rkt_partition_cnt == 0)) {
partition = RD_KAFKA_PARTITION_UA;
break;
}
/* Check that partition exists. */
if (partition >= rkt->rkt_partition_cnt) {
*errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
return NULL;
}
break;
default:
rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
break;
}
/* Get new partition */
s_rktp = rd_kafka_toppar_get(rkt, partition, 0);
if (unlikely(!s_rktp)) {
/* Unknown topic or partition */
if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
*errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
else
*errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
return NULL;
}
return s_rktp;
}
/**
* Looks up the given partition in topic 'rkt's desired list.
*
* The desired partition list is the list of partitions that are desired
* (e.g., by the consumer) but not yet seen on a broker.
* As soon as the partition is seen on a broker the toppar is moved from
* the desired list and onto the normal rkt_p array.
* When the partition on the broker goes away a desired partition is put
* back on the desired list.
*
* Locks: rd_kafka_topic_*lock() must be held.
* Note: 'rktp' refcount is increased.
*/
shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_itopic_t *rkt,
int32_t partition) {
shptr_rd_kafka_toppar_t *s_rktp;
int i;
RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
if (rktp->rktp_partition == partition)
return rd_kafka_toppar_keep(rktp);
}
return NULL;
}
/**
* Link toppar on desired list.
*
* Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
*/
void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp) {
shptr_rd_kafka_toppar_t *s_rktp;
if (rktp->rktp_s_for_desp)
return; /* Already linked */
s_rktp = rd_kafka_toppar_keep(rktp);
rd_list_add(&rktp->rktp_rkt->rkt_desp, s_rktp);
rktp->rktp_s_for_desp = s_rktp; /* Desired list refcount */
}
/**
* Unlink toppar from desired list.
*
* Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
*/
void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp) {
if (!rktp->rktp_s_for_desp)
return; /* Not linked */
rd_list_remove(&rktp->rktp_rkt->rkt_desp, rktp->rktp_s_for_desp);
rd_kafka_toppar_destroy(rktp->rktp_s_for_desp);
rktp->rktp_s_for_desp = NULL;
}
/**
* @brief If rktp is not already desired:
* - mark as DESIRED|UNKNOWN
* - add to desired list
*
* @remark toppar_lock() MUST be held
*/
void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp) {
if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
return;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
"%s [%"PRId32"]: adding to DESIRED list",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
rd_kafka_toppar_desired_link(rktp);
}
/**
* Adds 'partition' as a desired partition to topic 'rkt', or updates
* an existing partition to be desired.
*
* Locks: rd_kafka_topic_wrlock() must be held.
*/
shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_itopic_t *rkt,
int32_t partition) {
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
if ((s_rktp = rd_kafka_toppar_get(rkt,
partition, 0/*no_ua_on_miss*/))) {
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_lock(rktp);
if (unlikely(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))) {
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESP",
"Setting topic %s [%"PRId32"] partition "
"as desired",
rkt->rkt_topic->str, rktp->rktp_partition);
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
}
rd_kafka_toppar_unlock(rktp);
return s_rktp;
}
if ((s_rktp = rd_kafka_toppar_desired_get(rkt, partition)))
return s_rktp;
s_rktp = rd_kafka_toppar_new(rkt, partition);
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_lock(rktp);
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
rd_kafka_toppar_desired_add0(rktp);
rd_kafka_toppar_unlock(rktp);
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESP",
"Adding desired topic %s [%"PRId32"]",
rkt->rkt_topic->str, rktp->rktp_partition);
return s_rktp; /* Caller's refcount */
}
/**
* Unmarks an 'rktp' as desired.
*
* Locks: rd_kafka_topic_wrlock() and rd_kafka_toppar_lock() MUST be held.
*/
void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) {
if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
return;
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_DESIRED;
rd_kafka_toppar_desired_unlink(rktp);
if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN)
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_UNKNOWN;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESP",
"Removing (un)desired topic %s [%"PRId32"]",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
}
/**
* Append message at tail of 'rktp' message queue.
*/
void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
rd_kafka_toppar_lock(rktp);
rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
#ifndef _MSC_VER
if (rktp->rktp_msgq_wakeup_fd != -1 &&
rd_kafka_msgq_len(&rktp->rktp_msgq) == 1) {
char one = 1;
int r;
r = rd_write(rktp->rktp_msgq_wakeup_fd, &one, sizeof(one));
if (r == -1)
rd_kafka_log(rktp->rktp_rkt->rkt_rk, LOG_ERR, "PARTENQ",
"%s [%"PRId32"]: write to "
"wake-up fd %d failed: %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rktp->rktp_msgq_wakeup_fd,
rd_strerror(errno));
}
#endif
rd_kafka_toppar_unlock(rktp);
}
/**
* Dequeue message from 'rktp' message queue.
*/
void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
rd_kafka_toppar_lock(rktp);
rd_kafka_msgq_deq(&rktp->rktp_msgq, rkm, 1);
rd_kafka_toppar_unlock(rktp);
}
/**
* Inserts all messages from 'rkmq' at head of toppar 'rktp's queue.
* 'rkmq' will be cleared.
*/
void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq) {
rd_kafka_toppar_lock(rktp);
rd_kafka_msgq_concat(rkmq, &rktp->rktp_msgq);
rd_kafka_msgq_move(&rktp->rktp_msgq, rkmq);
rd_kafka_toppar_unlock(rktp);
}
/**
* Concatenates all messages from 'rkmq' at tail of toppar 'rktp's queue.
* 'rkmq' will be cleared.
*/
void rd_kafka_toppar_concat_msgq (rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq) {
rd_kafka_toppar_lock(rktp);
rd_kafka_msgq_concat(&rktp->rktp_msgq, rkmq);
rd_kafka_toppar_unlock(rktp);
}
/**
* Move all messages in 'rkmq' to the unassigned partition, if any.
* Returns 0 on success or -1 if there was no UA partition.
*/
int rd_kafka_toppar_ua_move (rd_kafka_itopic_t *rkt, rd_kafka_msgq_t *rkmq) {
shptr_rd_kafka_toppar_t *s_rktp_ua;
rd_kafka_topic_rdlock(rkt);
s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
rd_kafka_topic_rdunlock(rkt);
if (unlikely(s_rktp_ua == NULL))
return -1;
rd_kafka_msgq_concat(&rd_kafka_toppar_s2i(s_rktp_ua)->rktp_msgq, rkmq);
rd_kafka_toppar_destroy(s_rktp_ua);
return 0;
}
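/*
 * Example (illustrative sketch): a producer-side caller handling a
 * partition that disappeared from the cluster might move its queued
 * messages back to the UA partition for re-partitioning, and fail
 * them if no UA partition exists:
 *
 *   if (rd_kafka_toppar_ua_move(rkt, &rktp->rktp_msgq) == -1)
 *           rd_kafka_dr_msgq(rkt, &rktp->rktp_msgq,
 *                            RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
 */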
/**
* Helper method for purging queues when removing a toppar.
* Locks: rd_kafka_toppar_lock() MUST be held
*/
void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp) {
rd_kafka_q_disable(rktp->rktp_fetchq);
rd_kafka_q_purge(rktp->rktp_fetchq);
rd_kafka_q_disable(rktp->rktp_ops);
rd_kafka_q_purge(rktp->rktp_ops);
}
/**
* Migrate rktp from (optional) \p old_rkb to (optional) \p new_rkb.
* This is an async operation.
*
* Locks: rd_kafka_toppar_lock() MUST be held
*/
static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *old_rkb,
rd_kafka_broker_t *new_rkb) {
rd_kafka_op_t *rko;
rd_kafka_broker_t *dest_rkb;
int had_next_leader = rktp->rktp_next_leader ? 1 : 0;
/* Update next leader */
if (new_rkb)
rd_kafka_broker_keep(new_rkb);
if (rktp->rktp_next_leader)
rd_kafka_broker_destroy(rktp->rktp_next_leader);
rktp->rktp_next_leader = new_rkb;
/* If next_leader is set it means there is already an async
* migration op going on and we should not send a new one
* but simply change the next_leader (which we did above). */
if (had_next_leader)
return;
/* Revert from offset-wait state back to offset-query
* prior to leaving the broker to avoid stalling
* on the new broker waiting for an offset reply from
* this old broker (which might never arrive and would
* thus have to time out, slowly). */
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) {
rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_query_tmr,
500*1000,
rd_kafka_offset_query_tmr_cb,
rktp);
}
if (old_rkb) {
/* If there is an existing broker for this toppar we let it
* first handle its own leave and then trigger the join for
* the next leader, if any. */
rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
dest_rkb = old_rkb;
} else {
/* No existing broker, send join op directly to new leader. */
rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
dest_rkb = new_rkb;
}
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
"Migrating topic %.*s [%"PRId32"] %p from %s to %s "
"(sending %s to %s)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, rktp,
old_rkb ? rd_kafka_broker_name(old_rkb) : "(none)",
new_rkb ? rd_kafka_broker_name(new_rkb) : "(none)",
rd_kafka_op2str(rko->rko_type),
rd_kafka_broker_name(dest_rkb));
rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
}
/**
* Async toppar leave from broker.
* Only use this when partitions are to be removed.
*
* Locks: rd_kafka_toppar_lock() MUST be held
*/
void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp) {
rd_kafka_op_t *rko;
rd_kafka_broker_t *dest_rkb;
if (rktp->rktp_next_leader)
dest_rkb = rktp->rktp_next_leader;
else if (rktp->rktp_leader)
dest_rkb = rktp->rktp_leader;
else {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARDEL",
"%.*s [%"PRId32"] %p not handled by any broker: "
"not sending LEAVE for remove",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, rktp);
return;
}
/* Revert from offset-wait state back to offset-query
* prior to leaving the broker to avoid stalling
* on the new broker waiting for an offset reply from
* this old broker (which might never arrive and would
* thus have to time out, slowly). */
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
"%.*s [%"PRId32"] %p sending final LEAVE for removal by %s",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, rktp,
rd_kafka_broker_name(dest_rkb));
rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
}
/**
* Delegates broker 'rkb' as leader for toppar 'rktp'.
* 'rkb' may be NULL to undelegate the current leader.
*
* Locks: Caller must have rd_kafka_topic_wrlock(rktp->rktp_rkt)
* AND rd_kafka_toppar_lock(rktp) held.
*/
void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *rkb,
int for_removal) {
rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
int internal_fallback = 0;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
"%s [%"PRId32"]: delegate to broker %s "
"(rktp %p, term %d, ref %d, remove %d)",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rkb ? rkb->rkb_name : "(none)",
rktp, rd_kafka_terminating(rk),
rd_refcnt_get(&rktp->rktp_refcnt),
for_removal);
/* Delegate toppars with no leader to the
* internal broker for bookkeeping. */
if (!rkb && !for_removal && !rd_kafka_terminating(rk)) {
rkb = rd_kafka_broker_internal(rk);
internal_fallback = 1;
}
if (rktp->rktp_leader == rkb && !rktp->rktp_next_leader) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
"%.*s [%"PRId32"]: not updating broker: "
"already on correct broker %s",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rkb ? rd_kafka_broker_name(rkb) : "(none)");
if (internal_fallback)
rd_kafka_broker_destroy(rkb);
return;
}
if (rktp->rktp_leader)
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
"%.*s [%"PRId32"]: broker %s no longer leader",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_broker_name(rktp->rktp_leader));
if (rkb) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
"%.*s [%"PRId32"]: broker %s is now leader "
"for partition with %i messages "
"(%"PRIu64" bytes) queued",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_broker_name(rkb),
rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt),
rd_atomic64_get(&rktp->rktp_msgq.rkmq_msg_bytes));
} else {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
"%.*s [%"PRId32"]: no leader broker",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition);
}
if (rktp->rktp_leader || rkb)
rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_leader, rkb);
if (internal_fallback)
rd_kafka_broker_destroy(rkb);
}
void
rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets){
if (err) {
rd_kafka_q_op_err(rktp->rktp_fetchq,
RD_KAFKA_OP_CONSUMER_ERR,
err, 0 /* FIXME:VERSION*/,
rktp, 0,
"Offset commit failed: %s",
rd_kafka_err2str(err));
return;
}
rd_kafka_toppar_lock(rktp);
rktp->rktp_committed_offset = offsets->elems[0].offset;
/* When stopping toppars:
* Final commit is now done (or failed), propagate. */
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING)
rd_kafka_toppar_fetch_stopped(rktp, err);
rd_kafka_toppar_unlock(rktp);
}
/**
* Commit toppar's offset on broker.
* This is an async operation; this function simply enqueues an op
* on the cgrp's queue.
*
* Locality: rktp's broker thread
*/
void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset,
const char *metadata) {
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_topic_partition_t *rktpar;
rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_cgrp != NULL);
rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "OFFSETCMT",
"%.*s [%"PRId32"]: committing offset %"PRId64,
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, offset);
offsets = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(
offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
rktpar->offset = offset;
if (metadata) {
rktpar->metadata = rd_strdup(metadata);
rktpar->metadata_size = strlen(metadata);
}
rktp->rktp_committing_offset = offset;
rd_kafka_commit(rktp->rktp_rkt->rkt_rk, offsets, 1/*async*/);
rd_kafka_topic_partition_list_destroy(offsets);
}
/**
* Handle the next offset to consume for a toppar.
* This is used during initial setup when trying to figure out what
* offset to start consuming from.
*
* Locality: toppar handler thread.
* Locks: toppar_lock(rktp) must be held
*/
void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
int64_t Offset) {
if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
/* Offset storage returned logical offset (e.g. "end"),
* look it up. */
rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
"update");
return;
}
/* Adjust by TAIL count, if wanted */
if (rktp->rktp_query_offset <=
RD_KAFKA_OFFSET_TAIL_BASE) {
int64_t orig_Offset = Offset;
int64_t tail_cnt =
llabs(rktp->rktp_query_offset -
RD_KAFKA_OFFSET_TAIL_BASE);
if (tail_cnt > Offset)
Offset = 0;
else
Offset -= tail_cnt;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"OffsetReply for topic %s [%"PRId32"]: "
"offset %"PRId64": adjusting for "
"OFFSET_TAIL(%"PRId64"): "
"effective offset %"PRId64,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
orig_Offset, tail_cnt,
Offset);
}
rktp->rktp_next_offset = Offset;
rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);
/* Wake-up broker thread which might be idling on IO */
if (rktp->rktp_leader)
rd_kafka_broker_wakeup(rktp->rktp_leader);
}
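/*
 * Worked example (illustrative, assuming the public macro
 * RD_KAFKA_OFFSET_TAIL(cnt) expands to RD_KAFKA_OFFSET_TAIL_BASE - cnt):
 * with a query offset of RD_KAFKA_OFFSET_TAIL(100) and a broker-reported
 * end Offset of 5000, tail_cnt = llabs((TAIL_BASE - 100) - TAIL_BASE)
 * = 100, so the effective start offset becomes 5000 - 100 = 4900.
 * If the log holds fewer than 100 messages, Offset is clamped to 0.
 */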
/**
* Fetch stored offset for a single partition. (simple consumer)
*
* Locality: toppar handler thread
*/
void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
rd_kafka_replyq_t replyq) {
rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
rd_kafka_topic_partition_list_t *part;
rd_kafka_op_t *rko;
rd_kafka_dbg(rk, TOPIC, "OFFSETREQ",
"Partition %.*s [%"PRId32"]: querying cgrp for "
"stored offset (opv %d)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, replyq.version);
part = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add0(part,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rd_kafka_toppar_keep(rktp));
rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rko->rko_replyq = replyq;
rko->rko_u.offset_fetch.partitions = part;
rko->rko_u.offset_fetch.do_free = 1;
rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
}
/**
* Toppar based OffsetResponse handling.
* This is used for finding the next offset to Fetch.
*
* Locality: toppar handler thread
*/
static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
shptr_rd_kafka_toppar_t *s_rktp = opaque;
rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_topic_partition_t *rktpar;
int64_t Offset;
rd_kafka_toppar_lock(rktp);
/* Drop reply from previous partition leader */
if (rktp->rktp_leader != rkb)
err = RD_KAFKA_RESP_ERR__OUTDATED;
rd_kafka_toppar_unlock(rktp);
offsets = rd_kafka_topic_partition_list_new(1);
/* Parse and return Offset */
err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
rkbuf, request, offsets);
rd_rkb_dbg(rkb, TOPIC, "OFFSET",
"Offset reply for "
"topic %.*s [%"PRId32"] (v%d vs v%d)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, request->rkbuf_replyq.version,
rktp->rktp_op_version);
rd_dassert(request->rkbuf_replyq.version > 0);
if (err != RD_KAFKA_RESP_ERR__DESTROY &&
rd_kafka_buf_version_outdated(request, rktp->rktp_op_version)) {
/* Outdated request response, ignore. */
err = RD_KAFKA_RESP_ERR__OUTDATED;
}
if (!err &&
(!(rktpar = rd_kafka_topic_partition_list_find(
offsets,
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition))))
err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
if (err) {
rd_kafka_op_t *rko;
rd_rkb_dbg(rkb, TOPIC, "OFFSET",
"Offset reply error for "
"topic %.*s [%"PRId32"] (v%d): %s",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, request->rkbuf_replyq.version,
rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(offsets);
if (err == RD_KAFKA_RESP_ERR__DESTROY ||
err == RD_KAFKA_RESP_ERR__OUTDATED) {
/* Termination or outdated, quick cleanup. */
/* from request.opaque */
rd_kafka_toppar_destroy(s_rktp);
return;
} else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
return; /* Retry in progress */
rd_kafka_toppar_lock(rktp);
rd_kafka_offset_reset(rktp, rktp->rktp_query_offset,
err,
"failed to query logical offset");
/* Signal error back to application,
* unless this is an intermittent problem
* (e.g., connection lost) */
rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
rko->rko_err = err;
if (rktp->rktp_query_offset <=
RD_KAFKA_OFFSET_TAIL_BASE)
rko->rko_u.err.offset =
rktp->rktp_query_offset -
RD_KAFKA_OFFSET_TAIL_BASE;
else
rko->rko_u.err.offset = rktp->rktp_query_offset;
rd_kafka_toppar_unlock(rktp);
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_q_enq(rktp->rktp_fetchq, rko);
rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
return;
}
Offset = rktpar->offset;
rd_kafka_topic_partition_list_destroy(offsets);
rd_kafka_toppar_lock(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"Offset %s request for %.*s [%"PRId32"] "
"returned offset %s (%"PRId64")",
rd_kafka_offset2str(rktp->rktp_query_offset),
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset);
rd_kafka_toppar_next_offset_handle(rktp, Offset);
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
}
/**
* Send OffsetRequest for toppar.
*
* If \p backoff_ms is non-zero only the query timer is started,
* otherwise a query is triggered directly.
*
* Locality: toppar handler thread
* Locks: toppar_lock() must be held
*/
void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
int64_t query_offset, int backoff_ms) {
rd_kafka_broker_t *rkb;
rd_kafka_assert(NULL,
thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));
rkb = rktp->rktp_leader;
if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
backoff_ms = 500;
if (backoff_ms) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
"%s [%"PRId32"]: %s"
"starting offset query timer for offset %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
!rkb ? "no current leader for partition, " : "",
rd_kafka_offset2str(query_offset));
rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_query_tmr,
backoff_ms*1000ll,
rd_kafka_offset_query_tmr_cb, rktp);
return;
}
rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_query_tmr, 1/*lock*/);
if (query_offset == RD_KAFKA_OFFSET_STORED &&
rktp->rktp_rkt->rkt_conf.offset_store_method ==
RD_KAFKA_OFFSET_METHOD_BROKER) {
/*
* Get stored offset from broker based storage:
* ask cgrp manager for offsets
*/
rd_kafka_toppar_offset_fetch(
rktp,
RD_KAFKA_REPLYQ(rktp->rktp_ops,
rktp->rktp_op_version));
} else {
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_topic_partition_list_t *offsets;
/*
* Look up logical offset (end,beginning,tail,..)
*/
rd_rkb_dbg(rkb, TOPIC, "OFFREQ",
"Partition %.*s [%"PRId32"]: querying for logical "
"offset %s (opv %d)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_offset2str(query_offset),
rktp->rktp_op_version);
s_rktp = rd_kafka_toppar_keep(rktp);
if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
query_offset = RD_KAFKA_OFFSET_END;
offsets = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(
offsets,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition)->offset = query_offset;
rd_kafka_OffsetRequest(rkb, offsets, 0,
RD_KAFKA_REPLYQ(rktp->rktp_ops,
rktp->rktp_op_version),
rd_kafka_toppar_handle_Offset,
s_rktp);
rd_kafka_topic_partition_list_destroy(offsets);
}
rd_kafka_toppar_set_fetch_state(rktp,
RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
}
/**
* Start fetching toppar.
*
* Locality: toppar handler thread
* Locks: none
*/
static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp,
int64_t offset,
rd_kafka_op_t *rko_orig) {
rd_kafka_cgrp_t *rkcg = rko_orig->rko_u.fetch_start.rkcg;
rd_kafka_resp_err_t err = 0;
int32_t version = rko_orig->rko_version;
rd_kafka_toppar_lock(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
"Start fetch for %.*s [%"PRId32"] in "
"state %s at offset %s (v%"PRId32")",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_fetch_states[rktp->rktp_fetch_state],
rd_kafka_offset2str(offset), version);
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
rd_kafka_toppar_unlock(rktp);
goto err_reply;
}
rktp->rktp_op_version = version;
if (rkcg) {
rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp);
/* Attach toppar to cgrp */
rktp->rktp_cgrp = rkcg;
rd_kafka_cgrp_op(rkcg, rktp, RD_KAFKA_NO_REPLYQ,
RD_KAFKA_OP_PARTITION_JOIN, 0);
}
if (offset == RD_KAFKA_OFFSET_BEGINNING ||
offset == RD_KAFKA_OFFSET_END ||
offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
rd_kafka_toppar_next_offset_handle(rktp, offset);
} else if (offset == RD_KAFKA_OFFSET_STORED) {
rd_kafka_offset_store_init(rktp);
} else if (offset == RD_KAFKA_OFFSET_INVALID) {
rd_kafka_offset_reset(rktp, offset,
RD_KAFKA_RESP_ERR__NO_OFFSET,
"no previously committed offset "
"available");
} else {
rktp->rktp_next_offset = offset;
rd_kafka_toppar_set_fetch_state(rktp,
RD_KAFKA_TOPPAR_FETCH_ACTIVE);
/* Wake-up broker thread which might be idling on IO */
if (rktp->rktp_leader)
rd_kafka_broker_wakeup(rktp->rktp_leader);
}
rktp->rktp_offsets_fin.eof_offset = RD_KAFKA_OFFSET_INVALID;
rd_kafka_toppar_unlock(rktp);
/* Signal back to caller thread that start has commenced, or err */
err_reply:
if (rko_orig->rko_replyq.q) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_START);
rko->rko_err = err;
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
}
}
/**
* Mark toppar's fetch state as stopped (all decommissioning is done,
* offsets are stored, etc).
*
* Locality: toppar handler thread
* Locks: toppar_lock(rktp) MUST be held
*/
void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
rd_kafka_resp_err_t err) {
rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPED);
if (rktp->rktp_cgrp) {
/* Detach toppar from cgrp */
rd_kafka_cgrp_op(rktp->rktp_cgrp, rktp, RD_KAFKA_NO_REPLYQ,
RD_KAFKA_OP_PARTITION_LEAVE, 0);
rktp->rktp_cgrp = NULL;
}
/* Signal back to application thread that stop is done. */
if (rktp->rktp_replyq.q) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY);
rko->rko_err = err;
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_replyq_enq(&rktp->rktp_replyq, rko, 0);
}
}
/**
* Stop toppar fetcher.
* This is usually an async operation.
*
* Locality: toppar handler thread
*/
void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp,
rd_kafka_op_t *rko_orig) {
int32_t version = rko_orig->rko_version;
rd_kafka_toppar_lock(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
"Stopping fetch for %.*s [%"PRId32"] in state %s (v%d)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
rktp->rktp_op_version = version;
/* Abort pending offset lookups. */
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_query_tmr,
1/*lock*/);
/* Clear out the forwarding queue. */
rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL);
/* Assign the future replyq to propagate stop results. */
rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_replyq.q == NULL);
if (rko_orig) {
rktp->rktp_replyq = rko_orig->rko_replyq;
rd_kafka_replyq_clear(&rko_orig->rko_replyq);
}
rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPING);
/* Stop offset store (possibly async).
* NOTE: will call .._stopped() if store finishes immediately,
* so no more operations after this call! */
rd_kafka_offset_store_stop(rktp);
rd_kafka_toppar_unlock(rktp);
}
/**
* Update a toppar's offset.
* The toppar must previously have been FETCH_START:ed.
*
* Locality: toppar handler thread
*/
void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp,
int64_t offset, rd_kafka_op_t *rko_orig) {
rd_kafka_resp_err_t err = 0;
int32_t version = rko_orig->rko_version;
rd_kafka_toppar_lock(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
"Seek %.*s [%"PRId32"] to offset %s "
"in state %s (v%"PRId32")",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_offset2str(offset),
rd_kafka_fetch_states[rktp->rktp_fetch_state], version);
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
goto err_reply;
} else if (!RD_KAFKA_TOPPAR_FETCH_IS_STARTED(rktp->rktp_fetch_state)) {
err = RD_KAFKA_RESP_ERR__STATE;
goto err_reply;
} else if (offset == RD_KAFKA_OFFSET_STORED) {
err = RD_KAFKA_RESP_ERR__INVALID_ARG;
goto err_reply;
}
rktp->rktp_op_version = version;
/* Abort pending offset lookups. */
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_query_tmr,
1/*lock*/);
if (RD_KAFKA_OFFSET_IS_LOGICAL(offset))
rd_kafka_toppar_next_offset_handle(rktp, offset);
else {
rktp->rktp_next_offset = offset;
rd_kafka_toppar_set_fetch_state(rktp,
RD_KAFKA_TOPPAR_FETCH_ACTIVE);
/* Wake-up broker thread which might be idling on IO */
if (rktp->rktp_leader)
rd_kafka_broker_wakeup(rktp->rktp_leader);
}
/* Signal back to caller thread that seek has commenced, or err */
err_reply:
rd_kafka_toppar_unlock(rktp);
if (rko_orig && rko_orig->rko_replyq.q) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(RD_KAFKA_OP_SEEK|RD_KAFKA_OP_REPLY);
rko->rko_err = err;
rko->rko_u.fetch_start.offset =
rko_orig->rko_u.fetch_start.offset;
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
}
}
static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp,
rd_kafka_op_t *rko_orig) {
rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
int pause = rko_orig->rko_u.pause.pause;
int flag = rko_orig->rko_u.pause.flag;
int32_t version = rko_orig->rko_version;
rd_kafka_toppar_lock(rktp);
rktp->rktp_op_version = version;
if (pause) {
/* Pause partition */
rktp->rktp_flags |= flag;
if (rk->rk_type == RD_KAFKA_CONSUMER) {
/* Save offset of last consumed message+1 as the
* next message to fetch on resume. */
rktp->rktp_next_offset = rktp->rktp_app_offset;
rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
"%s %s [%"PRId32"]: at offset %s "
"(state %s, v%d)",
pause ? "Pause":"Resume",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rd_kafka_offset2str(
rktp->rktp_next_offset),
rd_kafka_fetch_states[rktp->
rktp_fetch_state],
version);
} else {
rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
"%s %s [%"PRId32"] (state %s, v%d)",
pause ? "Pause":"Resume",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rd_kafka_fetch_states[rktp->
rktp_fetch_state],
version);
}
} else {
/* Resume partition */
rktp->rktp_flags &= ~flag;
if (rk->rk_type == RD_KAFKA_CONSUMER) {
rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
"%s %s [%"PRId32"]: at offset %s "
"(state %s, v%d)",
rktp->rktp_fetch_state ==
RD_KAFKA_TOPPAR_FETCH_ACTIVE ?
"Resuming" : "Not resuming stopped",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rd_kafka_offset2str(
rktp->rktp_next_offset),
rd_kafka_fetch_states[rktp->
rktp_fetch_state],
version);
/* If the resuming offset is logical we
* need to trigger a seek (that performs the
* logical->absolute lookup logic) to get
* things going.
* Typical case is when a partition is paused
* before anything has been consumed by app
* yet thus having rktp_app_offset=INVALID. */
if ((rktp->rktp_fetch_state ==
RD_KAFKA_TOPPAR_FETCH_ACTIVE ||
rktp->rktp_fetch_state ==
RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) &&
rktp->rktp_next_offset == RD_KAFKA_OFFSET_INVALID)
rd_kafka_toppar_next_offset_handle(
rktp, rktp->rktp_next_offset);
} else
rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
"%s %s [%"PRId32"] (state %s, v%d)",
pause ? "Pause":"Resume",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rd_kafka_fetch_states[rktp->
rktp_fetch_state],
version);
}
rd_kafka_toppar_unlock(rktp);
if (pause && rk->rk_type == RD_KAFKA_CONSUMER) {
/* Flush partition's fetch queue */
rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
rko_orig->rko_version);
}
}
/**
* Add toppar to fetch list.
*
* Locality: broker thread
* Locks: none
*/
static RD_INLINE void rd_kafka_broker_fetch_toppar_add (rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp){
if (rktp->rktp_fetch)
return; /* Already added */
CIRCLEQ_INSERT_TAIL(&rkb->rkb_fetch_toppars, rktp, rktp_fetchlink);
rkb->rkb_fetch_toppar_cnt++;
rktp->rktp_fetch = 1;
if (unlikely(rkb->rkb_fetch_toppar_cnt == 1))
rd_kafka_broker_fetch_toppar_next(rkb, rktp);
rd_rkb_dbg(rkb, TOPIC, "FETCHADD",
"Added %.*s [%"PRId32"] to fetch list (%d entries, opv %d)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rkb->rkb_fetch_toppar_cnt, rktp->rktp_fetch_version);
}
/**
* Remove toppar from fetch list.
*
* Locality: broker thread
* Locks: none
*/
static RD_INLINE void rd_kafka_broker_fetch_toppar_del (rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp){
if (!rktp->rktp_fetch)
return; /* Not added */
CIRCLEQ_REMOVE(&rkb->rkb_fetch_toppars, rktp, rktp_fetchlink);
rd_kafka_assert(NULL, rkb->rkb_fetch_toppar_cnt > 0);
rkb->rkb_fetch_toppar_cnt--;
rktp->rktp_fetch = 0;
if (rkb->rkb_fetch_toppar_next == rktp) {
/* Update next pointer */
rd_kafka_broker_fetch_toppar_next(
rkb, CIRCLEQ_LOOP_NEXT(&rkb->rkb_fetch_toppars,
rktp, rktp_fetchlink));
}
rd_rkb_dbg(rkb, TOPIC, "FETCHADD",
"Removed %.*s [%"PRId32"] from fetch list "
"(%d entries, opv %d)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rkb->rkb_fetch_toppar_cnt, rktp->rktp_fetch_version);
}
/**
* @brief Decide whether this toppar should be on the fetch list or not.
*
* Also:
* - update toppar's op version (for broker thread's copy)
* - finalize statistics (move rktp_offsets to rktp_offsets_fin)
*
* @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
*
* @locality broker thread
*/
rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *rkb,
int force_remove) {
int should_fetch = 1;
const char *reason = "";
int32_t version;
rd_ts_t ts_backoff = 0;
rd_kafka_toppar_lock(rktp);
/* Forced removal from fetch list */
if (unlikely(force_remove)) {
reason = "forced removal";
should_fetch = 0;
goto done;
}
if (unlikely((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) != 0)) {
reason = "partition removed";
should_fetch = 0;
goto done;
}
/* Skip toppars not in active fetch state */
if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
reason = "not in active fetch state";
should_fetch = 0;
goto done;
}
/* Update broker thread's fetch op version */
version = rktp->rktp_op_version;
if (version > rktp->rktp_fetch_version ||
rktp->rktp_next_offset != rktp->rktp_last_next_offset) {
/* New version barrier, something was modified from the
* control plane. Reset and start over.
* Alternatively only the next_offset changed but not the
* barrier, which is the case when automatically triggering
* offset.reset (such as on PARTITION_EOF). */
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCHDEC",
"Topic %s [%"PRId32"]: fetch decide: "
"updating to version %d (was %d) at "
"offset %"PRId64" (was %"PRId64")",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
version, rktp->rktp_fetch_version,
rktp->rktp_next_offset,
rktp->rktp_offsets.fetch_offset);
rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
/* New start offset */
rktp->rktp_offsets.fetch_offset = rktp->rktp_next_offset;
rktp->rktp_last_next_offset = rktp->rktp_next_offset;
rktp->rktp_fetch_version = version;
rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
version);
}
if (RD_KAFKA_TOPPAR_IS_PAUSED(rktp)) {
should_fetch = 0;
reason = "paused";
} else if (RD_KAFKA_OFFSET_IS_LOGICAL(rktp->rktp_next_offset)) {
should_fetch = 0;
reason = "no concrete offset";
} else if (rd_kafka_q_len(rktp->rktp_fetchq) >=
rkb->rkb_rk->rk_conf.queued_min_msgs) {
/* Skip toppars whose local message queue is already above
* the lower threshold. */
reason = "queued.min.messages exceeded";
should_fetch = 0;
} else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >=
rkb->rkb_rk->rk_conf.queued_max_msg_bytes) {
reason = "queued.max.messages.kbytes exceeded";
should_fetch = 0;
} else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
reason = "fetch backed off";
ts_backoff = rktp->rktp_ts_fetch_backoff;
should_fetch = 0;
}
done:
/* Copy offset stats to finalized place holder. */
rktp->rktp_offsets_fin = rktp->rktp_offsets;
if (rktp->rktp_fetch != should_fetch) {
rd_rkb_dbg(rkb, FETCH, "FETCH",
"Topic %s [%"PRId32"] in state %s at offset %s "
"(%d/%d msgs, %"PRId64"/%d kb queued, "
"opv %"PRId32") is %sfetchable: %s",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rd_kafka_fetch_states[rktp->rktp_fetch_state],
rd_kafka_offset2str(rktp->rktp_next_offset),
rd_kafka_q_len(rktp->rktp_fetchq),
rkb->rkb_rk->rk_conf.queued_min_msgs,
rd_kafka_q_size(rktp->rktp_fetchq) / 1024,
rkb->rkb_rk->rk_conf.queued_max_msg_kbytes,
rktp->rktp_fetch_version,
should_fetch ? "" : "not ", reason);
if (should_fetch) {
rd_dassert(rktp->rktp_fetch_version > 0);
rd_kafka_broker_fetch_toppar_add(rkb, rktp);
} else {
rd_kafka_broker_fetch_toppar_del(rkb, rktp);
/* Non-fetching partitions will have an
* indefinite backoff, unless explicitly specified. */
if (!ts_backoff)
ts_backoff = RD_TS_MAX;
}
}
rd_kafka_toppar_unlock(rktp);
return ts_backoff;
}
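/*
 * Example (illustrative sketch): a consumer broker thread's serve loop
 * is assumed to fold the returned backoff timestamps into its wakeup
 * deadline so each backed-off partition is revisited in time:
 *
 *   rd_ts_t ts_backoff = rd_kafka_toppar_fetch_decide(rktp, rkb, 0);
 *   if (ts_backoff && ts_backoff < min_wakeup)
 *           min_wakeup = ts_backoff;
 */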
/**
* @brief Serve a toppar in a consumer broker thread.
* This is considered the fast path and should be minimal,
* mostly focusing on fetch related mechanisms.
*
* @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
*
* @locality broker thread
* @locks none
*/
rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp) {
return rd_kafka_toppar_fetch_decide(rktp, rkb, 0);
}
/**
* Serve a toppar op
* 'rktp' may be NULL for certain ops (OP_RECV_BUF)
*
* @locality toppar handler thread
*/
static rd_kafka_op_res_t
rd_kafka_toppar_op_serve (rd_kafka_t *rk,
rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
rd_kafka_q_cb_type_t cb_type, void *opaque) {
rd_kafka_toppar_t *rktp = NULL;
int outdated = 0;
if (rko->rko_rktp)
rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
if (rktp) {
outdated = rd_kafka_op_version_outdated(rko,
rktp->rktp_op_version);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OP",
"%.*s [%"PRId32"] received %sop %s "
"(v%"PRId32") in fetch-state %s (opv%d)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
outdated ? "outdated ": "",
rd_kafka_op2str(rko->rko_type),
rko->rko_version,
rd_kafka_fetch_states[rktp->rktp_fetch_state],
rktp->rktp_op_version);
if (outdated) {
#if ENABLE_DEVEL
rd_kafka_op_print(stdout, "PART_OUTDATED", rko);
#endif
rd_kafka_op_destroy(rko);
return RD_KAFKA_OP_RES_HANDLED;
}
}
switch ((int)rko->rko_type)
{
case RD_KAFKA_OP_FETCH_START:
rd_kafka_toppar_fetch_start(rktp,
rko->rko_u.fetch_start.offset, rko);
break;
case RD_KAFKA_OP_FETCH_STOP:
rd_kafka_toppar_fetch_stop(rktp, rko);
break;
case RD_KAFKA_OP_SEEK:
rd_kafka_toppar_seek(rktp, rko->rko_u.fetch_start.offset, rko);
break;
case RD_KAFKA_OP_PAUSE:
rd_kafka_toppar_pause_resume(rktp, rko);
break;
case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
rd_kafka_assert(NULL, rko->rko_u.offset_commit.cb);
rko->rko_u.offset_commit.cb(
rk, rko->rko_err,
rko->rko_u.offset_commit.partitions,
rko->rko_u.offset_commit.opaque);
break;
case RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY:
{
/* OffsetFetch reply */
rd_kafka_topic_partition_list_t *offsets =
rko->rko_u.offset_fetch.partitions;
shptr_rd_kafka_toppar_t *s_rktp;
int64_t offset = RD_KAFKA_OFFSET_INVALID;
s_rktp = offsets->elems[0]._private;
if (!rko->rko_err) {
/* Request succeeded but the per-partition request might have failed */
rko->rko_err = offsets->elems[0].err;
offset = offsets->elems[0].offset;
}
offsets->elems[0]._private = NULL;
rd_kafka_topic_partition_list_destroy(offsets);
rko->rko_u.offset_fetch.partitions = NULL;
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_query_tmr,
1/*lock*/);
rd_kafka_toppar_lock(rktp);
if (rko->rko_err) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
TOPIC, "OFFSET",
"Failed to fetch offset for "
"%.*s [%"PRId32"]: %s",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_err2str(rko->rko_err));
/* Keep on querying until we succeed. */
rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
rd_kafka_toppar_unlock(rktp);
rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
&rktp->rktp_offset_query_tmr,
500*1000,
rd_kafka_offset_query_tmr_cb,
rktp);
/* Propagate error to application */
if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD) {
rd_kafka_q_op_err(rktp->rktp_fetchq,
RD_KAFKA_OP_ERR, rko->rko_err,
0, rktp, 0,
"Failed to fetch "
"offsets from brokers: %s",
rd_kafka_err2str(rko->rko_err));
}
rd_kafka_toppar_destroy(s_rktp);
break;
}
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
TOPIC, "OFFSET",
"%.*s [%"PRId32"]: OffsetFetch returned "
"offset %s (%"PRId64")",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_offset2str(offset), offset);
if (offset > 0)
rktp->rktp_committed_offset = offset;
if (offset >= 0)
rd_kafka_toppar_next_offset_handle(rktp, offset);
else
rd_kafka_offset_reset(rktp, offset,
RD_KAFKA_RESP_ERR__NO_OFFSET,
"no previously committed offset "
"available");
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(s_rktp);
}
break;
default:
rd_kafka_assert(NULL, !*"unknown type");
break;
}
rd_kafka_op_destroy(rko);
return RD_KAFKA_OP_RES_HANDLED;
}
/**
* Send command op to toppar (handled by toppar's thread).
*
* Locality: any thread
*/
static void rd_kafka_toppar_op0 (rd_kafka_toppar_t *rktp, rd_kafka_op_t *rko,
rd_kafka_replyq_t replyq) {
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rko->rko_replyq = replyq;
rd_kafka_q_enq(rktp->rktp_ops, rko);
}
/**
* Send command op to toppar (handled by toppar's thread).
*
* Locality: any thread
*/
static void rd_kafka_toppar_op (rd_kafka_toppar_t *rktp,
rd_kafka_op_type_t type, int32_t version,
int64_t offset, rd_kafka_cgrp_t *rkcg,
rd_kafka_replyq_t replyq) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(type);
rko->rko_version = version;
if (type == RD_KAFKA_OP_FETCH_START ||
type == RD_KAFKA_OP_SEEK) {
if (rkcg)
rko->rko_u.fetch_start.rkcg = rkcg;
rko->rko_u.fetch_start.offset = offset;
}
rd_kafka_toppar_op0(rktp, rko, replyq);
}
/**
* Start consuming partition (async operation).
* 'offset' is the initial offset
* 'fwdq' is an optional queue to forward messages to; if this is NULL
* then messages will be enqueued on rktp_fetchq.
* 'replyq' is an optional queue for handling the consume_start ack.
*
* This is the thread-safe interface that can be called from any thread.
*/
rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
int64_t offset,
rd_kafka_q_t *fwdq,
rd_kafka_replyq_t replyq) {
int32_t version;
rd_kafka_q_lock(rktp->rktp_fetchq);
if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq,
0, /* no do_lock */
0 /* no fwd_app */);
rd_kafka_q_unlock(rktp->rktp_fetchq);
/* Bump version barrier. */
version = rd_kafka_toppar_version_new_barrier(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
"Start consuming %.*s [%"PRId32"] at "
"offset %s (v%"PRId32")",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, rd_kafka_offset2str(offset),
version);
rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_START, version,
offset, rktp->rktp_rkt->rkt_rk->rk_cgrp, replyq);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
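/*
 * Example (illustrative sketch): a caller that wants the start ack on
 * a temporary queue of its own (tmpq is an assumed local name):
 *
 *   rd_kafka_q_t *tmpq = rd_kafka_q_new(rk);
 *
 *   // NULL fwdq: messages stay on rktp_fetchq
 *   rd_kafka_toppar_op_fetch_start(rktp, RD_KAFKA_OFFSET_STORED,
 *                                  NULL, RD_KAFKA_REPLYQ(tmpq, 0));
 *   // ... await the RD_KAFKA_OP_FETCH_START reply on tmpq ...
 *   rd_kafka_q_destroy(tmpq);
 */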
/**
* Stop consuming partition (async operation).
* This is a thread-safe interface that can be called from any thread.
*
* Locality: any thread
*/
rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
rd_kafka_replyq_t replyq) {
int32_t version;
/* Bump version barrier. */
version = rd_kafka_toppar_version_new_barrier(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
"Stop consuming %.*s [%"PRId32"] (v%"PRId32")",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, version);
rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_STOP, version,
0, NULL, replyq);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* Set/Seek offset of a consumed partition (async operation).
* 'offset' is the target offset
* 'replyq' is an optional queue for handling the ack.
*
* This is the thread-safe interface that can be called from any thread.
*/
rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
int64_t offset,
rd_kafka_replyq_t replyq) {
int32_t version;
/* Bump version barrier. */
version = rd_kafka_toppar_version_new_barrier(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
"Seek %.*s [%"PRId32"] to "
"offset %s (v%"PRId32")",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, rd_kafka_offset2str(offset),
version);
rd_kafka_toppar_op(rktp, RD_KAFKA_OP_SEEK, version,
offset, NULL, replyq);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* Pause/resume partition (async operation).
* \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
* depending on whether the app or librdkafka initiated the pause.
* \p pause is 1 for pausing or 0 for resuming.
*
* Locality: any
*/
static rd_kafka_resp_err_t
rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp,
int pause, int flag) {
int32_t version;
rd_kafka_op_t *rko;
/* Bump version barrier. */
version = rd_kafka_toppar_version_new_barrier(rktp);
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, pause ? "PAUSE":"RESUME",
"%s %.*s [%"PRId32"] (v%"PRId32")",
pause ? "Pause" : "Resume",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, version);
rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
rko->rko_version = version;
rko->rko_u.pause.pause = pause;
rko->rko_u.pause.flag = flag;
rd_kafka_toppar_op0(rktp, rko, RD_KAFKA_NO_REPLYQ);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* Pause or resume a list of partitions.
* \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
* depending on whether the app or librdkafka initiated the pause.
* \p pause is 1 for pausing or 0 for resuming.
*
* Locality: any
*
* @remark This is an asynchronous call, the actual pause/resume is performed
* by toppar_pause() in the toppar's handler thread.
*/
rd_kafka_resp_err_t
rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag,
rd_kafka_topic_partition_list_t *partitions) {
int i;
rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
"%s %s %d partition(s)",
flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ? "Application" : "Library",
pause ? "pausing" : "resuming", partitions->cnt);
for (i = 0 ; i < partitions->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
s_rktp = rd_kafka_topic_partition_list_get_toppar(rk, rktpar);
if (!s_rktp) {
rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
"%s %s [%"PRId32"]: skipped: "
"unknown partition",
pause ? "Pause":"Resume",
rktpar->topic, rktpar->partition);
rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
continue;
}
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_op_pause_resume(rktp, pause, flag);
rd_kafka_toppar_destroy(s_rktp);
rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
}
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
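/*
* Example (illustrative sketch): application-initiated pause of two
* partitions ("mytopic" is a made-up name):
*
*   rd_kafka_topic_partition_list_t *parts;
*
*   parts = rd_kafka_topic_partition_list_new(2);
*   rd_kafka_topic_partition_list_add(parts, "mytopic", 0);
*   rd_kafka_topic_partition_list_add(parts, "mytopic", 1);
*   rd_kafka_toppars_pause_resume(rk, 1, RD_KAFKA_TOPPAR_F_APP_PAUSE,
*                                 parts);
*   rd_kafka_topic_partition_list_destroy(parts);
*
* Each element's .err field is set to indicate the per-partition outcome.
*/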
/**
* Propagate error for toppar
*/
void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
rd_kafka_resp_err_t err) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
rko->rko_err = err;
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rko->rko_u.err.errstr = rd_strdup(rd_kafka_err2str(rko->rko_err));
rd_kafka_q_enq(rktp->rktp_fetchq, rko);
}
/**
* Returns the local leader broker for this toppar.
* If \p proper_broker is set, NULL will be returned if the current
* handler is not a proper broker (i.e., the INTERNAL broker).
*
* The returned broker has an increased refcount.
*
* Locks: none
*/
rd_kafka_broker_t *rd_kafka_toppar_leader (rd_kafka_toppar_t *rktp,
int proper_broker) {
rd_kafka_broker_t *rkb;
rd_kafka_toppar_lock(rktp);
rkb = rktp->rktp_leader;
if (rkb) {
if (proper_broker && rkb->rkb_source == RD_KAFKA_INTERNAL)
rkb = NULL;
else
rd_kafka_broker_keep(rkb);
}
rd_kafka_toppar_unlock(rktp);
return rkb;
}
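/*
* Example (illustrative sketch): acquiring the current leader, requiring
* a proper (non-INTERNAL) broker, and dropping the refcount when done:
*
*   rd_kafka_broker_t *rkb;
*
*   rkb = rd_kafka_toppar_leader(rktp, 1);
*   if (rkb) {
*           ... use rkb ...
*           rd_kafka_broker_destroy(rkb);
*   }
*/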
/**
* @brief Take action when partition leader becomes unavailable.
* This should be called when leader-specific requests fail with
* NOT_LEADER_FOR.. or similar error codes, e.g. ProduceRequest.
*
* @locks none
* @locality any
*/
void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
const char *reason,
rd_kafka_resp_err_t err) {
rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "LEADERUA",
"%s [%"PRId32"]: leader unavailable: %s: %s",
rkt->rkt_topic->str, rktp->rktp_partition, reason,
rd_kafka_err2str(err));
rd_kafka_topic_wrlock(rkt);
rkt->rkt_flags |= RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
rd_kafka_topic_wrunlock(rkt);
rd_kafka_topic_fast_leader_query(rkt->rkt_rk);
}
const char *
rd_kafka_topic_partition_topic (const rd_kafka_topic_partition_t *rktpar) {
const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
return rktp->rktp_rkt->rkt_topic->str;
}
int32_t
rd_kafka_topic_partition_partition (const rd_kafka_topic_partition_t *rktpar) {
const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
return rktp->rktp_partition;
}
void rd_kafka_topic_partition_get (const rd_kafka_topic_partition_t *rktpar,
const char **name, int32_t *partition) {
const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
*name = rktp->rktp_rkt->rkt_topic->str;
*partition = rktp->rktp_partition;
}
/**
*
* rd_kafka_topic_partition_t lists
* Dynamically growing list of partitions for propagation to the
* application.
*
*/
static void
rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
int add_size) {
if (add_size < rktparlist->size)
add_size = RD_MAX(rktparlist->size, 32);
rktparlist->size += add_size;
rktparlist->elems = rd_realloc(rktparlist->elems,
sizeof(*rktparlist->elems) *
rktparlist->size);
}
/**
* Create a list with a pre-allocated capacity to fit 'size'
* topic_partitions (rktpar).
*/
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
rd_kafka_topic_partition_list_t *rktparlist;
rktparlist = rd_calloc(1, sizeof(*rktparlist));
rktparlist->size = size;
rktparlist->cnt = 0;
if (size > 0)
rd_kafka_topic_partition_list_grow(rktparlist, size);
return rktparlist;
}
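/*
* Example (illustrative sketch): typical list life-cycle using the
* public API ("mytopic" is a made-up name):
*
*   rd_kafka_topic_partition_list_t *parts;
*
*   parts = rd_kafka_topic_partition_list_new(1);
*   rd_kafka_topic_partition_list_add(parts, "mytopic", 0)->offset =
*           RD_KAFKA_OFFSET_BEGINNING;
*   ...
*   rd_kafka_topic_partition_list_destroy(parts);
*/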
rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
int32_t partition) {
rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
rktpar->topic = rd_strdup(topic);
rktpar->partition = partition;
return rktpar;
}
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp) {
rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));
rktpar->topic = RD_KAFKAP_STR_DUP(rktp->rktp_rkt->rkt_topic);
rktpar->partition = rktp->rktp_partition;
return rktpar;
}
static void
rd_kafka_topic_partition_destroy0 (rd_kafka_topic_partition_t *rktpar, int do_free) {
if (rktpar->topic)
rd_free(rktpar->topic);
if (rktpar->metadata)
rd_free(rktpar->metadata);
if (rktpar->_private)
rd_kafka_toppar_destroy((shptr_rd_kafka_toppar_t *)
rktpar->_private);
if (do_free)
rd_free(rktpar);
}
void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar) {
rd_kafka_topic_partition_destroy0(rktpar, 1);
}
/**
* Destroys a list previously created with .._list_new() and drops
* any references to contained toppars.
*/
void
rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist) {
int i;
for (i = 0 ; i < rktparlist->cnt ; i++)
rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);
if (rktparlist->elems)
rd_free(rktparlist->elems);
rd_free(rktparlist);
}
/**
* Add a partition to an rktpar list.
* The list is grown as needed to fit the new element.
*
* '_private' must be NULL or a valid 'shptr_rd_kafka_toppar_t *'.
*
* Returns a pointer to the added element.
*/
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add0 (rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition,
shptr_rd_kafka_toppar_t *_private) {
rd_kafka_topic_partition_t *rktpar;
if (rktparlist->cnt == rktparlist->size)
rd_kafka_topic_partition_list_grow(rktparlist, 1);
rd_kafka_assert(NULL, rktparlist->cnt < rktparlist->size);
rktpar = &rktparlist->elems[rktparlist->cnt++];
memset(rktpar, 0, sizeof(*rktpar));
rktpar->topic = rd_strdup(topic);
rktpar->partition = partition;
rktpar->offset = RD_KAFKA_OFFSET_INVALID;
rktpar->_private = _private;
return rktpar;
}
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition) {
return rd_kafka_topic_partition_list_add0(rktparlist,
topic, partition, NULL);
}
/**
* Adds a consecutive list of partitions to a list
*/
void
rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t
*rktparlist,
const char *topic,
int32_t start, int32_t stop) {
for (; start <= stop ; start++)
rd_kafka_topic_partition_list_add(rktparlist, topic, start);
}
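/*
* Example (illustrative sketch): add partitions 0..3 of a made-up topic
* in one call, equivalent to four individual .._list_add() calls:
*
*   rd_kafka_topic_partition_list_add_range(parts, "mytopic", 0, 3);
*/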
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_upsert (
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition) {
rd_kafka_topic_partition_t *rktpar;
if ((rktpar = rd_kafka_topic_partition_list_find(rktparlist,
topic, partition)))
return rktpar;
return rd_kafka_topic_partition_list_add(rktparlist, topic, partition);
}
/**
* @brief Creates a copy of \p rktpar and adds it to \p rktparlist
*/
void
rd_kafka_topic_partition_copy (rd_kafka_topic_partition_list_t *rktparlist,
const rd_kafka_topic_partition_t *rktpar) {
rd_kafka_topic_partition_t *dst;
dst = rd_kafka_topic_partition_list_add0(
rktparlist,
rktpar->topic,
rktpar->partition,
rktpar->_private ?
rd_kafka_toppar_keep(
rd_kafka_toppar_s2i((shptr_rd_kafka_toppar_t *)
rktpar->_private)) : NULL);
dst->offset = rktpar->offset;
dst->opaque = rktpar->opaque;
dst->err = rktpar->err;
if (rktpar->metadata_size > 0) {
dst->metadata =
rd_malloc(rktpar->metadata_size);
dst->metadata_size = rktpar->metadata_size;
memcpy((void *)dst->metadata, rktpar->metadata,
rktpar->metadata_size);
}
}
/**
* Create and return a copy of list 'src'
*/
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src){
rd_kafka_topic_partition_list_t *dst;
int i;
dst = rd_kafka_topic_partition_list_new(src->size);
for (i = 0 ; i < src->cnt ; i++)
rd_kafka_topic_partition_copy(dst, &src->elems[i]);
return dst;
}
/**
* @returns (and sets if necessary) the \p rktpar's _private / toppar.
* @remark a new reference is returned.
*/
shptr_rd_kafka_toppar_t *
rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
rd_kafka_topic_partition_t *rktpar) {
shptr_rd_kafka_toppar_t *s_rktp;
if (!(s_rktp = rktpar->_private))
s_rktp = rktpar->_private =
rd_kafka_toppar_get2(rk,
rktpar->topic,
rktpar->partition, 0, 0);
if (!s_rktp)
return NULL;
return rd_kafka_toppar_keep(rd_kafka_toppar_s2i(s_rktp));
}
static int rd_kafka_topic_partition_cmp (const void *_a, const void *_b,
void *opaque) {
const rd_kafka_topic_partition_t *a = _a;
const rd_kafka_topic_partition_t *b = _b;
int r = strcmp(a->topic, b->topic);
if (r)
return r;
else
return a->partition - b->partition;
}
/**
* @brief Search 'rktparlist' for 'topic' and 'partition'.
* @returns the elems[] index or -1 on miss.
*/
int
rd_kafka_topic_partition_list_find0 (rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition) {
rd_kafka_topic_partition_t skel;
int i;
skel.topic = (char *)topic;
skel.partition = partition;
for (i = 0 ; i < rktparlist->cnt ; i++) {
if (!rd_kafka_topic_partition_cmp(&skel,
&rktparlist->elems[i],
NULL))
return i;
}
return -1;
}
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition) {
int i = rd_kafka_topic_partition_list_find0(rktparlist,
topic, partition);
if (i == -1)
return NULL;
else
return &rktparlist->elems[i];
}
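/*
* Example (illustrative sketch): find-or-add a partition and update its
* offset in place ("mytopic" and the offset are made up):
*
*   rd_kafka_topic_partition_t *rktpar;
*
*   rktpar = rd_kafka_topic_partition_list_find(parts, "mytopic", 0);
*   if (!rktpar)
*           rktpar = rd_kafka_topic_partition_list_add(parts,
*                                                      "mytopic", 0);
*   rktpar->offset = 500;
*
* The same find-or-add pattern is provided by .._list_upsert() above.
*/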
int
rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
int idx) {
if (unlikely(idx < 0 || idx >= rktparlist->cnt))
return 0;
rktparlist->cnt--;
rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);
memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
(rktparlist->cnt - idx) * sizeof(rktparlist->elems[idx]));
return 1;
}
int
rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition) {
int i = rd_kafka_topic_partition_list_find0(rktparlist,
topic, partition);
if (i == -1)
return 0;
return rd_kafka_topic_partition_list_del_by_idx(rktparlist, i);
}
/**
* Returns true if 'topic' matches the 'rktpar', else false.
* On match, 'matched_by_regex' is set to 1 if the rktpar topic is a
* regex pattern, else 0.
*/
int rd_kafka_topic_partition_match (rd_kafka_t *rk,
const rd_kafka_group_member_t *rkgm,
const rd_kafka_topic_partition_t *rktpar,
const char *topic, int *matched_by_regex) {
int ret = 0;
if (*rktpar->topic == '^') {
char errstr[128];
ret = rd_regex_match(rktpar->topic, topic,
errstr, sizeof(errstr));
if (ret == -1) {
rd_kafka_dbg(rk, CGRP,
"SUBMATCH",
"Invalid regex for member "
"\"%.*s\" subscription \"%s\": %s",
RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
rktpar->topic, errstr);
return 0;
}
if (ret && matched_by_regex)
*matched_by_regex = 1;
} else if (!strcmp(rktpar->topic, topic)) {
if (matched_by_regex)
*matched_by_regex = 0;
ret = 1;
}
return ret;
}
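/*
* Example (illustrative sketch): matching a subscription entry against a
* concrete topic name ('rkgm', 'rktpar' and the names are made up).
* For a literal subscription whose topic is "mytopic":
*
*   int by_regex = -1;
*   int match;
*
*   match = rd_kafka_topic_partition_match(rk, rkgm, rktpar,
*                                          "mytopic", &by_regex);
*
* 'match' is 1 and 'by_regex' is 0; for a subscription whose topic is
* the pattern "^my.*", 'match' is 1 and 'by_regex' is 1.
*/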
void rd_kafka_topic_partition_list_sort (
rd_kafka_topic_partition_list_t *rktparlist,
int (*cmp) (const void *, const void *, void *),
void *opaque) {
if (!cmp)
cmp = rd_kafka_topic_partition_cmp;
rd_qsort_r(rktparlist->elems, rktparlist->cnt,
sizeof(*rktparlist->elems),
cmp, opaque);
}
void rd_kafka_topic_partition_list_sort_by_topic (
rd_kafka_topic_partition_list_t *rktparlist) {
rd_kafka_topic_partition_list_sort(rktparlist,
rd_kafka_topic_partition_cmp, NULL);
}
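/*
* Example (illustrative sketch): sorting a list with the default
* comparator (topic name first, then partition):
*
*   rd_kafka_topic_partition_list_sort(parts, NULL, NULL);
*/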
rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset (
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic, int32_t partition, int64_t offset) {
rd_kafka_topic_partition_t *rktpar;
if (!(rktpar = rd_kafka_topic_partition_list_find(rktparlist,
topic, partition)))
return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
rktpar->offset = offset;
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* @brief Reset all offsets to the provided value.
*/
void
rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
int64_t offset) {
int i;
for (i = 0 ; i < rktparlist->cnt ; i++)
rktparlist->elems[i].offset = offset;
}
/**
* Set offset values in partition list based on the toppar's last stored
* offset.
*
* from_rktp - true:  use the toppar's last stored offset if it is newer
*                    than the committed offset, else INVALID.
*             false: use def_value unless a concrete offset is already set.
* is_commit:  indicates that the set offset is to be committed
*             (only used for the debug log).
*
* Returns the number of valid non-logical offsets (>=0).
*/
int rd_kafka_topic_partition_list_set_offsets (
rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *rktparlist,
int from_rktp, int64_t def_value, int is_commit) {
int i;
int valid_cnt = 0;
for (i = 0 ; i < rktparlist->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
const char *verb = "setting";
if (from_rktp) {
shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_lock(rktp);
rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
"Topic %s [%"PRId32"]: "
"stored offset %"PRId64", committed "
"offset %"PRId64,
rktpar->topic, rktpar->partition,
rktp->rktp_stored_offset,
rktp->rktp_committed_offset);
if (rktp->rktp_stored_offset >
rktp->rktp_committed_offset) {
verb = "setting stored";
rktpar->offset = rktp->rktp_stored_offset;
} else {
rktpar->offset = RD_KAFKA_OFFSET_INVALID;
}
rd_kafka_toppar_unlock(rktp);
} else {
if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {
verb = "setting default";
rktpar->offset = def_value;
} else
verb = "keeping";
}
rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
"Topic %s [%"PRId32"]: "
"%s offset %s%s",
rktpar->topic, rktpar->partition,
verb,
rd_kafka_offset2str(rktpar->offset),
is_commit ? " for commit" : "");
if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
valid_cnt++;
}
return valid_cnt;
}
/**
* @returns the number of partitions with absolute (non-logical) offsets set.
*/
int rd_kafka_topic_partition_list_count_abs_offsets (
const rd_kafka_topic_partition_list_t *rktparlist) {
int i;
int valid_cnt = 0;
for (i = 0 ; i < rktparlist->cnt ; i++)
if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktparlist->elems[i].offset))
valid_cnt++;
return valid_cnt;
}
/**
* @returns a new shared toppar pointer for \p rktpar,
*          or NULL if the toppar could not be found or created.
*
* @remark A new reference is returned.
* @remark The _private field is set to the toppar if not previously set.
*/
shptr_rd_kafka_toppar_t *
rd_kafka_topic_partition_list_get_toppar (
rd_kafka_t *rk, rd_kafka_topic_partition_t *rktpar) {
shptr_rd_kafka_toppar_t *s_rktp;
s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar);
if (!s_rktp)
return NULL;
return s_rktp;
}
/**
* @brief Update the _private (toppar) field to point to a valid s_rktp
*        for each partition.
*/
void
rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t
*rktparlist) {
int i;
for (i = 0 ; i < rktparlist->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
rd_kafka_topic_partition_list_get_toppar(rk, rktpar);
}
}
/**
* @brief Populate \p leaders with the leaders+partitions for the partitions in
* \p rktparlist. Duplicates are suppressed.
*
* If no leader is found for a partition, that element's \c .err will
* be set to RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE.
*
* If the partition does not exist \c .err will be set to
* RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
*
* @param leaders rd_list_t of allocated (struct rd_kafka_partition_leader *)
* @param query_topics (optional) rd_list of strdupped (char *)
*
* @remark This is based on the current topic_t and partition state,
*         which may lag behind the last metadata update due to internal
*         threading, and on the fact that no topic_t may have been
*         created yet.
*
* @returns the number of leaders added.
*
* @sa rd_kafka_topic_partition_list_get_leaders_by_metadata
*
* @locks rd_kafka_*lock() MUST NOT be held
*/
int
rd_kafka_topic_partition_list_get_leaders (
rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *rktparlist,
rd_list_t *leaders,
rd_list_t *query_topics) {
int cnt = 0;
int i;
rd_kafka_rdlock(rk);
for (i = 0 ; i < rktparlist->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
rd_kafka_broker_t *rkb = NULL;
struct rd_kafka_partition_leader leader_skel;
struct rd_kafka_partition_leader *leader;
const rd_kafka_metadata_topic_t *mtopic;
const rd_kafka_metadata_partition_t *mpart;
rd_kafka_metadata_cache_topic_partition_get(
rk, &mtopic, &mpart,
rktpar->topic, rktpar->partition, 1/*valid*/);
if (mtopic &&
mtopic->err != RD_KAFKA_RESP_ERR_NO_ERROR &&
mtopic->err != RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) {
/* Topic permanently errored */
rktpar->err = mtopic->err;
continue;
}
if (mtopic && !mpart && mtopic->partition_cnt > 0) {
/* Topic exists but the partition doesn't.
* This is a permanent error. */
rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
continue;
}
if (mpart &&
(mpart->leader == -1 ||
!(rkb = rd_kafka_broker_find_by_nodeid0(
rk, mpart->leader, -1/*any state*/)))) {
/* Partition has no (valid) leader */
rktpar->err =
mtopic->err ? mtopic->err :
RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
}
if (!mtopic || !rkb) {
/* Topic unknown or no current leader for partition,
* add topic to query list. */
if (query_topics &&
!rd_list_find(query_topics, rktpar->topic,
(void *)strcmp))
rd_list_add(query_topics,
rd_strdup(rktpar->topic));
continue;
}
/* Leader exists, add to leader list. */
rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
memset(&leader_skel, 0, sizeof(leader_skel));
leader_skel.rkb = rkb;
leader = rd_list_find(leaders, &leader_skel,
rd_kafka_partition_leader_cmp);
if (!leader) {
leader = rd_kafka_partition_leader_new(rkb);
rd_list_add(leaders, leader);
cnt++;
}
rd_kafka_topic_partition_copy(leader->partitions, rktpar);
rd_kafka_broker_destroy(rkb); /* lose refcount */
}
rd_kafka_rdunlock(rk);
return cnt;
}
/**
* @brief Get leaders for all partitions in \p rktparlist, querying metadata
* if needed.
*
* @param leaders is a pre-initialized (empty) list which will be populated
* with the leader brokers and their partitions
* (struct rd_kafka_partition_leader *)
*
* @returns an error code on error.
*
* @locks rd_kafka_*lock() MUST NOT be held
*/
rd_kafka_resp_err_t
rd_kafka_topic_partition_list_query_leaders (
rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *rktparlist,
rd_list_t *leaders, int timeout_ms) {
rd_ts_t ts_end = rd_timeout_init(timeout_ms);
rd_ts_t ts_query = 0;
rd_ts_t now;
int i = 0;
/* Get all the partition leaders, trying multiple times:
* if there are no leaders after the first run, fire off a leader
* query and wait for the broker state to update before trying again,
* re-querying at increasing intervals until success or timeout. */
do {
rd_list_t query_topics;
int query_intvl;
rd_list_init(&query_topics, rktparlist->cnt, rd_free);
rd_kafka_topic_partition_list_get_leaders(
rk, rktparlist, leaders, &query_topics);
if (rd_list_empty(&query_topics)) {
/* No remaining topics to query: leader-list complete.*/
rd_list_destroy(&query_topics);
/* No leader(s) for partitions means all partitions
* are unknown. */
if (rd_list_empty(leaders))
return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
now = rd_clock();
/*
* Missing leader for some partitions
*/
query_intvl = (i+1) * 100; /* add 100ms per iteration */
if (query_intvl > 2*1000)
query_intvl = 2*1000; /* Cap to 2s */
if (now >= ts_query + (query_intvl*1000)) {
/* Query metadata for missing leaders,
* possibly creating the topic. */
rd_kafka_metadata_refresh_topics(
rk, NULL, &query_topics, 1/*force*/,
"query partition leaders");
ts_query = now;
} else {
/* Wait for broker ids to be updated from
* metadata refresh above. */
int wait_ms = rd_timeout_remains(ts_end);
if (query_intvl < wait_ms)
wait_ms = query_intvl;
rd_kafka_metadata_cache_wait_change(rk, wait_ms);
}
rd_list_destroy(&query_topics);
i++;
} while (now < ts_end); /* 'now' is deliberately outdated here
* since wait_change() will block;
* this gives us one more chance to spin through. */
return RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
}
/**
* @brief Populate \p rkts with the rd_kafka_itopic_t objects for the
*        partitions in \p rktparlist. Duplicates are suppressed.
*
* @returns the number of topics added.
*/
int
rd_kafka_topic_partition_list_get_topics (
rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *rktparlist,
rd_list_t *rkts) {
int cnt = 0;
int i;
for (i = 0 ; i < rktparlist->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar);
if (!s_rktp) {
rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
continue;
}
rktp = rd_kafka_toppar_s2i(s_rktp);
if (!rd_list_find(rkts, rktp->rktp_s_rkt,
rd_kafka_topic_cmp_s_rkt)) {
rd_list_add(rkts, rd_kafka_topic_keep(rktp->rktp_rkt));
cnt++;
}
rd_kafka_toppar_destroy(s_rktp);
}
return cnt;
}
/**
* @brief Populate \p topics with the strdupped topic names in \p rktparlist.
* Duplicates are suppressed.
*
* @param include_regex include regex topics
*
* @returns the number of topics added.
*/
int
rd_kafka_topic_partition_list_get_topic_names (
const rd_kafka_topic_partition_list_t *rktparlist,
rd_list_t *topics, int include_regex) {
int cnt = 0;
int i;
for (i = 0 ; i < rktparlist->cnt ; i++) {
const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
if (!include_regex && *rktpar->topic == '^')
continue;
if (!rd_list_find(topics, rktpar->topic, (void *)strcmp)) {
rd_list_add(topics, rd_strdup(rktpar->topic));
cnt++;
}
}
return cnt;
}
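/*
* Example (illustrative sketch): extracting the unique, non-regex topic
* names from a partition list into an rd_list_t of strings, passing 0
* for include_regex to skip regex subscriptions:
*
*   rd_list_t topics;
*
*   rd_list_init(&topics, rktparlist->cnt, rd_free);
*   rd_kafka_topic_partition_list_get_topic_names(rktparlist, &topics,
*                                                 0);
*   ...
*   rd_list_destroy(&topics);
*/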
/**
* @brief Create a copy of \p rktparlist only containing the partitions
* matched by \p match function.
*
* \p match shall return 1 for match, else 0.
*
* @returns a new list
*/
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
const rd_kafka_topic_partition_list_t *rktparlist,
int (*match) (const void *elem, const void *opaque),
void *opaque) {
rd_kafka_topic_partition_list_t *newlist;
int i;
newlist = rd_kafka_topic_partition_list_new(0);
for (i = 0 ; i < rktparlist->cnt ; i++) {
const rd_kafka_topic_partition_t *rktpar =
&rktparlist->elems[i];
if (!match(rktpar, opaque))
continue;
rd_kafka_topic_partition_copy(newlist, rktpar);
}
return newlist;
}
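/*
* Example (illustrative sketch): filtering a list down to the errored
* partitions with a custom match callback ('match_errored' is made up):
*
*   static int match_errored (const void *elem, const void *opaque) {
*           const rd_kafka_topic_partition_t *rktpar = elem;
*           return rktpar->err != RD_KAFKA_RESP_ERR_NO_ERROR;
*   }
*
*   ...
*   errored = rd_kafka_topic_partition_list_match(parts, match_errored,
*                                                 NULL);
*/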
void
rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac,
const rd_kafka_topic_partition_list_t *rktparlist) {
int i;
rd_kafka_dbg(rk, TOPIC, fac, "List with %d partition(s):",
rktparlist->cnt);
for (i = 0 ; i < rktparlist->cnt ; i++) {
const rd_kafka_topic_partition_t *rktpar =
&rktparlist->elems[i];
rd_kafka_dbg(rk, TOPIC, fac, " %s [%"PRId32"] offset %s%s%s",
rktpar->topic, rktpar->partition,
rd_kafka_offset2str(rktpar->offset),
rktpar->err ? ": error: " : "",
rktpar->err ? rd_kafka_err2str(rktpar->err) : "");
}
}
/**
* @returns a comma-separated list of partitions.
*/
const char *
rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
char *dest, size_t dest_size,
int fmt_flags) {
int i;
size_t of = 0;
int trunc = 0;
for (i = 0 ; i < rktparlist->cnt ; i++) {
const rd_kafka_topic_partition_t *rktpar =
&rktparlist->elems[i];
char errstr[128];
char offsetstr[32];
int r;
if (trunc) {
if (dest_size > 4)
rd_snprintf(&dest[dest_size-4], 4, "...");
break;
}
if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR))
continue;
if (rktpar->err && !(fmt_flags & RD_KAFKA_FMT_F_NO_ERR))
rd_snprintf(errstr, sizeof(errstr),
"(%s)", rd_kafka_err2str(rktpar->err));
else
errstr[0] = '\0';
if (rktpar->offset != RD_KAFKA_OFFSET_INVALID)
rd_snprintf(offsetstr, sizeof(offsetstr),
"@%"PRId64, rktpar->offset);
else
offsetstr[0] = '\0';
r = rd_snprintf(&dest[of], dest_size-of,
"%s"
"%s[%"PRId32"]"
"%s"
"%s",
of == 0 ? "" : ", ",
rktpar->topic, rktpar->partition,
offsetstr,
errstr);
if ((size_t)r >= dest_size-of)
trunc++;
else
of += r;
}
return dest;
}
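/*
* Example (illustrative sketch): formatting a list into a stack buffer
* for logging, suppressing error strings:
*
*   char buf[512];
*
*   rd_kafka_topic_partition_list_str(parts, buf, sizeof(buf),
*                                     RD_KAFKA_FMT_F_NO_ERR);
*
* This might yield "mytopic[0]@500, mytopic[1]" for a two-element list
* where only the first element has a concrete offset set.
*/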
/**
* @brief Update \p dst with info from \p src.
*
* Fields updated:
* - offset
* - err
*
* Only partitions that exist in both \p dst and \p src will be updated;
* other partitions will remain unchanged.
*/
void
rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
const rd_kafka_topic_partition_list_t *src){
int i;
for (i = 0 ; i < dst->cnt ; i++) {
rd_kafka_topic_partition_t *d = &dst->elems[i];
rd_kafka_topic_partition_t *s;
if (!(s = rd_kafka_topic_partition_list_find(
(rd_kafka_topic_partition_list_t *)src,
d->topic, d->partition)))
continue;
d->offset = s->offset;
d->err = s->err;
}
}
/**
* @returns the sum of \p cb called for each element.
*/
size_t
rd_kafka_topic_partition_list_sum (
const rd_kafka_topic_partition_list_t *rktparlist,
size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
void *opaque) {
int i;
size_t sum = 0;
for (i = 0 ; i < rktparlist->cnt ; i++) {
const rd_kafka_topic_partition_t *rktpar =
&rktparlist->elems[i];
sum += cb(rktpar, opaque);
}
return sum;
}
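/*
* Example (illustrative sketch): summing per-partition metadata sizes
* with a callback ('sum_metadata_size' is made up):
*
*   static size_t sum_metadata_size (
*           const rd_kafka_topic_partition_t *rktpar, void *opaque) {
*           return rktpar->metadata_size;
*   }
*
*   total = rd_kafka_topic_partition_list_sum(parts, sum_metadata_size,
*                                             NULL);
*/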
/**
* @brief Set \c .err field \p err on all partitions in list.
*/
void rd_kafka_topic_partition_list_set_err (
rd_kafka_topic_partition_list_t *rktparlist,
rd_kafka_resp_err_t err) {
int i;
for (i = 0 ; i < rktparlist->cnt ; i++)
rktparlist->elems[i].err = err;
}
/**
* @returns the number of wildcard/regex topics
*/
int rd_kafka_topic_partition_list_regex_cnt (
const rd_kafka_topic_partition_list_t *rktparlist) {
int i;
int cnt = 0;
for (i = 0 ; i < rktparlist->cnt ; i++) {
const rd_kafka_topic_partition_t *rktpar =
&rktparlist->elems[i];
cnt += *rktpar->topic == '^';
}
return cnt;
}