/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rdkafka_int.h"
#include "rdkafka_broker.h"
#include "rdkafka_request.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_assignor.h"
#include "rdkafka_offset.h"
#include "rdkafka_metadata.h"
#include "rdkafka_cgrp.h"
#include "rdkafka_interceptor.h"
static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg,
const char *reason);
static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
void *arg);
static void rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *assignment);
static rd_kafka_resp_err_t rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg);
static void
rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t
*assignment, int usable_offsets,
int line);
#define rd_kafka_cgrp_partitions_fetch_start(rkcg,assignment,usable_offsets) \
rd_kafka_cgrp_partitions_fetch_start0(rkcg,assignment,usable_offsets,\
__LINE__)
static rd_kafka_op_res_t
rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
void *opaque);
static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
const char *reason);
/**
* @returns true if cgrp can start partition fetchers, which is true if
* there is a subscription and the group is fully joined, or there
* is no subscription (in which case the join state is irrelevant)
* such as for an assign() without subscribe(). */
#define RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) \
((rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED)
/**
* @returns true if cgrp is waiting for a rebalance_cb to be handled by
* the application.
*/
#define RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) \
((rkcg)->rkcg_join_state == \
RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB || \
(rkcg)->rkcg_join_state == \
RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB)
const char *rd_kafka_cgrp_state_names[] = {
"init",
"term",
"query-coord",
"wait-coord",
"wait-broker",
"wait-broker-transport",
"up"
};
const char *rd_kafka_cgrp_join_state_names[] = {
"init",
"wait-join",
"wait-metadata",
"wait-sync",
"wait-unassign",
"wait-assign-rebalance_cb",
"wait-revoke-rebalance_cb",
"assigned",
"started"
};
static void rd_kafka_cgrp_set_state (rd_kafka_cgrp_t *rkcg, int state) {
if ((int)rkcg->rkcg_state == state)
return;
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE",
"Group \"%.*s\" changed state %s -> %s "
"(v%d, join-state %s)",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_kafka_cgrp_state_names[state],
rkcg->rkcg_version,
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
rkcg->rkcg_state = state;
rkcg->rkcg_ts_statechange = rd_clock();
rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk);
}
void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state) {
if ((int)rkcg->rkcg_join_state == join_state)
return;
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
"Group \"%.*s\" changed join state %s -> %s "
"(v%d, state %s)",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
rd_kafka_cgrp_join_state_names[join_state],
rkcg->rkcg_version,
rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
rkcg->rkcg_join_state = join_state;
}
static RD_INLINE void
rd_kafka_cgrp_version_new_barrier0 (rd_kafka_cgrp_t *rkcg,
const char *func, int line) {
rkcg->rkcg_version++;
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BARRIER",
"Group \"%.*s\": %s:%d: new version barrier v%d",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), func, line,
rkcg->rkcg_version);
}
#define rd_kafka_cgrp_version_new_barrier(rkcg) \
rd_kafka_cgrp_version_new_barrier0(rkcg, __FUNCTION__, __LINE__)
void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg) {
rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_assignment);
rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription);
rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members);
rd_kafka_cgrp_set_member_id(rkcg, NULL);
rd_kafka_q_destroy(rkcg->rkcg_q);
rd_kafka_q_destroy(rkcg->rkcg_ops);
rd_kafka_q_destroy(rkcg->rkcg_wait_coord_q);
rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics));
rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
rd_list_destroy(&rkcg->rkcg_toppars);
rd_list_destroy(rkcg->rkcg_subscribed_topics);
rd_free(rkcg);
}
rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
const rd_kafkap_str_t *group_id,
const rd_kafkap_str_t *client_id) {
rd_kafka_cgrp_t *rkcg;
rkcg = rd_calloc(1, sizeof(*rkcg));
rkcg->rkcg_rk = rk;
rkcg->rkcg_group_id = group_id;
rkcg->rkcg_client_id = client_id;
rkcg->rkcg_coord_id = -1;
rkcg->rkcg_generation_id = -1;
rkcg->rkcg_version = 1;
mtx_init(&rkcg->rkcg_lock, mtx_plain);
rkcg->rkcg_ops = rd_kafka_q_new(rk);
rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve;
rkcg->rkcg_ops->rkq_opaque = rkcg;
rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk);
rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve;
rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
rkcg->rkcg_q = rd_kafka_q_new(rk);
TAILQ_INIT(&rkcg->rkcg_topics);
rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
rd_kafka_cgrp_set_member_id(rkcg, "");
rkcg->rkcg_subscribed_topics =
rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
rd_interval_init(&rkcg->rkcg_coord_query_intvl);
rd_interval_init(&rkcg->rkcg_heartbeat_intvl);
rd_interval_init(&rkcg->rkcg_join_intvl);
rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);
if (RD_KAFKAP_STR_IS_NULL(group_id)) {
/* No group configured: Operate in legacy/SimpleConsumer mode */
rd_kafka_simple_consumer_add(rk);
/* No need to look up the group coordinator (no queries) */
rd_interval_disable(&rkcg->rkcg_coord_query_intvl);
}
if (rk->rk_conf.enable_auto_commit &&
rk->rk_conf.auto_commit_interval_ms > 0)
rd_kafka_timer_start(&rk->rk_timers,
&rkcg->rkcg_offset_commit_tmr,
rk->rk_conf.
auto_commit_interval_ms * 1000ll,
rd_kafka_cgrp_offset_commit_tmr_cb,
rkcg);
return rkcg;
}
/**
* Select a broker to handle this cgrp.
* It will prefer the coordinator broker but if that is not available
* any other broker that is Up will be used, and if that also fails
* the internal broker handle is used.
*
* NOTE: The returned rkb will have had its refcnt increased.
*/
static rd_kafka_broker_t *rd_kafka_cgrp_select_broker (rd_kafka_cgrp_t *rkcg) {
rd_kafka_broker_t *rkb = NULL;
/* No need for a managing broker when cgrp is terminated */
if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
return NULL;
rd_kafka_rdlock(rkcg->rkcg_rk);
/* Try to find the coordinator broker, if it isn't found
* move the cgrp to any other Up broker which will
* do further coord querying while waiting for the
* proper broker to materialise.
* If that also fails, go with the internal broker */
if (rkcg->rkcg_coord_id != -1)
rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk,
rkcg->rkcg_coord_id);
if (!rkb)
rkb = rd_kafka_broker_prefer(rkcg->rkcg_rk,
rkcg->rkcg_coord_id,
RD_KAFKA_BROKER_STATE_UP);
if (!rkb)
rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);
rd_kafka_rdunlock(rkcg->rkcg_rk);
/* Don't change the managing broker unless warranted.
* This means do not change to another non-coordinator broker
* while we are waiting for the proper coordinator broker to
* become available. */
if (rkb && rkcg->rkcg_rkb && rkb != rkcg->rkcg_rkb) {
int old_is_coord, new_is_coord;
rd_kafka_broker_lock(rkb);
new_is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb);
rd_kafka_broker_unlock(rkb);
rd_kafka_broker_lock(rkcg->rkcg_rkb);
old_is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg,
rkcg->rkcg_rkb);
rd_kafka_broker_unlock(rkcg->rkcg_rkb);
if (!old_is_coord && !new_is_coord &&
rkcg->rkcg_rkb->rkb_source != RD_KAFKA_INTERNAL) {
rd_kafka_broker_destroy(rkb);
rkb = rkcg->rkcg_rkb;
rd_kafka_broker_keep(rkb);
}
}
return rkb;
}
/**
* Assign cgrp to broker.
*
* Locality: rdkafka main thread
*/
static void rd_kafka_cgrp_assign_broker (rd_kafka_cgrp_t *rkcg,
rd_kafka_broker_t *rkb) {
rd_kafka_assert(NULL, rkcg->rkcg_rkb == NULL);
rkcg->rkcg_rkb = rkb;
rd_kafka_broker_keep(rkb);
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKASSIGN",
"Group \"%.*s\" management assigned to broker %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_broker_name(rkb));
/* Reset query interval to trigger an immediate
* coord query if required */
if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl))
rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
if (RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb))
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
}
/**
* Unassign cgrp from current broker.
*
* Locality: main thread
*/
static void rd_kafka_cgrp_unassign_broker (rd_kafka_cgrp_t *rkcg) {
rd_kafka_broker_t *rkb = rkcg->rkcg_rkb;
rd_kafka_assert(NULL, rkcg->rkcg_rkb);
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKUNASSIGN",
"Group \"%.*s\" management unassigned "
"from broker handle %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_broker_name(rkb));
rkcg->rkcg_rkb = NULL;
rd_kafka_broker_destroy(rkb); /* from assign() */
}
/**
* Assign cgrp to a broker to handle.
* It will prefer the coordinator broker but if that is not available
* any other broker that is Up will be used, and if that also fails
* the internal broker handle is used.
*
* Returns 1 if the cgrp was reassigned, else 0.
*/
int rd_kafka_cgrp_reassign_broker (rd_kafka_cgrp_t *rkcg) {
rd_kafka_broker_t *rkb;
rkb = rd_kafka_cgrp_select_broker(rkcg);
if (rkb == rkcg->rkcg_rkb) {
int is_coord = 0;
if (rkb) {
rd_kafka_broker_lock(rkb);
is_coord = RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb);
rd_kafka_broker_unlock(rkb);
}
if (is_coord)
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
else
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
if (rkb)
rd_kafka_broker_destroy(rkb);
return 0; /* No change */
}
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "BRKREASSIGN",
"Group \"%.*s\" management reassigned from "
"broker %s to %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rkcg->rkcg_rkb ?
rd_kafka_broker_name(rkcg->rkcg_rkb) : "(none)",
rkb ? rd_kafka_broker_name(rkb) : "(none)");
if (rkcg->rkcg_rkb)
rd_kafka_cgrp_unassign_broker(rkcg);
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
if (rkb) {
rd_kafka_cgrp_assign_broker(rkcg, rkb);
rd_kafka_broker_destroy(rkb); /* from select_broker() */
}
return 1;
}
/**
* Update the cgrp's coordinator and move it to that broker.
*/
void rd_kafka_cgrp_coord_update (rd_kafka_cgrp_t *rkcg, int32_t coord_id) {
if (rkcg->rkcg_coord_id == coord_id) {
if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_WAIT_COORD)
rd_kafka_cgrp_set_state(rkcg,
RD_KAFKA_CGRP_STATE_WAIT_BROKER);
return;
}
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD",
"Group \"%.*s\" changing coordinator %"PRId32" -> %"PRId32,
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_coord_id,
coord_id);
rkcg->rkcg_coord_id = coord_id;
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
rd_kafka_cgrp_reassign_broker(rkcg);
}
/**
* Handle GroupCoordinator response
*/
static void rd_kafka_cgrp_handle_GroupCoordinator (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
const int log_decode_errors = LOG_ERR;
int16_t ErrorCode = 0;
int32_t CoordId;
rd_kafkap_str_t CoordHost = RD_ZERO_INIT;
int32_t CoordPort;
rd_kafka_cgrp_t *rkcg = opaque;
struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT;
if (likely(!(ErrorCode = err))) {
rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
rd_kafka_buf_read_i32(rkbuf, &CoordId);
rd_kafka_buf_read_str(rkbuf, &CoordHost);
rd_kafka_buf_read_i32(rkbuf, &CoordPort);
}
if (ErrorCode)
goto err2;
mdb.id = CoordId;
RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost);
mdb.port = CoordPort;
rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
"Group \"%.*s\" coordinator is %s:%i id %"PRId32,
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
mdb.host, mdb.port, mdb.id);
rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb);
rd_kafka_cgrp_coord_update(rkcg, CoordId);
rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
return;
err_parse: /* Parse error */
ErrorCode = rkbuf->rkbuf_err;
/* FALLTHRU */
err2:
rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
"Group \"%.*s\" GroupCoordinator response error: %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_err2str(ErrorCode));
if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
return;
if (ErrorCode == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
rd_kafka_cgrp_coord_update(rkcg, -1);
else {
if (rkcg->rkcg_last_err != ErrorCode) {
rd_kafka_q_op_err(rkcg->rkcg_q,
RD_KAFKA_OP_CONSUMER_ERR,
ErrorCode, 0, NULL, 0,
"GroupCoordinator response error: %s",
rd_kafka_err2str(ErrorCode));
/* Suppress repeated errors */
rkcg->rkcg_last_err = ErrorCode;
}
/* Continue querying */
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
}
rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
}
/**
* Query for coordinator.
* Ask any broker in state UP
*
* Locality: main thread
*/
void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
const char *reason) {
rd_kafka_broker_t *rkb;
rd_kafka_rdlock(rkcg->rkcg_rk);
rkb = rd_kafka_broker_any(rkcg->rkcg_rk, RD_KAFKA_BROKER_STATE_UP,
rd_kafka_broker_filter_can_group_query, NULL);
rd_kafka_rdunlock(rkcg->rkcg_rk);
if (!rkb) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY",
"Group \"%.*s\": "
"no broker available for coordinator query: %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
return;
}
rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
"Group \"%.*s\": querying for coordinator: %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
rd_kafka_GroupCoordinatorRequest(rkb, rkcg->rkcg_group_id,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
rd_kafka_cgrp_handle_GroupCoordinator,
rkcg);
if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD)
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD);
rd_kafka_broker_destroy(rkb);
}
/**
* @brief Mark the current coordinator as dead.
*
* @locality main thread
*/
void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
const char *reason) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD",
"Group \"%.*s\": marking the coordinator dead: %s: %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_err2str(err), reason);
rd_kafka_cgrp_coord_update(rkcg, -1);
/* Re-query for coordinator */
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
rd_kafka_cgrp_coord_query(rkcg, reason);
}
static void rd_kafka_cgrp_leave (rd_kafka_cgrp_t *rkcg, int ignore_response) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
"Group \"%.*s\": leave",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP)
rd_kafka_LeaveGroupRequest(rkcg->rkcg_rkb, rkcg->rkcg_group_id,
rkcg->rkcg_member_id,
ignore_response ?
RD_KAFKA_NO_REPLYQ :
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
ignore_response ? NULL :
rd_kafka_handle_LeaveGroup, rkcg);
else if (!ignore_response)
rd_kafka_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_rkb,
RD_KAFKA_RESP_ERR__WAIT_COORD,
NULL, NULL, rkcg);
}
/**
* Enqueue a rebalance op (if configured). 'assignment' is copied.
* This delegates the responsibility of assign() and unassign() to the
* application.
*
* Returns 1 if a rebalance op was enqueued, else 0.
* Returns 0 if there was no rebalance_cb or 'assignment' is NULL,
* in which case rd_kafka_cgrp_assign(rkcg,assignment) is called immediately.
*/
static int
rd_kafka_rebalance_op (rd_kafka_cgrp_t *rkcg,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *assignment,
const char *reason) {
rd_kafka_op_t *rko;
rd_kafka_wrlock(rkcg->rkcg_rk);
rkcg->rkcg_c.ts_rebalance = rd_clock();
rkcg->rkcg_c.rebalance_cnt++;
rd_kafka_wrunlock(rkcg->rkcg_rk);
/* Pause current partition set consumers until new assign() is called */
if (rkcg->rkcg_assignment)
rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 1,
RD_KAFKA_TOPPAR_F_LIB_PAUSE,
rkcg->rkcg_assignment);
if (!(rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE)
|| !assignment) {
no_delegation:
if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
rd_kafka_cgrp_assign(rkcg, assignment);
else
rd_kafka_cgrp_unassign(rkcg);
return 0;
}
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
"Group \"%s\": delegating %s of %d partition(s) "
"to application rebalance callback on queue %s: %s",
rkcg->rkcg_group_id->str,
err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
"revoke":"assign", assignment->cnt,
rd_kafka_q_dest_name(rkcg->rkcg_q), reason);
rd_kafka_cgrp_set_join_state(
rkcg,
err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB :
RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB);
rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
rko->rko_err = err;
rko->rko_u.rebalance.partitions =
rd_kafka_topic_partition_list_copy(assignment);
if (rd_kafka_q_enq(rkcg->rkcg_q, rko) == 0) {
/* Queue disabled, handle assignment here. */
goto no_delegation;
}
return 1;
}
/**
* @brief Run group assignment.
*/
static void
rd_kafka_cgrp_assignor_run (rd_kafka_cgrp_t *rkcg,
const char *protocol_name,
rd_kafka_resp_err_t err,
rd_kafka_metadata_t *metadata,
rd_kafka_group_member_t *members,
int member_cnt) {
char errstr[512];
if (err) {
rd_snprintf(errstr, sizeof(errstr),
"Failed to get cluster metadata: %s",
rd_kafka_err2str(err));
goto err;
}
*errstr = '\0';
/* Run assignor */
err = rd_kafka_assignor_run(rkcg, protocol_name, metadata,
members, member_cnt,
errstr, sizeof(errstr));
if (err) {
if (!*errstr)
rd_snprintf(errstr, sizeof(errstr), "%s",
rd_kafka_err2str(err));
goto err;
}
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNOR",
"Group \"%s\": \"%s\" assignor run for %d member(s)",
rkcg->rkcg_group_id->str, protocol_name, member_cnt);
rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
/* Respond to broker with assignment set or error */
rd_kafka_SyncGroupRequest(rkcg->rkcg_rkb,
rkcg->rkcg_group_id, rkcg->rkcg_generation_id,
rkcg->rkcg_member_id,
members, err ? 0 : member_cnt,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
rd_kafka_handle_SyncGroup, rkcg);
return;
err:
rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR",
"Group \"%s\": failed to run assignor \"%s\" for "
"%d member(s): %s",
rkcg->rkcg_group_id->str, protocol_name,
member_cnt, errstr);
rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
}
/**
* @brief Op callback from handle_JoinGroup
*/
static rd_kafka_op_res_t
rd_kafka_cgrp_assignor_handle_Metadata_op (rd_kafka_t *rk,
rd_kafka_q_t *rkq,
rd_kafka_op_t *rko) {
rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)
return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */
if (!rkcg->rkcg_group_leader.protocol) {
rd_kafka_dbg(rk, CGRP, "GRPLEADER",
"Group \"%.*s\": no longer leader: "
"not running assignor",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
return RD_KAFKA_OP_RES_HANDLED;
}
rd_kafka_cgrp_assignor_run(rkcg,
rkcg->rkcg_group_leader.protocol,
rko->rko_err, rko->rko_u.metadata.md,
rkcg->rkcg_group_leader.members,
rkcg->rkcg_group_leader.member_cnt);
return RD_KAFKA_OP_RES_HANDLED;
}
/**
* Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType
*
* Protocol definition:
* https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
*
* Returns 0 on success or -1 on error.
*/
static int
rd_kafka_group_MemberMetadata_consumer_read (
rd_kafka_broker_t *rkb, rd_kafka_group_member_t *rkgm,
const rd_kafkap_str_t *GroupProtocol,
const rd_kafkap_bytes_t *MemberMetadata) {
rd_kafka_buf_t *rkbuf;
int16_t Version;
int32_t subscription_cnt;
rd_kafkap_bytes_t UserData;
const int log_decode_errors = LOG_ERR;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
/* Create a shadow-buffer pointing to the metadata to ease parsing. */
rkbuf = rd_kafka_buf_new_shadow(MemberMetadata->data,
RD_KAFKAP_BYTES_LEN(MemberMetadata),
NULL);
rd_kafka_buf_read_i16(rkbuf, &Version);
rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
if (subscription_cnt > 10000 || subscription_cnt <= 0)
goto err;
rkgm->rkgm_subscription =
rd_kafka_topic_partition_list_new(subscription_cnt);
while (subscription_cnt-- > 0) {
rd_kafkap_str_t Topic;
char *topic_name;
rd_kafka_buf_read_str(rkbuf, &Topic);
RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
rd_kafka_topic_partition_list_add(rkgm->rkgm_subscription,
topic_name,
RD_KAFKA_PARTITION_UA);
}
rd_kafka_buf_read_bytes(rkbuf, &UserData);
rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData);
rd_kafka_buf_destroy(rkbuf);
return 0;
err_parse:
err = rkbuf->rkbuf_err;
err:
rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
"Failed to parse MemberMetadata for \"%.*s\": %s",
RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
rd_kafka_err2str(err));
if (rkgm->rkgm_subscription) {
rd_kafka_topic_partition_list_destroy(rkgm->
rkgm_subscription);
rkgm->rkgm_subscription = NULL;
}
rd_kafka_buf_destroy(rkbuf);
return -1;
}
/**
* @brief cgrp handler for JoinGroup responses
* opaque must be the cgrp handle.
*
* @locality cgrp broker thread
*/
static void rd_kafka_cgrp_handle_JoinGroup (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
rd_kafka_cgrp_t *rkcg = opaque;
const int log_decode_errors = LOG_ERR;
int16_t ErrorCode = 0;
int32_t GenerationId;
rd_kafkap_str_t Protocol, LeaderId, MyMemberId;
int32_t member_cnt;
int actions;
int i_am_leader = 0;
if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) {
rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
"JoinGroup response: discarding outdated request "
"(now in join-state %s)",
rd_kafka_cgrp_join_state_names[rkcg->
rkcg_join_state]);
return;
}
if (err) {
ErrorCode = err;
goto err;
}
rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
rd_kafka_buf_read_i32(rkbuf, &GenerationId);
rd_kafka_buf_read_str(rkbuf, &Protocol);
rd_kafka_buf_read_str(rkbuf, &LeaderId);
rd_kafka_buf_read_str(rkbuf, &MyMemberId);
rd_kafka_buf_read_i32(rkbuf, &member_cnt);
if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) {
/* Protocol not set, we will not be able to find
* a matching assignor so error out early. */
ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG;
}
rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
"JoinGroup response: GenerationId %"PRId32", "
"Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, "
"%"PRId32" members in group: %s",
GenerationId,
RD_KAFKAP_STR_PR(&Protocol),
RD_KAFKAP_STR_PR(&LeaderId),
!rd_kafkap_str_cmp(&LeaderId, &MyMemberId) ? " (me)" : "",
RD_KAFKAP_STR_PR(&MyMemberId),
member_cnt,
ErrorCode ? rd_kafka_err2str(ErrorCode) : "(no error)");
if (!ErrorCode) {
char *my_member_id;
RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
rkcg->rkcg_generation_id = GenerationId;
rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId);
} else {
rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000*1000);
goto err;
}
if (i_am_leader) {
rd_kafka_group_member_t *members;
int i;
int sub_cnt = 0;
rd_list_t topics;
rd_kafka_op_t *rko;
rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
"Elected leader for group \"%s\" "
"with %"PRId32" member(s)",
rkcg->rkcg_group_id->str, member_cnt);
if (member_cnt > 100000) {
err = RD_KAFKA_RESP_ERR__BAD_MSG;
goto err;
}
rd_list_init(&topics, member_cnt, rd_free);
members = rd_calloc(member_cnt, sizeof(*members));
for (i = 0 ; i < member_cnt ; i++) {
rd_kafkap_str_t MemberId;
rd_kafkap_bytes_t MemberMetadata;
rd_kafka_group_member_t *rkgm;
rd_kafka_buf_read_str(rkbuf, &MemberId);
rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata);
rkgm = &members[sub_cnt];
rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId);
rd_list_init(&rkgm->rkgm_eligible, 0, NULL);
if (rd_kafka_group_MemberMetadata_consumer_read(
rkb, rkgm, &Protocol, &MemberMetadata)) {
/* Failed to parse this member's metadata,
* ignore it. */
} else {
sub_cnt++;
rkgm->rkgm_assignment =
rd_kafka_topic_partition_list_new(
rkgm->rkgm_subscription->size);
rd_kafka_topic_partition_list_get_topic_names(
rkgm->rkgm_subscription, &topics,
0/*dont include regex*/);
}
}
/* FIXME: What to do if parsing failed for some/all members?
* It is a sign of incompatibility. */
rd_kafka_cgrp_group_leader_reset(rkcg,
"JoinGroup response clean-up");
rkcg->rkcg_group_leader.protocol = RD_KAFKAP_STR_DUP(&Protocol);
rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL);
rkcg->rkcg_group_leader.members = members;
rkcg->rkcg_group_leader.member_cnt = sub_cnt;
rd_kafka_cgrp_set_join_state(
rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
/* The assignor will need metadata so fetch it asynchronously
* and run the assignor when we get a reply.
* Create a callback op that the generic metadata code
* will trigger when metadata has been parsed. */
rko = rd_kafka_op_new_cb(
rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
rd_kafka_cgrp_assignor_handle_Metadata_op);
rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL);
rd_kafka_MetadataRequest(rkb, &topics,
"partition assignor", rko);
rd_list_destroy(&topics);
} else {
rd_kafka_cgrp_set_join_state(
rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
rd_kafka_SyncGroupRequest(rkb, rkcg->rkcg_group_id,
rkcg->rkcg_generation_id,
rkcg->rkcg_member_id,
NULL, 0,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
rd_kafka_handle_SyncGroup, rkcg);
}
err:
actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
RD_KAFKA_ERR_ACTION_IGNORE,
RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
RD_KAFKA_ERR_ACTION_END);
if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
/* Re-query for coordinator */
rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
RD_KAFKA_OP_COORD_QUERY, ErrorCode);
}
if (ErrorCode) {
if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
return; /* Termination */
if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
rd_kafka_q_op_err(rkcg->rkcg_q,
RD_KAFKA_OP_CONSUMER_ERR,
ErrorCode, 0, NULL, 0,
"JoinGroup failed: %s",
rd_kafka_err2str(ErrorCode));
if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
rd_kafka_cgrp_set_member_id(rkcg, "");
rd_kafka_cgrp_set_join_state(rkcg,
RD_KAFKA_CGRP_JOIN_STATE_INIT);
}
return;
err_parse:
ErrorCode = rkbuf->rkbuf_err;
goto err;
}
/**
* @brief Check subscription against requested Metadata.
*/
static rd_kafka_op_res_t
rd_kafka_cgrp_handle_Metadata_op (rd_kafka_t *rk, rd_kafka_q_t *rkq,
rd_kafka_op_t *rko) {
rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont rejoin*/);
return RD_KAFKA_OP_RES_HANDLED;
}
/**
* @brief (Async) Refresh metadata (for cgrp's needs)
*
* @returns 1 if metadata refresh was requested, or 0 if metadata is
* up to date, or -1 if no broker is available for metadata requests.
*
* @locks none
* @locality rdkafka main thread
*/
static int rd_kafka_cgrp_metadata_refresh (rd_kafka_cgrp_t *rkcg,
int *metadata_agep,
const char *reason) {
rd_kafka_t *rk = rkcg->rkcg_rk;
rd_kafka_op_t *rko;
rd_list_t topics;
rd_kafka_resp_err_t err;
rd_list_init(&topics, 8, rd_free);
/* Insert all non-wildcard topics in cache. */
rd_kafka_metadata_cache_hint_rktparlist(rkcg->rkcg_rk,
rkcg->rkcg_subscription,
NULL, 0/*dont replace*/);
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
/* For wildcard subscriptions make sure the
* cached full metadata isn't too old. */
int metadata_age = -1;
if (rk->rk_ts_full_metadata)
metadata_age = (int)(rd_clock() -
rk->rk_ts_full_metadata)/1000;
*metadata_agep = metadata_age;
if (metadata_age != -1 &&
metadata_age <=
/* The +1000 is since metadata.refresh.interval.ms
* can be set to 0. */
rk->rk_conf.metadata_refresh_interval_ms + 1000) {
rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
"CGRPMETADATA",
"%s: metadata for wildcard subscription "
"is up to date (%dms old)",
reason, *metadata_agep);
rd_list_destroy(&topics);
return 0; /* Up-to-date */
}
} else {
/* Check that all subscribed topics are in the cache. */
int r;
rd_kafka_topic_partition_list_get_topic_names(
rkcg->rkcg_subscription, &topics, 0/*no regexps*/);
rd_kafka_rdlock(rk);
r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics,
metadata_agep);
rd_kafka_rdunlock(rk);
if (r == rd_list_cnt(&topics)) {
rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
"CGRPMETADATA",
"%s: metadata for subscription "
"is up to date (%dms old)", reason,
*metadata_agep);
rd_list_destroy(&topics);
return 0; /* Up-to-date and all topics exist. */
}
rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
"CGRPMETADATA",
"%s: metadata for subscription "
"only available for %d/%d topics (%dms old)",
reason, r, rd_list_cnt(&topics), *metadata_agep);
}
/* Async request, result will be triggered from
* rd_kafka_parse_metadata(). */
rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
rd_kafka_cgrp_handle_Metadata_op);
rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0);
err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics,
reason, rko);
if (err) {
rd_kafka_dbg(rk, CGRP|RD_KAFKA_DBG_METADATA,
"CGRPMETADATA",
"%s: need to refresh metadata (%dms old) "
"but no usable brokers available: %s",
reason, *metadata_agep, rd_kafka_err2str(err));
rd_kafka_op_destroy(rko);
}
rd_list_destroy(&topics);
return err ? -1 : 1;
}
static void rd_kafka_cgrp_join (rd_kafka_cgrp_t *rkcg) {
int metadata_age;
if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT)
return;
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
"Group \"%.*s\": join with %d (%d) subscribed topic(s)",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_list_cnt(rkcg->rkcg_subscribed_topics),
rkcg->rkcg_subscription->cnt);
/* See if we need to query metadata to continue:
* - if subscription contains wildcards:
* * query all topics in cluster
*
* - if subscription does not contain wildcards but
* some topics are missing from the local metadata cache:
* * query subscribed topics (all cached ones)
*
* - otherwise:
* * rely on topic metadata cache
*/
/* We need up-to-date full metadata to continue,
* refresh metadata if necessary. */
if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
"consumer join") == 1) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
"Group \"%.*s\": "
"postponing join until up-to-date "
"metadata is available",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
return; /* ^ async call */
}
if (rd_list_empty(rkcg->rkcg_subscribed_topics))
rd_kafka_cgrp_metadata_update_check(rkcg, 0/*dont join*/);
if (rd_list_empty(rkcg->rkcg_subscribed_topics)) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
"Group \"%.*s\": "
"no matching topics based on %dms old metadata: "
"next metadata refresh in %dms",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
metadata_age,
rkcg->rkcg_rk->rk_conf.
metadata_refresh_interval_ms - metadata_age);
return;
}
rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN);
rd_kafka_JoinGroupRequest(rkcg->rkcg_rkb, rkcg->rkcg_group_id,
rkcg->rkcg_member_id,
rkcg->rkcg_rk->rk_conf.group_protocol_type,
rkcg->rkcg_subscribed_topics,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
rd_kafka_cgrp_handle_JoinGroup, rkcg);
}
/**
* Rejoin group on update to effective subscribed topics list
*/
static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg) {
/*
* Clean-up group leader duties, if any.
*/
rd_kafka_cgrp_group_leader_reset(rkcg, "Group rejoin");
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "REJOIN",
"Group \"%.*s\" rejoining in join-state %s "
"with%s an assignment",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
rkcg->rkcg_assignment ? "" : "out");
/* Remove assignment (async), if any. If there is already an
* unassign in progress we dont need to bother. */
if (rkcg->rkcg_assignment) {
if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
rd_kafka_rebalance_op(
rkcg,
RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
rkcg->rkcg_assignment, "unsubscribe");
}
} else {
rd_kafka_cgrp_set_join_state(rkcg,
RD_KAFKA_CGRP_JOIN_STATE_INIT);
rd_kafka_cgrp_join(rkcg);
}
}
/**
* Update the effective list of subscribed topics and trigger a rejoin
* if it changed.
*
* Set \p tinfos to NULL for clearing the list.
*
* @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list
*
* @returns 1 on change, else 0.
*
* @remark Takes ownership of \p tinfos
*/
static int
rd_kafka_cgrp_update_subscribed_topics (rd_kafka_cgrp_t *rkcg,
rd_list_t *tinfos) {
rd_kafka_topic_info_t *tinfo;
int i;
if (!tinfos) {
if (!rd_list_empty(rkcg->rkcg_subscribed_topics))
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
"Group \"%.*s\": "
"clearing subscribed topics list (%d)",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_list_cnt(rkcg->rkcg_subscribed_topics));
tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
} else {
if (rd_list_cnt(tinfos) == 0)
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
"Group \"%.*s\": "
"no topics in metadata matched "
"subscription",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
}
/* Sort for comparison */
rd_list_sort(tinfos, rd_kafka_topic_info_cmp);
/* Compare to existing to see if anything changed. */
if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos,
rd_kafka_topic_info_cmp)) {
/* No change */
rd_list_destroy(tinfos);
return 0;
}
rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA, "SUBSCRIPTION",
"Group \"%.*s\": effective subscription list changed "
"from %d to %d topic(s):",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_list_cnt(rkcg->rkcg_subscribed_topics),
rd_list_cnt(tinfos));
RD_LIST_FOREACH(tinfo, tinfos, i)
rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA,
"SUBSCRIPTION",
" Topic %s with %d partition(s)",
tinfo->topic, tinfo->partition_cnt);
rd_list_destroy(rkcg->rkcg_subscribed_topics);
rkcg->rkcg_subscribed_topics = tinfos;
return 1;
}
/**
* @brief Handle Heartbeat response.
*/
void rd_kafka_cgrp_handle_Heartbeat (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
const int log_decode_errors = LOG_ERR;
int16_t ErrorCode = 0;
int actions;
if (err) {
ErrorCode = err;
goto err;
}
rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
err:
actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
RD_KAFKA_ERR_ACTION_END);
rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
/* Re-query for coordinator */
rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
RD_KAFKA_OP_COORD_QUERY, ErrorCode);
/* Schedule a retry */
if (ErrorCode != RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP) {
rd_kafka_buf_keep(request);
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
rd_kafka_broker_buf_retry(request->rkbuf_rkb, request);
}
return;
}
if (ErrorCode != 0 && ErrorCode != RD_KAFKA_RESP_ERR__DESTROY)
rd_kafka_cgrp_handle_heartbeat_error(rkcg, ErrorCode);
return;
err_parse:
ErrorCode = rkbuf->rkbuf_err;
goto err;
}
/**
* @brief Send Heartbeat
*/
static void rd_kafka_cgrp_heartbeat (rd_kafka_cgrp_t *rkcg,
rd_kafka_broker_t *rkb) {
/* Skip heartbeat if we have one in transit */
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT)
return;
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
rd_kafka_HeartbeatRequest(rkb, rkcg->rkcg_group_id,
rkcg->rkcg_generation_id,
rkcg->rkcg_member_id,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
rd_kafka_cgrp_handle_Heartbeat, NULL);
}
/**
* Cgrp is now terminated: decommission it and signal back to application.
*/
static void rd_kafka_cgrp_terminated (rd_kafka_cgrp_t *rkcg) {
rd_kafka_assert(NULL, rkcg->rkcg_wait_unassign_cnt == 0);
rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt == 0);
rd_kafka_assert(NULL, !(rkcg->rkcg_flags&RD_KAFKA_CGRP_F_WAIT_UNASSIGN));
rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM);
rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
&rkcg->rkcg_offset_commit_tmr, 1/*lock*/);
rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
/* Disable and empty ops queue since there will be no
* (broker) thread serving it anymore after the unassign_broker
* below.
* This prevents a hang on destroy where responses are enqueued on rkcg_ops
* without anything serving the queue. */
rd_kafka_q_disable(rkcg->rkcg_ops);
rd_kafka_q_purge(rkcg->rkcg_ops);
if (rkcg->rkcg_rkb)
rd_kafka_cgrp_unassign_broker(rkcg);
if (rkcg->rkcg_reply_rko) {
/* Signal back to application. */
rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq,
rkcg->rkcg_reply_rko, 0);
rkcg->rkcg_reply_rko = NULL;
}
}
/**
* If a cgrp is terminating and all outstanding ops are now finished
* then progress to final termination and return 1.
* Else return 0.
*/
static RD_INLINE int rd_kafka_cgrp_try_terminate (rd_kafka_cgrp_t *rkcg) {
if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
return 1;
if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
return 0;
/* Check if wait-coord queue has timed out. */
if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
rkcg->rkcg_ts_terminate +
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
rd_clock()) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
"Group \"%s\": timing out %d op(s) in "
"wait-for-coordinator queue",
rkcg->rkcg_group_id->str,
rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
rd_kafka_q_disable(rkcg->rkcg_wait_coord_q);
if (rd_kafka_q_concat(rkcg->rkcg_ops,
rkcg->rkcg_wait_coord_q) == -1) {
/* ops queue shut down, purge coord queue */
rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
}
}
if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) &&
rd_list_empty(&rkcg->rkcg_toppars) &&
rkcg->rkcg_wait_unassign_cnt == 0 &&
rkcg->rkcg_wait_commit_cnt == 0 &&
!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
/* Since we might be deep down in a 'rko' handler
* called from cgrp_op_serve() we can't call terminated()
* directly since it will decommission the rkcg_ops queue
* that might be locked by intermediate functions.
* Instead set the TERM state and let the cgrp terminate
* at its own discretion. */
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM);
return 1;
} else {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
"Group \"%s\": "
"waiting for %s%d toppar(s), %d unassignment(s), "
"%d commit(s)%s (state %s, join-state %s) "
"before terminating",
rkcg->rkcg_group_id->str,
RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) ?
"rebalance_cb, ": "",
rd_list_cnt(&rkcg->rkcg_toppars),
rkcg->rkcg_wait_unassign_cnt,
rkcg->rkcg_wait_commit_cnt,
(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)?
", wait-unassign flag," : "",
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
return 0;
}
}
/**
* Add partition to this cgrp management
*/
static void rd_kafka_cgrp_partition_add (rd_kafka_cgrp_t *rkcg,
rd_kafka_toppar_t *rktp) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP,"PARTADD",
"Group \"%s\": add %s [%"PRId32"]",
rkcg->rkcg_group_id->str,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition);
rd_kafka_assert(rkcg->rkcg_rk, !rktp->rktp_s_for_cgrp);
rktp->rktp_s_for_cgrp = rd_kafka_toppar_keep(rktp);
rd_list_add(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
}
/**
* Remove partition from this cgrp management
*/
static void rd_kafka_cgrp_partition_del (rd_kafka_cgrp_t *rkcg,
rd_kafka_toppar_t *rktp) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
"Group \"%s\": delete %s [%"PRId32"]",
rkcg->rkcg_group_id->str,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition);
rd_kafka_assert(rkcg->rkcg_rk, rktp->rktp_s_for_cgrp);
rd_list_remove(&rkcg->rkcg_toppars, rktp->rktp_s_for_cgrp);
rd_kafka_toppar_destroy(rktp->rktp_s_for_cgrp);
rktp->rktp_s_for_cgrp = NULL;
rd_kafka_cgrp_try_terminate(rkcg);
}
/**
* Reply for OffsetFetch from call below.
*/
static void rd_kafka_cgrp_offsets_fetch_response (
rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *reply,
rd_kafka_buf_t *request,
void *opaque) {
rd_kafka_topic_partition_list_t *offsets = opaque;
rd_kafka_cgrp_t *rkcg;
if (err == RD_KAFKA_RESP_ERR__DESTROY) {
/* Termination, quick cleanup. */
rd_kafka_topic_partition_list_destroy(offsets);
return;
}
rkcg = rd_kafka_cgrp_get(rk);
if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version)) {
rd_kafka_topic_partition_list_destroy(offsets);
return;
}
rd_kafka_topic_partition_list_log(rk, "OFFSETFETCH", offsets);
/* If all partitions already had usable offsets then there
* was no request sent and thus no reply, the offsets list is
* good to go. */
if (reply)
err = rd_kafka_handle_OffsetFetch(rk, rkb, err,
reply, request, offsets,
1/* Update toppars */);
if (err) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
"Offset fetch error: %s",
rd_kafka_err2str(err));
if (err != RD_KAFKA_RESP_ERR__WAIT_COORD)
rd_kafka_q_op_err(rkcg->rkcg_q,
RD_KAFKA_OP_CONSUMER_ERR, err, 0,
NULL, 0,
"Failed to fetch offsets: %s",
rd_kafka_err2str(err));
} else {
if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
rd_kafka_cgrp_partitions_fetch_start(
rkcg, offsets, 1 /* usable offsets */);
else
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
"Group \"%.*s\": "
"ignoring Offset fetch response for "
"%d partition(s): in state %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
offsets ? offsets->cnt : -1,
rd_kafka_cgrp_join_state_names[
rkcg->rkcg_join_state]);
}
rd_kafka_topic_partition_list_destroy(offsets);
}
/**
* Fetch offsets for a list of partitions
*/
static void
rd_kafka_cgrp_offsets_fetch (rd_kafka_cgrp_t *rkcg, rd_kafka_broker_t *rkb,
rd_kafka_topic_partition_list_t *offsets) {
rd_kafka_topic_partition_list_t *use_offsets;
/* Make a copy of the offsets */
use_offsets = rd_kafka_topic_partition_list_copy(offsets);
if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkb)
rd_kafka_cgrp_offsets_fetch_response(
rkcg->rkcg_rk, rkb, RD_KAFKA_RESP_ERR__WAIT_COORD,
NULL, NULL, use_offsets);
else {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "OFFSET",
"Fetch %d offsets with v%d",
use_offsets->cnt, rkcg->rkcg_version);
rd_kafka_OffsetFetchRequest(
rkb, 1, offsets,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, rkcg->rkcg_version),
rd_kafka_cgrp_offsets_fetch_response,
use_offsets);
}
}
/**
* Start fetching all partitions in 'assignment' (async)
*/
static void
rd_kafka_cgrp_partitions_fetch_start0 (rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t
*assignment, int usable_offsets,
int line) {
int i;
/* If waiting for offsets to commit we need that to finish first
* before starting fetchers (which might fetch those stored offsets).*/
if (rkcg->rkcg_wait_commit_cnt > 0) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
"Group \"%s\": not starting fetchers "
"for %d assigned partition(s) in join-state %s "
"(usable_offsets=%s, v%"PRId32", line %d): "
"waiting for %d commit(s)",
rkcg->rkcg_group_id->str, assignment->cnt,
rd_kafka_cgrp_join_state_names[rkcg->
rkcg_join_state],
usable_offsets ? "yes":"no",
rkcg->rkcg_version, line,
rkcg->rkcg_wait_commit_cnt);
return;
}
rd_kafka_cgrp_version_new_barrier(rkcg);
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "FETCHSTART",
"Group \"%s\": starting fetchers for %d assigned "
"partition(s) in join-state %s "
"(usable_offsets=%s, v%"PRId32", line %d)",
rkcg->rkcg_group_id->str, assignment->cnt,
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
usable_offsets ? "yes":"no",
rkcg->rkcg_version, line);
rd_kafka_topic_partition_list_log(rkcg->rkcg_rk,
"FETCHSTART", assignment);
if (assignment->cnt == 0)
return;
/* Check if offsets are really unusable; this is to catch the
* case where the entire assignment has absolute offsets set which
* should make us skip offset lookups. */
if (!usable_offsets)
usable_offsets =
rd_kafka_topic_partition_list_count_abs_offsets(
assignment) == assignment->cnt;
if (!usable_offsets &&
rkcg->rkcg_rk->rk_conf.offset_store_method ==
RD_KAFKA_OFFSET_METHOD_BROKER) {
/* Fetch offsets for all assigned partitions */
rd_kafka_cgrp_offsets_fetch(rkcg, rkcg->rkcg_rkb, assignment);
} else {
rd_kafka_cgrp_set_join_state(rkcg,
RD_KAFKA_CGRP_JOIN_STATE_STARTED);
for (i = 0 ; i < assignment->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar =
&assignment->elems[i];
shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
if (!rktp->rktp_assigned) {
rktp->rktp_assigned = 1;
rkcg->rkcg_assigned_cnt++;
/* Start fetcher for partition and
* forward partition's fetchq to
* consumer groups queue. */
rd_kafka_toppar_op_fetch_start(
rktp, rktpar->offset,
rkcg->rkcg_q, RD_KAFKA_NO_REPLYQ);
} else {
int64_t offset;
/* Fetcher already started,
* just do seek to update offset */
rd_kafka_toppar_lock(rktp);
if (rktpar->offset < rktp->rktp_app_offset)
offset = rktp->rktp_app_offset;
else
offset = rktpar->offset;
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_op_seek(rktp, offset,
RD_KAFKA_NO_REPLYQ);
}
}
}
rd_kafka_assert(NULL, rkcg->rkcg_assigned_cnt <=
(rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0));
}
/**
* @brief Defer offset commit (rko) until coordinator is available.
*
* @returns 1 if the rko was deferred or 0 if the defer queue is disabled
* or rko already deferred.
*/
static int rd_kafka_cgrp_defer_offset_commit (rd_kafka_cgrp_t *rkcg,
rd_kafka_op_t *rko,
const char *reason) {
/* wait_coord_q is disabled session.timeout.ms after
* group close() has been initiated. */
if (rko->rko_u.offset_commit.ts_timeout != 0 ||
!rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
return 0;
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
"Group \"%s\": "
"unable to OffsetCommit in state %s: %s: "
"coordinator (%s) is unavailable: "
"retrying later",
rkcg->rkcg_group_id->str,
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
reason,
rkcg->rkcg_rkb ?
rd_kafka_broker_name(rkcg->rkcg_rkb) :
"none");
rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
rko->rko_u.offset_commit.ts_timeout = rd_clock() +
(rkcg->rkcg_rk->rk_conf.group_session_timeout_ms
* 1000);
rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);
return 1;
}
/**
* @brief Handler of OffsetCommit response (after parsing).
* @remark \p offsets may be NULL if \p err is set
* @returns the number of partitions with errors encountered
*/
static int
rd_kafka_cgrp_handle_OffsetCommit (rd_kafka_cgrp_t *rkcg,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t
*offsets) {
int i;
int errcnt = 0;
if (!err) {
/* Update toppars' committed offset */
for (i = 0 ; i < offsets->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar =&offsets->elems[i];
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
if (unlikely(rktpar->err)) {
rd_kafka_dbg(rkcg->rkcg_rk, TOPIC,
"OFFSET",
"OffsetCommit failed for "
"%s [%"PRId32"] at offset "
"%"PRId64": %s",
rktpar->topic, rktpar->partition,
rktpar->offset,
rd_kafka_err2str(rktpar->err));
errcnt++;
continue;
} else if (unlikely(rktpar->offset < 0))
continue;
s_rktp = rd_kafka_topic_partition_list_get_toppar(
rkcg->rkcg_rk, rktpar);
if (!s_rktp)
continue;
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_lock(rktp);
rktp->rktp_committed_offset = rktpar->offset;
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(s_rktp);
}
}
if (rd_kafka_cgrp_try_terminate(rkcg))
return errcnt; /* terminated */
if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
rd_kafka_cgrp_check_unassign_done(rkcg,
"OffsetCommit done");
return errcnt;
}
/**
* Handle OffsetCommitResponse
* Takes the original 'rko' as opaque argument.
* @remark \p rkb, rkbuf, and request may be NULL in a number of
* error cases (e.g., _NO_OFFSET, _WAIT_COORD)
*/
static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
void *opaque) {
rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
rd_kafka_op_t *rko_orig = opaque;
rd_kafka_topic_partition_list_t *offsets =
rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */
int errcnt;
int offset_commit_cb_served = 0;
RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT);
if (rd_kafka_buf_version_outdated(request, rkcg->rkcg_version))
err = RD_KAFKA_RESP_ERR__DESTROY;
err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf,
request, offsets);
if (rkb)
rd_rkb_dbg(rkb, CGRP, "COMMIT",
"OffsetCommit for %d partition(s): %s: returned: %s",
offsets ? offsets->cnt : -1,
rko_orig->rko_u.offset_commit.reason,
rd_kafka_err2str(err));
else
rd_kafka_dbg(rk, CGRP, "COMMIT",
"OffsetCommit for %d partition(s): %s: returned: %s",
offsets ? offsets->cnt : -1,
rko_orig->rko_u.offset_commit.reason,
rd_kafka_err2str(err));
if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
return; /* Retrying */
else if (err == RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP ||
err == RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE) {
/* future-proofing, see timeout_scan(). */
rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD);
if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig,
rd_kafka_err2str(err)))
return;
/* FALLTHRU and error out */
}
rd_kafka_assert(NULL, rkcg->rkcg_wait_commit_cnt > 0);
rkcg->rkcg_wait_commit_cnt--;
if (err == RD_KAFKA_RESP_ERR__DESTROY ||
(err == RD_KAFKA_RESP_ERR__NO_OFFSET &&
rko_orig->rko_u.offset_commit.silent_empty)) {
rd_kafka_op_destroy(rko_orig);
rd_kafka_cgrp_check_unassign_done(
rkcg,
err == RD_KAFKA_RESP_ERR__DESTROY ?
"OffsetCommit done (__DESTROY)" :
"OffsetCommit done (__NO_OFFSET)");
return;
}
/* Call on_commit interceptors */
if (err != RD_KAFKA_RESP_ERR__NO_OFFSET &&
err != RD_KAFKA_RESP_ERR__DESTROY &&
offsets && offsets->cnt > 0)
rd_kafka_interceptors_on_commit(rk, offsets, err);
/* If no special callback is set but an offset_commit_cb has
* been set in conf then post an event for the latter. */
if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);
rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);
if (offsets)
rko_reply->rko_u.offset_commit.partitions =
rd_kafka_topic_partition_list_copy(offsets);
rko_reply->rko_u.offset_commit.cb =
rk->rk_conf.offset_commit_cb;
rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;
rd_kafka_q_enq(rk->rk_rep, rko_reply);
offset_commit_cb_served++;
}
/* Enqueue reply to requester's queue, if any. */
if (rko_orig->rko_replyq.q) {
rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);
rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);
/* Copy offset & partitions & callbacks to reply op */
rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit;
if (offsets)
rko_reply->rko_u.offset_commit.partitions =
rd_kafka_topic_partition_list_copy(offsets);
if (rko_reply->rko_u.offset_commit.reason)
rko_reply->rko_u.offset_commit.reason =
rd_strdup(rko_reply->rko_u.offset_commit.reason);
rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0);
offset_commit_cb_served++;
}
errcnt = rd_kafka_cgrp_handle_OffsetCommit(rkcg, err, offsets);
if (!offset_commit_cb_served &&
err != RD_KAFKA_RESP_ERR_NO_ERROR &&
err != RD_KAFKA_RESP_ERR__NO_OFFSET) {
/* If there is no callback or handler for this (auto)
* commit then raise an error to the application (#1043) */
char tmp[512];
rd_kafka_topic_partition_list_str(
offsets, tmp, sizeof(tmp),
/*no partition-errs if a global error*/
RD_KAFKA_FMT_F_OFFSET |
(err ? 0 : RD_KAFKA_FMT_F_ONLY_ERR));
rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL",
"Offset commit (%s) failed "
"for %d/%d partition(s): "
"%s%s%s",
rko_orig->rko_u.offset_commit.reason,
err ? offsets->cnt : errcnt, offsets->cnt,
err ? rd_kafka_err2str(err) : "",
err ? ": " : "",
tmp);
}
rd_kafka_op_destroy(rko_orig);
}
static size_t rd_kafka_topic_partition_has_absolute_offset (
const rd_kafka_topic_partition_t *rktpar, void *opaque) {
return rktpar->offset >= 0 ? 1 : 0;
}
/**
* Commit a list of offsets.
* Reuse the originating 'rko' for the async reply.
* 'rko->rko_payload' should either be NULL (to commit current assignment) or
* a proper topic_partition_list_t with offsets to commit.
* The offset list will be altered.
*
* \p rko...silent_empty: if there are no offsets to commit bail out
* silently without posting an op on the reply queue.
* \p set_offsets: set offsets in rko->rko_u.offset_commit.partitions
*
* \p op_version: cgrp's op version to use (or 0)
*
* Locality: cgrp thread
*/
static void rd_kafka_cgrp_offsets_commit (rd_kafka_cgrp_t *rkcg,
rd_kafka_op_t *rko,
int set_offsets,
const char *reason,
int op_version) {
rd_kafka_topic_partition_list_t *offsets;
rd_kafka_resp_err_t err;
int valid_offsets = 0;
/* If offsets is NULL we shall use the current assignment. */
if (!rko->rko_u.offset_commit.partitions && rkcg->rkcg_assignment)
rko->rko_u.offset_commit.partitions =
rd_kafka_topic_partition_list_copy(
rkcg->rkcg_assignment);
offsets = rko->rko_u.offset_commit.partitions;
if (offsets) {
/* Set the offsets to commit */
if (set_offsets)
rd_kafka_topic_partition_list_set_offsets(
rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions, 1,
RD_KAFKA_OFFSET_INVALID/* def */,
1 /* is commit */);
/* Check the number of valid offsets to commit. */
valid_offsets = (int)rd_kafka_topic_partition_list_sum(
offsets,
rd_kafka_topic_partition_has_absolute_offset, NULL);
}
if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
/* wait_commit_cnt has already been increased for
* reprocessed ops. */
rkcg->rkcg_wait_commit_cnt++;
}
if (!valid_offsets) {
/* No valid offsets */
err = RD_KAFKA_RESP_ERR__NO_OFFSET;
goto err;
}
if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkcg->rkcg_rkb ||
rkcg->rkcg_rkb->rkb_source == RD_KAFKA_INTERNAL) {
if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason))
return;
err = RD_KAFKA_RESP_ERR__WAIT_COORD;
} else {
int r;
/* Send OffsetCommit */
r = rd_kafka_OffsetCommitRequest(
rkcg->rkcg_rkb, rkcg, 1, offsets,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, op_version),
rd_kafka_cgrp_op_handle_OffsetCommit, rko,
reason);
/* Must have valid offsets to commit if we get here */
rd_kafka_assert(NULL, r != 0);
return;
}
err:
/* Propagate error to whoever wanted offset committed. */
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
"OffsetCommit internal error: %s", rd_kafka_err2str(err));
rd_kafka_cgrp_op_handle_OffsetCommit(rkcg->rkcg_rk, NULL, err,
NULL, NULL, rko);
}
/**
* Commit offsets for all assigned partitions.
*/
static void
rd_kafka_cgrp_assigned_offsets_commit (rd_kafka_cgrp_t *rkcg,
const rd_kafka_topic_partition_list_t
*offsets, const char *reason) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
rko->rko_u.offset_commit.reason = rd_strdup(reason);
if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT) {
rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0);
rko->rko_u.offset_commit.cb =
rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/
rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque;
}
/* NULL partitions means current assignment */
if (offsets)
rko->rko_u.offset_commit.partitions =
rd_kafka_topic_partition_list_copy(offsets);
rko->rko_u.offset_commit.silent_empty = 1;
rd_kafka_cgrp_offsets_commit(rkcg, rko, 1/* set offsets */, reason,
rkcg->rkcg_version);
}
/**
* auto.commit.interval.ms commit timer callback.
*
* Trigger a group offset commit.
*
* Locality: rdkafka main thread
*/
static void rd_kafka_cgrp_offset_commit_tmr_cb (rd_kafka_timers_t *rkts,
void *arg) {
rd_kafka_cgrp_t *rkcg = arg;
rd_kafka_cgrp_assigned_offsets_commit(rkcg, NULL,
"cgrp auto commit timer");
}
/**
* Call when all unassign operations are done to transition to the next state
*/
static void rd_kafka_cgrp_unassign_done (rd_kafka_cgrp_t *rkcg,
const char *reason) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
"Group \"%s\": unassign done in state %s (join state %s): "
"%s: %s",
rkcg->rkcg_group_id->str,
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
rkcg->rkcg_assignment ?
"with new assignment" : "without new assignment",
reason);
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN) {
rd_kafka_cgrp_leave(rkcg, 1/*ignore response*/);
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN;
}
if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN) {
rd_kafka_cgrp_try_terminate(rkcg);
return;
}
if (rkcg->rkcg_assignment) {
rd_kafka_cgrp_set_join_state(rkcg,
RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED);
if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
rd_kafka_cgrp_partitions_fetch_start(
rkcg, rkcg->rkcg_assignment, 0);
} else {
rd_kafka_cgrp_set_join_state(rkcg,
RD_KAFKA_CGRP_JOIN_STATE_INIT);
}
rd_kafka_cgrp_try_terminate(rkcg);
}
/**
 * Checks if the current unassignment is done and, if so,
 * calls .._done(); otherwise does nothing.
 */
static void rd_kafka_cgrp_check_unassign_done (rd_kafka_cgrp_t *rkcg,
const char *reason) {
if (rkcg->rkcg_wait_unassign_cnt > 0 ||
rkcg->rkcg_assigned_cnt > 0 ||
rkcg->rkcg_wait_commit_cnt > 0 ||
rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
"Unassign not done yet "
"(%d wait_unassign, %d assigned, %d wait commit"
"%s): %s",
rkcg->rkcg_wait_unassign_cnt,
rkcg->rkcg_assigned_cnt,
rkcg->rkcg_wait_commit_cnt,
(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)?
", F_WAIT_UNASSIGN" : "", reason);
return;
}
rd_kafka_cgrp_unassign_done(rkcg, reason);
}
/**
* Remove existing assignment.
*/
static rd_kafka_resp_err_t
rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg) {
int i;
rd_kafka_topic_partition_list_t *old_assignment;
rd_kafka_cgrp_set_join_state(rkcg,
RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN);
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
old_assignment = rkcg->rkcg_assignment;
if (!old_assignment) {
rd_kafka_cgrp_check_unassign_done(
rkcg, "unassign (no previous assignment)");
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
rkcg->rkcg_assignment = NULL;
rd_kafka_cgrp_version_new_barrier(rkcg);
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
"Group \"%s\": unassigning %d partition(s) (v%"PRId32")",
rkcg->rkcg_group_id->str, old_assignment->cnt,
rkcg->rkcg_version);
if (rkcg->rkcg_rk->rk_conf.offset_store_method ==
RD_KAFKA_OFFSET_METHOD_BROKER &&
rkcg->rkcg_rk->rk_conf.enable_auto_commit) {
/* Commit all offsets for all assigned partitions to broker */
rd_kafka_cgrp_assigned_offsets_commit(rkcg, old_assignment,
"unassign");
}
for (i = 0 ; i < old_assignment->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar;
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
rktpar = &old_assignment->elems[i];
s_rktp = rktpar->_private;
rktp = rd_kafka_toppar_s2i(s_rktp);
if (rktp->rktp_assigned) {
rd_kafka_toppar_op_fetch_stop(
rktp, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0));
rkcg->rkcg_wait_unassign_cnt++;
}
rd_kafka_toppar_lock(rktp);
rd_kafka_toppar_desired_del(rktp);
rd_kafka_toppar_unlock(rktp);
}
/* Resume partition consumption. */
rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 0/*resume*/,
RD_KAFKA_TOPPAR_F_LIB_PAUSE,
old_assignment);
rd_kafka_topic_partition_list_destroy(old_assignment);
rd_kafka_cgrp_check_unassign_done(rkcg, "unassign");
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* Set new atomic partition assignment
* May update \p assignment but will not hold on to it.
*/
static void
rd_kafka_cgrp_assign (rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *assignment) {
int i;
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
"Group \"%s\": new assignment of %d partition(s) "
"in join state %s",
rkcg->rkcg_group_id->str,
assignment ? assignment->cnt : 0,
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
/* Get toppar object for each partition.
* This is to make sure the rktp stays alive during unassign(). */
for (i = 0 ; assignment && i < assignment->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar;
shptr_rd_kafka_toppar_t *s_rktp;
rktpar = &assignment->elems[i];
/* Use existing toppar if set */
if (rktpar->_private)
continue;
s_rktp = rd_kafka_toppar_get2(rkcg->rkcg_rk,
rktpar->topic,
rktpar->partition,
0/*no-ua*/, 1/*create-on-miss*/);
if (s_rktp)
rktpar->_private = s_rktp;
}
rd_kafka_cgrp_version_new_barrier(rkcg);
rd_kafka_wrlock(rkcg->rkcg_rk);
rkcg->rkcg_c.assignment_size = assignment ? assignment->cnt : 0;
rd_kafka_wrunlock(rkcg->rkcg_rk);
/* Remove existing assignment (async operation) */
if (rkcg->rkcg_assignment)
rd_kafka_cgrp_unassign(rkcg);
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
"Group \"%s\": assigning %d partition(s) in join state %s",
rkcg->rkcg_group_id->str, assignment ? assignment->cnt : 0,
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
if (assignment) {
rkcg->rkcg_assignment =
rd_kafka_topic_partition_list_copy(assignment);
/* Mark partition(s) as desired */
for (i = 0 ; i < rkcg->rkcg_assignment->cnt ; i++) {
rd_kafka_topic_partition_t *rktpar =
&rkcg->rkcg_assignment->elems[i];
shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
rd_kafka_toppar_t *rktp =
rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_lock(rktp);
rd_kafka_toppar_desired_add0(rktp);
rd_kafka_toppar_unlock(rktp);
}
}
if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
return;
rd_dassert(rkcg->rkcg_wait_unassign_cnt == 0);
rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED);
if (RD_KAFKA_CGRP_CAN_FETCH_START(rkcg) && rkcg->rkcg_assignment) {
/* No existing assignment that needs to be decommissioned,
* start partition fetchers right away */
rd_kafka_cgrp_partitions_fetch_start(
rkcg, rkcg->rkcg_assignment, 0);
}
}
/**
* Handle a rebalance-triggered partition assignment.
*
* If a rebalance_cb has been registered we enqueue an op for the app
* and let the app perform the actual assign() call.
* Otherwise we assign() directly from here.
*
* This provides the most flexibility, allowing the app to perform any
 * operation it sees fit (e.g., offset writes or reads) before actually
* updating the assign():ment.
*/
static void
rd_kafka_cgrp_handle_assignment (rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *assignment) {
rd_kafka_rebalance_op(rkcg, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
assignment, "new assignment");
}
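/*
 * Illustrative sketch (not compiled into the library): the
 * application-side counterpart of the rebalance op enqueued above,
 * using the public API from rdkafka.h. The group cannot make progress
 * until the application hands the assignment back via rd_kafka_assign()
 * (or clears it with a NULL list on revoke).
 */
#if 0
static void app_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                              rd_kafka_topic_partition_list_t *partitions,
                              void *opaque) {
        switch (err)
        {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                /* E.g., look up externally stored offsets here before
                 * handing the assignment back to librdkafka. */
                rd_kafka_assign(rk, partitions);
                break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        default:
                /* E.g., flush or commit processing state here. */
                rd_kafka_assign(rk, NULL);
                break;
        }
}
#endif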
/**
* Handle HeartbeatResponse errors.
*
* If an IllegalGeneration error code is returned in the
 * HeartbeatResponse, it indicates that the coordinator has
 * initiated a rebalance. The consumer then stops fetching data,
 * commits offsets, and sends a JoinGroupRequest to its coordinator
 * broker. */
void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
rd_kafka_resp_err_t err) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Group \"%s\" heartbeat error response in "
"state %s (join state %s, %d partition(s) assigned): %s",
rkcg->rkcg_group_id->str,
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
rkcg->rkcg_assignment ? rkcg->rkcg_assignment->cnt : 0,
rd_kafka_err2str(err));
if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
"Heartbeat response: discarding outdated "
"request (now in join-state %s)",
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
return;
}
switch (err)
{
case RD_KAFKA_RESP_ERR__DESTROY:
/* quick cleanup */
break;
case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR__TRANSPORT:
/* Remain in joined state and keep querying for coordinator */
rd_interval_expedite(&rkcg->rkcg_coord_query_intvl, 0);
break;
case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
rd_kafka_cgrp_set_member_id(rkcg, "");
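                /* FALLTHRU */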
case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
default:
/* Just revert to INIT state if join state is active. */
if (rkcg->rkcg_join_state <
RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB ||
rkcg->rkcg_join_state ==
RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB)
break;
rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
/* Trigger rebalance_cb */
rd_kafka_rebalance_op(
rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
rkcg->rkcg_assignment, rd_kafka_err2str(err));
}
break;
}
}
/**
* Clean up any group-leader related resources.
*
* Locality: cgrp thread
*/
static void rd_kafka_cgrp_group_leader_reset (rd_kafka_cgrp_t *rkcg,
const char *reason) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPLEADER",
"Group \"%.*s\": resetting group leader info: %s",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
if (rkcg->rkcg_group_leader.protocol) {
rd_free(rkcg->rkcg_group_leader.protocol);
rkcg->rkcg_group_leader.protocol = NULL;
}
if (rkcg->rkcg_group_leader.members) {
int i;
for (i = 0 ; i < rkcg->rkcg_group_leader.member_cnt ; i++)
rd_kafka_group_member_clear(&rkcg->rkcg_group_leader.
members[i]);
rkcg->rkcg_group_leader.member_cnt = 0;
rd_free(rkcg->rkcg_group_leader.members);
rkcg->rkcg_group_leader.members = NULL;
}
}
/**
* Remove existing topic subscription.
*/
static rd_kafka_resp_err_t
rd_kafka_cgrp_unsubscribe (rd_kafka_cgrp_t *rkcg, int leave_group) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE",
"Group \"%.*s\": unsubscribe from current %ssubscription "
"of %d topics (leave group=%s, join state %s, v%"PRId32")",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rkcg->rkcg_subscription ? "" : "unset ",
rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0,
leave_group ? "yes":"no",
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
rkcg->rkcg_version);
if (rkcg->rkcg_subscription) {
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
rkcg->rkcg_subscription = NULL;
}
rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL);
/*
* Clean-up group leader duties, if any.
*/
rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe");
if (leave_group)
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN;
/* Remove assignment (async), if any. If there is already an
         * unassign in progress we don't need to bother. */
if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) &&
!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN)) {
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_UNASSIGN;
rd_kafka_rebalance_op(rkcg,
RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
rkcg->rkcg_assignment, "unsubscribe");
}
rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION |
RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
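/*
 * Illustrative sketch (not compiled): the public API entry point that
 * funnels into the unsubscribe path above (an RD_KAFKA_OP_SUBSCRIBE op
 * carrying a NULL topic list).
 */
#if 0
rd_kafka_resp_err_t uerr = rd_kafka_unsubscribe(rk);
/* uerr is RD_KAFKA_RESP_ERR_NO_ERROR on success */
#endif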
/**
* Set new atomic topic subscription.
*/
static rd_kafka_resp_err_t
rd_kafka_cgrp_subscribe (rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *rktparlist) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE",
"Group \"%.*s\": subscribe to new %ssubscription "
"of %d topics (join state %s)",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rktparlist ? "" : "unset ",
rktparlist ? rktparlist->cnt : 0,
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
if (rkcg->rkcg_rk->rk_conf.enabled_assignor_cnt == 0)
return RD_KAFKA_RESP_ERR__INVALID_ARG;
/* Remove existing subscription first */
        rd_kafka_cgrp_unsubscribe(rkcg, 0/* don't leave group */);
if (!rktparlist)
return RD_KAFKA_RESP_ERR_NO_ERROR;
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;
if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
rkcg->rkcg_subscription = rktparlist;
rd_kafka_cgrp_join(rkcg);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
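/*
 * Illustrative sketch (not compiled): how an application reaches the
 * subscribe path above through the public API. A topic name starting
 * with '^' is treated as a regex pattern, which sets
 * RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION above.
 */
#if 0
rd_kafka_topic_partition_list_t *topics;
topics = rd_kafka_topic_partition_list_new(2);
rd_kafka_topic_partition_list_add(topics, "orders",
                                  RD_KAFKA_PARTITION_UA);
rd_kafka_topic_partition_list_add(topics, "^metrics\\..*",
                                  RD_KAFKA_PARTITION_UA);
rd_kafka_subscribe(rk, topics); /* enqueues RD_KAFKA_OP_SUBSCRIBE */
rd_kafka_topic_partition_list_destroy(topics);
#endif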
/**
* Same as cgrp_terminate() but called from the cgrp/main thread upon receiving
* the op 'rko' from cgrp_terminate().
*
* NOTE: Takes ownership of 'rko'
*
* Locality: main thread
*/
void
rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko) {
rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
"Terminating group \"%.*s\" in state %s "
"with %d partition(s)",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_list_cnt(&rkcg->rkcg_toppars));
if (unlikely(rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM ||
(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) ||
rkcg->rkcg_reply_rko != NULL)) {
/* Already terminating or handling a previous terminate */
if (rko) {
rd_kafka_q_t *rkq = rko->rko_replyq.q;
rko->rko_replyq.q = NULL;
rd_kafka_q_op_err(rkq, RD_KAFKA_OP_CONSUMER_ERR,
RD_KAFKA_RESP_ERR__IN_PROGRESS,
rko->rko_replyq.version,
NULL, 0,
"Group is %s",
rkcg->rkcg_reply_rko ?
"terminating":"terminated");
rd_kafka_q_destroy(rkq);
rd_kafka_op_destroy(rko);
}
return;
}
        /* Mark for stopping; the actual state transition
* is performed when all toppars have left. */
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATE;
rkcg->rkcg_ts_terminate = rd_clock();
rkcg->rkcg_reply_rko = rko;
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION)
rd_kafka_cgrp_unsubscribe(rkcg, 1/*leave group*/);
        /* If there's an outstanding rebalance_cb which has not yet been
         * served by the application, it will be served from consumer_close(). */
if (!RD_KAFKA_CGRP_WAIT_REBALANCE_CB(rkcg) &&
!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_UNASSIGN))
rd_kafka_cgrp_unassign(rkcg);
/* Try to terminate right away if all preconditions are met. */
rd_kafka_cgrp_try_terminate(rkcg);
}
/**
* Terminate and decommission a cgrp asynchronously.
*
* Locality: any thread
*/
void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq) {
rd_kafka_assert(NULL, !thrd_is_current(rkcg->rkcg_rk->rk_thread));
rd_kafka_cgrp_op(rkcg, NULL, replyq, RD_KAFKA_OP_TERMINATE, 0);
}
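/*
 * Illustrative sketch (not compiled): the application-side shutdown
 * sequence that ends up triggering the terminate op above. Any
 * outstanding rebalance/revoke callback is served from within
 * rd_kafka_consumer_close() before it returns.
 */
#if 0
rd_kafka_consumer_close(rk);  /* triggers cgrp termination */
rd_kafka_destroy(rk);
#endif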
struct _op_timeout_offset_commit {
rd_ts_t now;
rd_kafka_t *rk;
rd_list_t expired;
};
/**
* q_filter callback for expiring OFFSET_COMMIT timeouts.
*/
static int rd_kafka_op_offset_commit_timeout_check (rd_kafka_q_t *rkq,
rd_kafka_op_t *rko,
void *opaque) {
struct _op_timeout_offset_commit *state =
(struct _op_timeout_offset_commit*)opaque;
if (likely(rko->rko_type != RD_KAFKA_OP_OFFSET_COMMIT ||
rko->rko_u.offset_commit.ts_timeout == 0 ||
rko->rko_u.offset_commit.ts_timeout > state->now)) {
return 0;
}
rd_kafka_q_deq0(rkq, rko);
/* Add to temporary list to avoid recursive
* locking of rkcg_wait_coord_q. */
rd_list_add(&state->expired, rko);
return 1;
}
/**
* Scan for various timeouts.
*/
static void rd_kafka_cgrp_timeout_scan (rd_kafka_cgrp_t *rkcg, rd_ts_t now) {
struct _op_timeout_offset_commit ofc_state;
int i, cnt = 0;
rd_kafka_op_t *rko;
ofc_state.now = now;
ofc_state.rk = rkcg->rkcg_rk;
rd_list_init(&ofc_state.expired, 0, NULL);
cnt += rd_kafka_q_apply(rkcg->rkcg_wait_coord_q,
rd_kafka_op_offset_commit_timeout_check,
&ofc_state);
RD_LIST_FOREACH(rko, &ofc_state.expired, i)
rd_kafka_cgrp_op_handle_OffsetCommit(
rkcg->rkcg_rk, NULL,
RD_KAFKA_RESP_ERR__WAIT_COORD,
NULL, NULL, rko);
rd_list_destroy(&ofc_state.expired);
if (cnt > 0)
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTIMEOUT",
"Group \"%.*s\": timed out %d op(s), %d remain",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), cnt,
rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
}
/**
* @brief Handle cgrp queue op.
* @locality rdkafka main thread
* @locks none
*/
static rd_kafka_op_res_t
rd_kafka_cgrp_op_serve (rd_kafka_t *rk, rd_kafka_q_t *rkq,
rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type,
void *opaque) {
rd_kafka_cgrp_t *rkcg = opaque;
rd_kafka_broker_t *rkb = rkcg->rkcg_rkb;
rd_kafka_toppar_t *rktp;
rd_kafka_resp_err_t err;
const int silent_op = rko->rko_type == RD_KAFKA_OP_RECV_BUF;
if (rko->rko_version && rkcg->rkcg_version > rko->rko_version) {
rd_kafka_op_destroy(rko); /* outdated */
return RD_KAFKA_OP_RES_HANDLED;
}
rktp = rko->rko_rktp ? rd_kafka_toppar_s2i(rko->rko_rktp) : NULL;
if (rktp && !silent_op)
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP",
"Group \"%.*s\" received op %s in state %s "
"(join state %s, v%"PRId32") "
"for %.*s [%"PRId32"]",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_op2str(rko->rko_type),
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_kafka_cgrp_join_state_names[rkcg->
rkcg_join_state],
rkcg->rkcg_version,
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition);
else if (!silent_op)
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPOP",
"Group \"%.*s\" received op %s (v%d) in state %s "
"(join state %s, v%"PRId32" vs %"PRId32")",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rd_kafka_op2str(rko->rko_type),
rko->rko_version,
rd_kafka_cgrp_state_names[rkcg->rkcg_state],
rd_kafka_cgrp_join_state_names[rkcg->
rkcg_join_state],
rkcg->rkcg_version, rko->rko_version);
switch ((int)rko->rko_type)
{
case RD_KAFKA_OP_NAME:
/* Return the currently assigned member id. */
if (rkcg->rkcg_member_id)
rko->rko_u.name.str =
RD_KAFKAP_STR_DUP(rkcg->rkcg_member_id);
rd_kafka_op_reply(rko, 0);
rko = NULL;
break;
case RD_KAFKA_OP_OFFSET_FETCH:
if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) {
rd_kafka_op_handle_OffsetFetch(
rkcg->rkcg_rk, rkb,
RD_KAFKA_RESP_ERR__WAIT_COORD,
NULL, NULL, rko);
rko = NULL; /* rko freed by handler */
break;
}
rd_kafka_OffsetFetchRequest(
rkb, 1,
rko->rko_u.offset_fetch.partitions,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops,
rkcg->rkcg_version),
rd_kafka_op_handle_OffsetFetch, rko);
rko = NULL; /* rko now owned by request */
break;
case RD_KAFKA_OP_PARTITION_JOIN:
rd_kafka_cgrp_partition_add(rkcg, rktp);
                /* If terminating, tell the partition to leave */
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
rd_kafka_toppar_op_fetch_stop(
rktp, RD_KAFKA_NO_REPLYQ);
break;
case RD_KAFKA_OP_PARTITION_LEAVE:
rd_kafka_cgrp_partition_del(rkcg, rktp);
break;
case RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY:
/* Reply from toppar FETCH_STOP */
rd_kafka_assert(rkcg->rkcg_rk,
rkcg->rkcg_wait_unassign_cnt > 0);
rkcg->rkcg_wait_unassign_cnt--;
rd_kafka_assert(rkcg->rkcg_rk, rktp->rktp_assigned);
rd_kafka_assert(rkcg->rkcg_rk,
rkcg->rkcg_assigned_cnt > 0);
rktp->rktp_assigned = 0;
rkcg->rkcg_assigned_cnt--;
/* All unassigned toppars now stopped and commit done:
* transition to the next state. */
if (rkcg->rkcg_join_state ==
RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN)
rd_kafka_cgrp_check_unassign_done(rkcg,
"FETCH_STOP done");
break;
case RD_KAFKA_OP_OFFSET_COMMIT:
/* Trigger offsets commit. */
rd_kafka_cgrp_offsets_commit(rkcg, rko,
/* only set offsets
* if no partitions were
* specified. */
rko->rko_u.offset_commit.
partitions ? 0 : 1,
rko->rko_u.offset_commit.reason,
0);
rko = NULL; /* rko now owned by request */
break;
case RD_KAFKA_OP_COORD_QUERY:
rd_kafka_cgrp_coord_query(rkcg,
rko->rko_err ?
rd_kafka_err2str(rko->
rko_err):
"from op");
break;
case RD_KAFKA_OP_SUBSCRIBE:
/* New atomic subscription (may be NULL) */
err = rd_kafka_cgrp_subscribe(
rkcg, rko->rko_u.subscribe.topics);
if (!err)
rko->rko_u.subscribe.topics = NULL; /* owned by rkcg */
rd_kafka_op_reply(rko, err);
rko = NULL;
break;
case RD_KAFKA_OP_ASSIGN:
/* New atomic assignment (payload != NULL),
* or unassignment (payload == NULL) */
err = 0;
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
/* Treat all assignments as unassign
* when terminating. */
rd_kafka_cgrp_unassign(rkcg);
if (rko->rko_u.assign.partitions)
err = RD_KAFKA_RESP_ERR__DESTROY;
} else {
rd_kafka_cgrp_assign(
rkcg, rko->rko_u.assign.partitions);
}
rd_kafka_op_reply(rko, err);
rko = NULL;
break;
case RD_KAFKA_OP_GET_SUBSCRIPTION:
if (rkcg->rkcg_subscription)
rko->rko_u.subscribe.topics =
rd_kafka_topic_partition_list_copy(
rkcg->rkcg_subscription);
rd_kafka_op_reply(rko, 0);
rko = NULL;
break;
case RD_KAFKA_OP_GET_ASSIGNMENT:
if (rkcg->rkcg_assignment)
rko->rko_u.assign.partitions =
rd_kafka_topic_partition_list_copy(
rkcg->rkcg_assignment);
rd_kafka_op_reply(rko, 0);
rko = NULL;
break;
case RD_KAFKA_OP_TERMINATE:
rd_kafka_cgrp_terminate0(rkcg, rko);
rko = NULL; /* terminate0() takes ownership */
break;
default:
rd_kafka_assert(rkcg->rkcg_rk, !*"unknown type");
break;
}
if (rko)
rd_kafka_op_destroy(rko);
return RD_KAFKA_OP_RES_HANDLED;
}
/**
* Client group's join state handling
*/
static void rd_kafka_cgrp_join_state_serve (rd_kafka_cgrp_t *rkcg,
rd_kafka_broker_t *rkb) {
        if (0) /* FIXME */
rd_rkb_dbg(rkb, CGRP, "JOINFSM",
"Group \"%s\" in join state %s with%s subscription",
rkcg->rkcg_group_id->str,
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
rkcg->rkcg_subscription ? "" : "out");
switch (rkcg->rkcg_join_state)
{
case RD_KAFKA_CGRP_JOIN_STATE_INIT:
/* If we have a subscription start the join process. */
if (!rkcg->rkcg_subscription)
break;
if (rd_interval_immediate(&rkcg->rkcg_join_intvl,
1000*1000, 0) > 0)
rd_kafka_cgrp_join(rkcg);
break;
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN:
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB:
break;
case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB:
case RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED:
case RD_KAFKA_CGRP_JOIN_STATE_STARTED:
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
rd_interval(&rkcg->rkcg_heartbeat_intvl,
rkcg->rkcg_rk->rk_conf.
group_heartbeat_intvl_ms * 1000, 0) > 0)
rd_kafka_cgrp_heartbeat(rkcg, rkb);
break;
}
}
/**
* Client group handling.
* Called from main thread to serve the operational aspects of a cgrp.
*/
void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg) {
rd_kafka_broker_t *rkb = rkcg->rkcg_rkb;
int rkb_state = RD_KAFKA_BROKER_STATE_INIT;
rd_ts_t now;
if (rkb) {
rd_kafka_broker_lock(rkb);
rkb_state = rkb->rkb_state;
rd_kafka_broker_unlock(rkb);
/* Go back to querying state if we lost the current coordinator
* connection. */
if (rkb_state < RD_KAFKA_BROKER_STATE_UP &&
rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP)
rd_kafka_cgrp_set_state(rkcg,
RD_KAFKA_CGRP_STATE_QUERY_COORD);
}
now = rd_clock();
/* Check for cgrp termination */
if (unlikely(rd_kafka_cgrp_try_terminate(rkcg))) {
rd_kafka_cgrp_terminated(rkcg);
return; /* cgrp terminated */
}
/* Bail out if we're terminating. */
if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk)))
return;
retry:
switch (rkcg->rkcg_state)
{
case RD_KAFKA_CGRP_STATE_TERM:
break;
case RD_KAFKA_CGRP_STATE_INIT:
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
/* FALLTHRU */
case RD_KAFKA_CGRP_STATE_QUERY_COORD:
/* Query for coordinator. */
if (rd_interval_immediate(&rkcg->rkcg_coord_query_intvl,
500*1000, now) > 0)
rd_kafka_cgrp_coord_query(rkcg,
"intervaled in "
"state query-coord");
break;
case RD_KAFKA_CGRP_STATE_WAIT_COORD:
/* Waiting for GroupCoordinator response */
break;
case RD_KAFKA_CGRP_STATE_WAIT_BROKER:
/* See if the group should be reassigned to another broker. */
if (rd_kafka_cgrp_reassign_broker(rkcg))
goto retry; /* broker reassigned, retry state-machine
* to speed up next transition. */
/* Coordinator query */
if (rd_interval(&rkcg->rkcg_coord_query_intvl,
1000*1000, now) > 0)
rd_kafka_cgrp_coord_query(rkcg,
"intervaled in "
"state wait-broker");
break;
case RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT:
/* Waiting for broker transport to come up.
* Also make sure broker supports groups. */
if (rkb_state < RD_KAFKA_BROKER_STATE_UP || !rkb ||
!rd_kafka_broker_supports(
rkb, RD_KAFKA_FEATURE_BROKER_GROUP_COORD)) {
/* Coordinator query */
if (rd_interval(&rkcg->rkcg_coord_query_intvl,
1000*1000, now) > 0)
rd_kafka_cgrp_coord_query(
rkcg,
"intervaled in state "
"wait-broker-transport");
} else {
rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_UP);
/* Serve join state to trigger (re)join */
rd_kafka_cgrp_join_state_serve(rkcg, rkb);
/* Start fetching if we have an assignment. */
if (rkcg->rkcg_assignment &&
RD_KAFKA_CGRP_CAN_FETCH_START(rkcg))
rd_kafka_cgrp_partitions_fetch_start(
rkcg, rkcg->rkcg_assignment, 0);
}
break;
case RD_KAFKA_CGRP_STATE_UP:
/* Move any ops awaiting the coordinator to the ops queue
* for reprocessing. */
rd_kafka_q_concat(rkcg->rkcg_ops, rkcg->rkcg_wait_coord_q);
/* Relaxed coordinator queries. */
if (rd_interval(&rkcg->rkcg_coord_query_intvl,
rkcg->rkcg_rk->rk_conf.
coord_query_intvl_ms * 1000, now) > 0)
rd_kafka_cgrp_coord_query(rkcg,
"intervaled in state up");
rd_kafka_cgrp_join_state_serve(rkcg, rkb);
break;
}
if (unlikely(rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP &&
rd_interval(&rkcg->rkcg_timeout_scan_intvl,
1000*1000, now) > 0))
rd_kafka_cgrp_timeout_scan(rkcg, now);
}
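/*
 * Illustrative sketch (not compiled): rd_kafka_cgrp_serve() is driven
 * periodically from the rdkafka main thread; the surrounding loop shown
 * here is a simplified assumption of that call site.
 */
#if 0
while (!rd_kafka_terminating(rk)) {
        /* ... serve op queues, timers, broker state, etc ... */
        if (rk->rk_cgrp)
                rd_kafka_cgrp_serve(rk->rk_cgrp);
}
#endif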
/**
* Send an op to a cgrp.
*
* Locality: any thread
*/
void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp,
rd_kafka_replyq_t replyq, rd_kafka_op_type_t type,
rd_kafka_resp_err_t err) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(type);
rko->rko_err = err;
rko->rko_replyq = replyq;
if (rktp)
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_q_enq(rkcg->rkcg_ops, rko);
}
void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id){
if (rkcg->rkcg_member_id && member_id &&
!rd_kafkap_str_cmp_str(rkcg->rkcg_member_id, member_id))
return; /* No change */
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "MEMBERID",
"Group \"%.*s\": updating member id \"%s\" -> \"%s\"",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rkcg->rkcg_member_id ?
rkcg->rkcg_member_id->str : "(not-set)",
member_id ? member_id : "(not-set)");
if (rkcg->rkcg_member_id) {
rd_kafkap_str_destroy(rkcg->rkcg_member_id);
rkcg->rkcg_member_id = NULL;
}
if (member_id)
rkcg->rkcg_member_id = rd_kafkap_str_new(member_id, -1);
}
/**
* @brief Check if the latest metadata affects the current subscription:
* - matched topic added
* - matched topic removed
 *  - matched topic's partition count changed
*
* @locks none
* @locality rdkafka main thread
*/
void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join) {
rd_list_t *tinfos;
rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));
if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
return;
/*
         * Create a list of the topics in metadata that match our subscription
*/
tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
(void *)rd_kafka_topic_info_destroy);
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
rd_kafka_metadata_topic_match(rkcg->rkcg_rk,
tinfos, rkcg->rkcg_subscription);
else
rd_kafka_metadata_topic_filter(rkcg->rkcg_rk,
tinfos,
rkcg->rkcg_subscription);
/*
* Update (takes ownership of \c tinfos)
*/
if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) && do_join) {
/* List of subscribed topics changed, trigger rejoin. */
rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_METADATA, "REJOIN",
"Group \"%.*s\": "
"subscription updated from metadata change: "
"rejoining group",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
rd_kafka_cgrp_rejoin(rkcg);
}
}
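/**
 * @brief Handle the member state (assignment) from a SyncGroupResponse.
 *
 * The MemberState blob is expected to follow the standard consumer
 * protocol MemberAssignment layout, matching the field reads below:
 *
 *   Version      => int16
 *   TopicCnt     => int32
 *   Topics       => TopicCnt x { Topic (string), PartCnt (int32),
 *                                PartCnt x Partition (int32) }
 *   UserData     => bytes
 */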
void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
const rd_kafkap_bytes_t *member_state) {
rd_kafka_buf_t *rkbuf = NULL;
rd_kafka_topic_partition_list_t *assignment;
const int log_decode_errors = LOG_ERR;
int16_t Version;
int32_t TopicCnt;
rd_kafkap_bytes_t UserData;
        /* Don't handle new assignments when terminating */
if (!err && rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
err = RD_KAFKA_RESP_ERR__DESTROY;
if (err)
goto err;
if (RD_KAFKAP_BYTES_LEN(member_state) == 0) {
/* Empty assignment. */
assignment = rd_kafka_topic_partition_list_new(0);
memset(&UserData, 0, sizeof(UserData));
goto done;
}
/* Parse assignment from MemberState */
rkbuf = rd_kafka_buf_new_shadow(member_state->data,
RD_KAFKAP_BYTES_LEN(member_state),
NULL);
/* Protocol parser needs a broker handle to log errors on. */
if (rkb) {
rkbuf->rkbuf_rkb = rkb;
rd_kafka_broker_keep(rkb);
} else
rkbuf->rkbuf_rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);
rd_kafka_buf_read_i16(rkbuf, &Version);
rd_kafka_buf_read_i32(rkbuf, &TopicCnt);
if (TopicCnt > 10000) {
err = RD_KAFKA_RESP_ERR__BAD_MSG;
goto err;
}
assignment = rd_kafka_topic_partition_list_new(TopicCnt);
while (TopicCnt-- > 0) {
rd_kafkap_str_t Topic;
int32_t PartCnt;
rd_kafka_buf_read_str(rkbuf, &Topic);
rd_kafka_buf_read_i32(rkbuf, &PartCnt);
while (PartCnt-- > 0) {
int32_t Partition;
char *topic_name;
RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
rd_kafka_buf_read_i32(rkbuf, &Partition);
rd_kafka_topic_partition_list_add(
assignment, topic_name, Partition);
}
}
rd_kafka_buf_read_bytes(rkbuf, &UserData);
done:
/* Set the new assignment */
rd_kafka_cgrp_handle_assignment(rkcg, assignment);
rd_kafka_topic_partition_list_destroy(assignment);
if (rkbuf)
rd_kafka_buf_destroy(rkbuf);
return;
err_parse:
err = rkbuf->rkbuf_err;
err:
if (rkbuf)
rd_kafka_buf_destroy(rkbuf);
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPSYNC",
"Group \"%s\": synchronization failed: %s: rejoining",
rkcg->rkcg_group_id->str, rd_kafka_err2str(err));
rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
}